1use std::sync::Arc;
19
20use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
21
22use crate::stack::StackGuard;
23use datafusion_common::{not_impl_err, Constraints, DFSchema, Result};
24use datafusion_expr::expr::{Sort, WildcardOptions};
25
26use datafusion_expr::select_expr::SelectExpr;
27use datafusion_expr::{
28 CreateMemoryTable, DdlStatement, Distinct, Expr, LogicalPlan, LogicalPlanBuilder,
29};
30use sqlparser::ast::{
31 Expr as SQLExpr, ExprWithAliasAndOrderBy, Ident, LimitClause, Offset, OffsetRows,
32 OrderBy, OrderByExpr, OrderByKind, PipeOperator, Query, SelectInto, SetExpr,
33 SetOperator, SetQuantifier, TableAlias,
34};
35use sqlparser::tokenizer::Span;
36
37impl<S: ContextProvider> SqlToRel<'_, S> {
38 pub(crate) fn query_to_plan(
40 &self,
41 query: Query,
42 outer_planner_context: &mut PlannerContext,
43 ) -> Result<LogicalPlan> {
44 let mut query_plan_context = outer_planner_context.clone();
47 let planner_context = &mut query_plan_context;
48
49 if let Some(with) = query.with {
50 self.plan_with_clause(with, planner_context)?;
51 }
52
53 let set_expr = *query.body;
54 let plan = match set_expr {
55 SetExpr::Select(mut select) => {
56 let select_into = select.into.take();
57 let plan =
58 self.select_to_plan(*select, query.order_by, planner_context)?;
59 let plan = self.limit(plan, query.limit_clause, planner_context)?;
60 self.select_into(plan, select_into)
62 }
63 other => {
64 let plan = {
68 let _guard = StackGuard::new(256 * 1024);
70 self.set_expr_to_plan(other, planner_context)
71 }?;
72 let oby_exprs = to_order_by_exprs(query.order_by)?;
73 let order_by_rex = self.order_by_to_sort_expr(
74 oby_exprs,
75 plan.schema(),
76 planner_context,
77 true,
78 None,
79 )?;
80 let plan = self.order_by(plan, order_by_rex)?;
81 self.limit(plan, query.limit_clause, planner_context)
82 }
83 }?;
84
85 self.pipe_operators(plan, query.pipe_operators, planner_context)
86 }
87
88 fn pipe_operators(
90 &self,
91 mut plan: LogicalPlan,
92 pipe_operators: Vec<PipeOperator>,
93 planner_context: &mut PlannerContext,
94 ) -> Result<LogicalPlan> {
95 for pipe_operator in pipe_operators {
96 plan = self.pipe_operator(plan, pipe_operator, planner_context)?;
97 }
98 Ok(plan)
99 }
100
101 fn pipe_operator(
103 &self,
104 plan: LogicalPlan,
105 pipe_operator: PipeOperator,
106 planner_context: &mut PlannerContext,
107 ) -> Result<LogicalPlan> {
108 match pipe_operator {
109 PipeOperator::Where { expr } => {
110 self.plan_selection(Some(expr), plan, planner_context)
111 }
112 PipeOperator::OrderBy { exprs } => {
113 let sort_exprs = self.order_by_to_sort_expr(
114 exprs,
115 plan.schema(),
116 planner_context,
117 true,
118 None,
119 )?;
120 self.order_by(plan, sort_exprs)
121 }
122 PipeOperator::Limit { expr, offset } => self.limit(
123 plan,
124 Some(LimitClause::LimitOffset {
125 limit: Some(expr),
126 offset: offset.map(|offset| Offset {
127 value: offset,
128 rows: OffsetRows::None,
129 }),
130 limit_by: vec![],
131 }),
132 planner_context,
133 ),
134 PipeOperator::Select { exprs } => {
135 let empty_from = matches!(plan, LogicalPlan::EmptyRelation(_));
136 let select_exprs =
137 self.prepare_select_exprs(&plan, exprs, empty_from, planner_context)?;
138 self.project(plan, select_exprs)
139 }
140 PipeOperator::Extend { exprs } => {
141 let empty_from = matches!(plan, LogicalPlan::EmptyRelation(_));
142 let extend_exprs =
143 self.prepare_select_exprs(&plan, exprs, empty_from, planner_context)?;
144 let all_exprs =
145 std::iter::once(SelectExpr::Wildcard(WildcardOptions::default()))
146 .chain(extend_exprs)
147 .collect();
148 self.project(plan, all_exprs)
149 }
150 PipeOperator::As { alias } => self.apply_table_alias(
151 plan,
152 TableAlias {
153 name: alias,
154 columns: vec![],
156 },
157 ),
158 PipeOperator::Union {
159 set_quantifier,
160 queries,
161 } => self.pipe_operator_set(
162 plan,
163 SetOperator::Union,
164 set_quantifier,
165 queries,
166 planner_context,
167 ),
168 PipeOperator::Intersect {
169 set_quantifier,
170 queries,
171 } => self.pipe_operator_set(
172 plan,
173 SetOperator::Intersect,
174 set_quantifier,
175 queries,
176 planner_context,
177 ),
178 PipeOperator::Except {
179 set_quantifier,
180 queries,
181 } => self.pipe_operator_set(
182 plan,
183 SetOperator::Except,
184 set_quantifier,
185 queries,
186 planner_context,
187 ),
188 PipeOperator::Aggregate {
189 full_table_exprs,
190 group_by_expr,
191 } => self.pipe_operator_aggregate(
192 plan,
193 full_table_exprs,
194 group_by_expr,
195 planner_context,
196 ),
197 PipeOperator::Join(join) => {
198 self.parse_relation_join(plan, join, planner_context)
199 }
200
201 x => not_impl_err!("`{x}` pipe operator is not supported yet"),
202 }
203 }
204
205 fn pipe_operator_set(
207 &self,
208 mut plan: LogicalPlan,
209 set_operator: SetOperator,
210 set_quantifier: SetQuantifier,
211 queries: Vec<Query>,
212 planner_context: &mut PlannerContext,
213 ) -> Result<LogicalPlan> {
214 for query in queries {
215 let right_plan = self.query_to_plan(query, planner_context)?;
216 plan = self.set_operation_to_plan(
217 set_operator,
218 plan,
219 right_plan,
220 set_quantifier,
221 )?;
222 }
223
224 Ok(plan)
225 }
226
227 fn limit(
229 &self,
230 input: LogicalPlan,
231 limit_clause: Option<LimitClause>,
232 planner_context: &mut PlannerContext,
233 ) -> Result<LogicalPlan> {
234 let Some(limit_clause) = limit_clause else {
235 return Ok(input);
236 };
237
238 let empty_schema = DFSchema::empty();
239
240 let (skip, fetch, limit_by_exprs) = match limit_clause {
241 LimitClause::LimitOffset {
242 limit,
243 offset,
244 limit_by,
245 } => {
246 let skip = offset
247 .map(|o| self.sql_to_expr(o.value, &empty_schema, planner_context))
248 .transpose()?;
249
250 let fetch = limit
251 .map(|e| self.sql_to_expr(e, &empty_schema, planner_context))
252 .transpose()?;
253
254 let limit_by_exprs = limit_by
255 .into_iter()
256 .map(|e| self.sql_to_expr(e, &empty_schema, planner_context))
257 .collect::<Result<Vec<_>>>()?;
258
259 (skip, fetch, limit_by_exprs)
260 }
261 LimitClause::OffsetCommaLimit { offset, limit } => {
262 let skip =
263 Some(self.sql_to_expr(offset, &empty_schema, planner_context)?);
264 let fetch =
265 Some(self.sql_to_expr(limit, &empty_schema, planner_context)?);
266 (skip, fetch, vec![])
267 }
268 };
269
270 if !limit_by_exprs.is_empty() {
271 return not_impl_err!("LIMIT BY clause is not supported yet");
272 }
273
274 if skip.is_none() && fetch.is_none() {
275 return Ok(input);
276 }
277
278 LogicalPlanBuilder::from(input)
279 .limit_by_expr(skip, fetch)?
280 .build()
281 }
282
283 pub(super) fn order_by(
285 &self,
286 plan: LogicalPlan,
287 order_by: Vec<Sort>,
288 ) -> Result<LogicalPlan> {
289 if order_by.is_empty() {
290 return Ok(plan);
291 }
292
293 if let LogicalPlan::Distinct(Distinct::On(ref distinct_on)) = plan {
294 let distinct_on = distinct_on.clone().with_sort_expr(order_by)?;
297 Ok(LogicalPlan::Distinct(Distinct::On(distinct_on)))
298 } else {
299 LogicalPlanBuilder::from(plan).sort(order_by)?.build()
300 }
301 }
302
303 fn pipe_operator_aggregate(
305 &self,
306 plan: LogicalPlan,
307 full_table_exprs: Vec<ExprWithAliasAndOrderBy>,
308 group_by_expr: Vec<ExprWithAliasAndOrderBy>,
309 planner_context: &mut PlannerContext,
310 ) -> Result<LogicalPlan> {
311 let plan_schema = plan.schema();
312 let process_expr =
313 |expr_with_alias_and_order_by: ExprWithAliasAndOrderBy,
314 planner_context: &mut PlannerContext| {
315 let expr_with_alias = expr_with_alias_and_order_by.expr;
316 let sql_expr = expr_with_alias.expr;
317 let alias = expr_with_alias.alias;
318
319 let df_expr = self.sql_to_expr(sql_expr, plan_schema, planner_context)?;
320
321 match alias {
322 Some(alias_ident) => df_expr.alias_if_changed(alias_ident.value),
323 None => Ok(df_expr),
324 }
325 };
326
327 let aggr_exprs: Vec<Expr> = full_table_exprs
328 .into_iter()
329 .map(|e| process_expr(e, planner_context))
330 .collect::<Result<Vec<_>>>()?;
331
332 let group_by_exprs: Vec<Expr> = group_by_expr
333 .into_iter()
334 .map(|e| process_expr(e, planner_context))
335 .collect::<Result<Vec<_>>>()?;
336
337 LogicalPlanBuilder::from(plan)
338 .aggregate(group_by_exprs, aggr_exprs)?
339 .build()
340 }
341
342 fn select_into(
344 &self,
345 plan: LogicalPlan,
346 select_into: Option<SelectInto>,
347 ) -> Result<LogicalPlan> {
348 match select_into {
349 Some(into) => Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
350 CreateMemoryTable {
351 name: self.object_name_to_table_reference(into.name)?,
352 constraints: Constraints::default(),
353 input: Arc::new(plan),
354 if_not_exists: false,
355 or_replace: false,
356 temporary: false,
357 column_defaults: vec![],
358 },
359 ))),
360 _ => Ok(plan),
361 }
362 }
363}
364
365fn to_order_by_exprs(order_by: Option<OrderBy>) -> Result<Vec<OrderByExpr>> {
367 to_order_by_exprs_with_select(order_by, None)
368}
369
370pub(crate) fn to_order_by_exprs_with_select(
372 order_by: Option<OrderBy>,
373 select_exprs: Option<&Vec<Expr>>,
374) -> Result<Vec<OrderByExpr>> {
375 let Some(OrderBy { kind, interpolate }) = order_by else {
376 return Ok(vec![]);
378 };
379 if let Some(_interpolate) = interpolate {
380 return not_impl_err!("ORDER BY INTERPOLATE is not supported");
381 }
382 match kind {
383 OrderByKind::All(order_by_options) => {
384 let Some(exprs) = select_exprs else {
385 return Ok(vec![]);
386 };
387 let order_by_exprs = exprs
388 .iter()
389 .map(|select_expr| match select_expr {
390 Expr::Column(column) => Ok(OrderByExpr {
391 expr: SQLExpr::Identifier(Ident {
392 value: column.name.clone(),
393 quote_style: None,
394 span: Span::empty(),
395 }),
396 options: order_by_options,
397 with_fill: None,
398 }),
399 _ => not_impl_err!(
401 "ORDER BY ALL is not supported for non-column expressions"
402 ),
403 })
404 .collect::<Result<Vec<_>>>()?;
405 Ok(order_by_exprs)
406 }
407 OrderByKind::Expressions(order_by_exprs) => Ok(order_by_exprs),
408 }
409}