datafusion_sql/
query.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::sync::Arc;
19
20use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
21
22use crate::stack::StackGuard;
23use datafusion_common::{not_impl_err, Constraints, DFSchema, Result};
24use datafusion_expr::expr::{Sort, WildcardOptions};
25
26use datafusion_expr::select_expr::SelectExpr;
27use datafusion_expr::{
28    CreateMemoryTable, DdlStatement, Distinct, Expr, LogicalPlan, LogicalPlanBuilder,
29};
30use sqlparser::ast::{
31    Expr as SQLExpr, ExprWithAliasAndOrderBy, Ident, LimitClause, Offset, OffsetRows,
32    OrderBy, OrderByExpr, OrderByKind, PipeOperator, Query, SelectInto, SetExpr,
33    SetOperator, SetQuantifier, TableAlias,
34};
35use sqlparser::tokenizer::Span;
36
37impl<S: ContextProvider> SqlToRel<'_, S> {
38    /// Generate a logical plan from an SQL query/subquery
39    pub(crate) fn query_to_plan(
40        &self,
41        query: Query,
42        outer_planner_context: &mut PlannerContext,
43    ) -> Result<LogicalPlan> {
44        // Each query has its own planner context, including CTEs that are visible within that query.
45        // It also inherits the CTEs from the outer query by cloning the outer planner context.
46        let mut query_plan_context = outer_planner_context.clone();
47        let planner_context = &mut query_plan_context;
48
49        if let Some(with) = query.with {
50            self.plan_with_clause(with, planner_context)?;
51        }
52
53        let set_expr = *query.body;
54        let plan = match set_expr {
55            SetExpr::Select(mut select) => {
56                let select_into = select.into.take();
57                let plan =
58                    self.select_to_plan(*select, query.order_by, planner_context)?;
59                let plan = self.limit(plan, query.limit_clause, planner_context)?;
60                // Process the `SELECT INTO` after `LIMIT`.
61                self.select_into(plan, select_into)
62            }
63            other => {
64                // The functions called from `set_expr_to_plan()` need more than 128KB
65                // stack in debug builds as investigated in:
66                // https://github.com/apache/datafusion/pull/13310#discussion_r1836813902
67                let plan = {
68                    // scope for dropping _guard
69                    let _guard = StackGuard::new(256 * 1024);
70                    self.set_expr_to_plan(other, planner_context)
71                }?;
72                let oby_exprs = to_order_by_exprs(query.order_by)?;
73                let order_by_rex = self.order_by_to_sort_expr(
74                    oby_exprs,
75                    plan.schema(),
76                    planner_context,
77                    true,
78                    None,
79                )?;
80                let plan = self.order_by(plan, order_by_rex)?;
81                self.limit(plan, query.limit_clause, planner_context)
82            }
83        }?;
84
85        self.pipe_operators(plan, query.pipe_operators, planner_context)
86    }
87
88    /// Apply pipe operators to a plan
89    fn pipe_operators(
90        &self,
91        mut plan: LogicalPlan,
92        pipe_operators: Vec<PipeOperator>,
93        planner_context: &mut PlannerContext,
94    ) -> Result<LogicalPlan> {
95        for pipe_operator in pipe_operators {
96            plan = self.pipe_operator(plan, pipe_operator, planner_context)?;
97        }
98        Ok(plan)
99    }
100
101    /// Apply a pipe operator to a plan
102    fn pipe_operator(
103        &self,
104        plan: LogicalPlan,
105        pipe_operator: PipeOperator,
106        planner_context: &mut PlannerContext,
107    ) -> Result<LogicalPlan> {
108        match pipe_operator {
109            PipeOperator::Where { expr } => {
110                self.plan_selection(Some(expr), plan, planner_context)
111            }
112            PipeOperator::OrderBy { exprs } => {
113                let sort_exprs = self.order_by_to_sort_expr(
114                    exprs,
115                    plan.schema(),
116                    planner_context,
117                    true,
118                    None,
119                )?;
120                self.order_by(plan, sort_exprs)
121            }
122            PipeOperator::Limit { expr, offset } => self.limit(
123                plan,
124                Some(LimitClause::LimitOffset {
125                    limit: Some(expr),
126                    offset: offset.map(|offset| Offset {
127                        value: offset,
128                        rows: OffsetRows::None,
129                    }),
130                    limit_by: vec![],
131                }),
132                planner_context,
133            ),
134            PipeOperator::Select { exprs } => {
135                let empty_from = matches!(plan, LogicalPlan::EmptyRelation(_));
136                let select_exprs =
137                    self.prepare_select_exprs(&plan, exprs, empty_from, planner_context)?;
138                self.project(plan, select_exprs)
139            }
140            PipeOperator::Extend { exprs } => {
141                let empty_from = matches!(plan, LogicalPlan::EmptyRelation(_));
142                let extend_exprs =
143                    self.prepare_select_exprs(&plan, exprs, empty_from, planner_context)?;
144                let all_exprs =
145                    std::iter::once(SelectExpr::Wildcard(WildcardOptions::default()))
146                        .chain(extend_exprs)
147                        .collect();
148                self.project(plan, all_exprs)
149            }
150            PipeOperator::As { alias } => self.apply_table_alias(
151                plan,
152                TableAlias {
153                    name: alias,
154                    // Apply to all fields
155                    columns: vec![],
156                },
157            ),
158            PipeOperator::Union {
159                set_quantifier,
160                queries,
161            } => self.pipe_operator_set(
162                plan,
163                SetOperator::Union,
164                set_quantifier,
165                queries,
166                planner_context,
167            ),
168            PipeOperator::Intersect {
169                set_quantifier,
170                queries,
171            } => self.pipe_operator_set(
172                plan,
173                SetOperator::Intersect,
174                set_quantifier,
175                queries,
176                planner_context,
177            ),
178            PipeOperator::Except {
179                set_quantifier,
180                queries,
181            } => self.pipe_operator_set(
182                plan,
183                SetOperator::Except,
184                set_quantifier,
185                queries,
186                planner_context,
187            ),
188            PipeOperator::Aggregate {
189                full_table_exprs,
190                group_by_expr,
191            } => self.pipe_operator_aggregate(
192                plan,
193                full_table_exprs,
194                group_by_expr,
195                planner_context,
196            ),
197            PipeOperator::Join(join) => {
198                self.parse_relation_join(plan, join, planner_context)
199            }
200
201            x => not_impl_err!("`{x}` pipe operator is not supported yet"),
202        }
203    }
204
205    /// Handle Union/Intersect/Except pipe operators
206    fn pipe_operator_set(
207        &self,
208        mut plan: LogicalPlan,
209        set_operator: SetOperator,
210        set_quantifier: SetQuantifier,
211        queries: Vec<Query>,
212        planner_context: &mut PlannerContext,
213    ) -> Result<LogicalPlan> {
214        for query in queries {
215            let right_plan = self.query_to_plan(query, planner_context)?;
216            plan = self.set_operation_to_plan(
217                set_operator,
218                plan,
219                right_plan,
220                set_quantifier,
221            )?;
222        }
223
224        Ok(plan)
225    }
226
227    /// Wrap a plan in a limit
228    fn limit(
229        &self,
230        input: LogicalPlan,
231        limit_clause: Option<LimitClause>,
232        planner_context: &mut PlannerContext,
233    ) -> Result<LogicalPlan> {
234        let Some(limit_clause) = limit_clause else {
235            return Ok(input);
236        };
237
238        let empty_schema = DFSchema::empty();
239
240        let (skip, fetch, limit_by_exprs) = match limit_clause {
241            LimitClause::LimitOffset {
242                limit,
243                offset,
244                limit_by,
245            } => {
246                let skip = offset
247                    .map(|o| self.sql_to_expr(o.value, &empty_schema, planner_context))
248                    .transpose()?;
249
250                let fetch = limit
251                    .map(|e| self.sql_to_expr(e, &empty_schema, planner_context))
252                    .transpose()?;
253
254                let limit_by_exprs = limit_by
255                    .into_iter()
256                    .map(|e| self.sql_to_expr(e, &empty_schema, planner_context))
257                    .collect::<Result<Vec<_>>>()?;
258
259                (skip, fetch, limit_by_exprs)
260            }
261            LimitClause::OffsetCommaLimit { offset, limit } => {
262                let skip =
263                    Some(self.sql_to_expr(offset, &empty_schema, planner_context)?);
264                let fetch =
265                    Some(self.sql_to_expr(limit, &empty_schema, planner_context)?);
266                (skip, fetch, vec![])
267            }
268        };
269
270        if !limit_by_exprs.is_empty() {
271            return not_impl_err!("LIMIT BY clause is not supported yet");
272        }
273
274        if skip.is_none() && fetch.is_none() {
275            return Ok(input);
276        }
277
278        LogicalPlanBuilder::from(input)
279            .limit_by_expr(skip, fetch)?
280            .build()
281    }
282
283    /// Wrap the logical in a sort
284    pub(super) fn order_by(
285        &self,
286        plan: LogicalPlan,
287        order_by: Vec<Sort>,
288    ) -> Result<LogicalPlan> {
289        if order_by.is_empty() {
290            return Ok(plan);
291        }
292
293        if let LogicalPlan::Distinct(Distinct::On(ref distinct_on)) = plan {
294            // In case of `DISTINCT ON` we must capture the sort expressions since during the plan
295            // optimization we're effectively doing a `first_value` aggregation according to them.
296            let distinct_on = distinct_on.clone().with_sort_expr(order_by)?;
297            Ok(LogicalPlan::Distinct(Distinct::On(distinct_on)))
298        } else {
299            LogicalPlanBuilder::from(plan).sort(order_by)?.build()
300        }
301    }
302
303    /// Handle AGGREGATE pipe operator
304    fn pipe_operator_aggregate(
305        &self,
306        plan: LogicalPlan,
307        full_table_exprs: Vec<ExprWithAliasAndOrderBy>,
308        group_by_expr: Vec<ExprWithAliasAndOrderBy>,
309        planner_context: &mut PlannerContext,
310    ) -> Result<LogicalPlan> {
311        let plan_schema = plan.schema();
312        let process_expr =
313            |expr_with_alias_and_order_by: ExprWithAliasAndOrderBy,
314             planner_context: &mut PlannerContext| {
315                let expr_with_alias = expr_with_alias_and_order_by.expr;
316                let sql_expr = expr_with_alias.expr;
317                let alias = expr_with_alias.alias;
318
319                let df_expr = self.sql_to_expr(sql_expr, plan_schema, planner_context)?;
320
321                match alias {
322                    Some(alias_ident) => df_expr.alias_if_changed(alias_ident.value),
323                    None => Ok(df_expr),
324                }
325            };
326
327        let aggr_exprs: Vec<Expr> = full_table_exprs
328            .into_iter()
329            .map(|e| process_expr(e, planner_context))
330            .collect::<Result<Vec<_>>>()?;
331
332        let group_by_exprs: Vec<Expr> = group_by_expr
333            .into_iter()
334            .map(|e| process_expr(e, planner_context))
335            .collect::<Result<Vec<_>>>()?;
336
337        LogicalPlanBuilder::from(plan)
338            .aggregate(group_by_exprs, aggr_exprs)?
339            .build()
340    }
341
342    /// Wrap the logical plan in a `SelectInto`
343    fn select_into(
344        &self,
345        plan: LogicalPlan,
346        select_into: Option<SelectInto>,
347    ) -> Result<LogicalPlan> {
348        match select_into {
349            Some(into) => Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
350                CreateMemoryTable {
351                    name: self.object_name_to_table_reference(into.name)?,
352                    constraints: Constraints::default(),
353                    input: Arc::new(plan),
354                    if_not_exists: false,
355                    or_replace: false,
356                    temporary: false,
357                    column_defaults: vec![],
358                },
359            ))),
360            _ => Ok(plan),
361        }
362    }
363}
364
365/// Returns the order by expressions from the query.
366fn to_order_by_exprs(order_by: Option<OrderBy>) -> Result<Vec<OrderByExpr>> {
367    to_order_by_exprs_with_select(order_by, None)
368}
369
370/// Returns the order by expressions from the query with the select expressions.
371pub(crate) fn to_order_by_exprs_with_select(
372    order_by: Option<OrderBy>,
373    select_exprs: Option<&Vec<Expr>>,
374) -> Result<Vec<OrderByExpr>> {
375    let Some(OrderBy { kind, interpolate }) = order_by else {
376        // If no order by, return an empty array.
377        return Ok(vec![]);
378    };
379    if let Some(_interpolate) = interpolate {
380        return not_impl_err!("ORDER BY INTERPOLATE is not supported");
381    }
382    match kind {
383        OrderByKind::All(order_by_options) => {
384            let Some(exprs) = select_exprs else {
385                return Ok(vec![]);
386            };
387            let order_by_exprs = exprs
388                .iter()
389                .map(|select_expr| match select_expr {
390                    Expr::Column(column) => Ok(OrderByExpr {
391                        expr: SQLExpr::Identifier(Ident {
392                            value: column.name.clone(),
393                            quote_style: None,
394                            span: Span::empty(),
395                        }),
396                        options: order_by_options,
397                        with_fill: None,
398                    }),
399                    // TODO: Support other types of expressions
400                    _ => not_impl_err!(
401                        "ORDER BY ALL is not supported for non-column expressions"
402                    ),
403                })
404                .collect::<Result<Vec<_>>>()?;
405            Ok(order_by_exprs)
406        }
407        OrderByKind::Expressions(order_by_exprs) => Ok(order_by_exprs),
408    }
409}