datafusion_sql/relation/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
21
22use datafusion_common::tree_node::{Transformed, TreeNode};
23use datafusion_common::{
24    not_impl_err, plan_err, DFSchema, Diagnostic, Result, Span, Spans, TableReference,
25};
26use datafusion_expr::builder::subquery_alias;
27use datafusion_expr::{expr::Unnest, Expr, LogicalPlan, LogicalPlanBuilder};
28use datafusion_expr::{Subquery, SubqueryAlias};
29use sqlparser::ast::{FunctionArg, FunctionArgExpr, Spanned, TableFactor};
30
31mod join;
32
33impl<S: ContextProvider> SqlToRel<'_, S> {
34    /// Create a `LogicalPlan` that scans the named relation
35    fn create_relation(
36        &self,
37        relation: TableFactor,
38        planner_context: &mut PlannerContext,
39    ) -> Result<LogicalPlan> {
40        let relation_span = relation.span();
41        let (plan, alias) = match relation {
42            TableFactor::Table {
43                name, alias, args, ..
44            } => {
45                if let Some(func_args) = args {
46                    let tbl_func_name =
47                        name.0.first().unwrap().as_ident().unwrap().to_string();
48                    let args = func_args
49                        .args
50                        .into_iter()
51                        .flat_map(|arg| {
52                            if let FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) = arg
53                            {
54                                self.sql_expr_to_logical_expr(
55                                    expr,
56                                    &DFSchema::empty(),
57                                    planner_context,
58                                )
59                            } else {
60                                plan_err!("Unsupported function argument type: {}", arg)
61                            }
62                        })
63                        .collect::<Vec<_>>();
64                    let provider = self
65                        .context_provider
66                        .get_table_function_source(&tbl_func_name, args)?;
67                    let plan = LogicalPlanBuilder::scan(
68                        TableReference::Bare {
69                            table: format!("{tbl_func_name}()").into(),
70                        },
71                        provider,
72                        None,
73                    )?
74                    .build()?;
75                    (plan, alias)
76                } else {
77                    // Normalize name and alias
78                    let table_ref = self.object_name_to_table_reference(name)?;
79                    let table_name = table_ref.to_string();
80                    let cte = planner_context.get_cte(&table_name);
81                    (
82                        match (
83                            cte,
84                            self.context_provider.get_table_source(table_ref.clone()),
85                        ) {
86                            (Some(cte_plan), _) => Ok(cte_plan.clone()),
87                            (_, Ok(provider)) => LogicalPlanBuilder::scan(
88                                table_ref.clone(),
89                                provider,
90                                None,
91                            )?
92                            .build(),
93                            (None, Err(e)) => {
94                                let e = e.with_diagnostic(Diagnostic::new_error(
95                                    format!("table '{table_ref}' not found"),
96                                    Span::try_from_sqlparser_span(relation_span),
97                                ));
98                                Err(e)
99                            }
100                        }?,
101                        alias,
102                    )
103                }
104            }
105            TableFactor::Derived {
106                subquery, alias, ..
107            } => {
108                let logical_plan = self.query_to_plan(*subquery, planner_context)?;
109                (logical_plan, alias)
110            }
111            TableFactor::NestedJoin {
112                table_with_joins,
113                alias,
114            } => (
115                self.plan_table_with_joins(*table_with_joins, planner_context)?,
116                alias,
117            ),
118            TableFactor::UNNEST {
119                alias,
120                array_exprs,
121                with_offset: false,
122                with_offset_alias: None,
123                with_ordinality,
124            } => {
125                if with_ordinality {
126                    return not_impl_err!("UNNEST with ordinality is not supported yet");
127                }
128
129                // Unnest table factor has empty input
130                let schema = DFSchema::empty();
131                let input = LogicalPlanBuilder::empty(true).build()?;
132                // Unnest table factor can have multiple arguments.
133                // We treat each argument as a separate unnest expression.
134                let unnest_exprs = array_exprs
135                    .into_iter()
136                    .map(|sql_expr| {
137                        let expr = self.sql_expr_to_logical_expr(
138                            sql_expr,
139                            &schema,
140                            planner_context,
141                        )?;
142                        Self::check_unnest_arg(&expr, &schema)?;
143                        Ok(Expr::Unnest(Unnest::new(expr)))
144                    })
145                    .collect::<Result<Vec<_>>>()?;
146                if unnest_exprs.is_empty() {
147                    return plan_err!("UNNEST must have at least one argument");
148                }
149                let logical_plan = self.try_process_unnest(input, unnest_exprs)?;
150                (logical_plan, alias)
151            }
152            TableFactor::UNNEST { .. } => {
153                return not_impl_err!(
154                    "UNNEST table factor with offset is not supported yet"
155                );
156            }
157            TableFactor::Function {
158                name, args, alias, ..
159            } => {
160                let tbl_func_ref = self.object_name_to_table_reference(name)?;
161                let schema = planner_context
162                    .outer_query_schema()
163                    .cloned()
164                    .unwrap_or_else(DFSchema::empty);
165                let func_args = args
166                    .into_iter()
167                    .map(|arg| match arg {
168                        FunctionArg::Unnamed(FunctionArgExpr::Expr(expr))
169                        | FunctionArg::Named {
170                            arg: FunctionArgExpr::Expr(expr),
171                            ..
172                        } => {
173                            self.sql_expr_to_logical_expr(expr, &schema, planner_context)
174                        }
175                        _ => plan_err!("Unsupported function argument: {arg:?}"),
176                    })
177                    .collect::<Result<Vec<Expr>>>()?;
178                let provider = self
179                    .context_provider
180                    .get_table_function_source(tbl_func_ref.table(), func_args)?;
181                let plan =
182                    LogicalPlanBuilder::scan(tbl_func_ref.table(), provider, None)?
183                        .build()?;
184                (plan, alias)
185            }
186            // @todo Support TableFactory::TableFunction?
187            _ => {
188                return not_impl_err!(
189                    "Unsupported ast node {relation:?} in create_relation"
190                );
191            }
192        };
193
194        let optimized_plan = optimize_subquery_sort(plan)?.data;
195        if let Some(alias) = alias {
196            self.apply_table_alias(optimized_plan, alias)
197        } else {
198            Ok(optimized_plan)
199        }
200    }
201
202    pub(crate) fn create_relation_subquery(
203        &self,
204        subquery: TableFactor,
205        planner_context: &mut PlannerContext,
206    ) -> Result<LogicalPlan> {
207        // At this point for a syntactically valid query the outer_from_schema is
208        // guaranteed to be set, so the `.unwrap()` call will never panic. This
209        // is the case because we only call this method for lateral table
210        // factors, and those can never be the first factor in a FROM list. This
211        // means we arrived here through the `for` loop in `plan_from_tables` or
212        // the `for` loop in `plan_table_with_joins`.
213        let old_from_schema = planner_context
214            .set_outer_from_schema(None)
215            .unwrap_or_else(|| Arc::new(DFSchema::empty()));
216        let new_query_schema = match planner_context.outer_query_schema() {
217            Some(old_query_schema) => {
218                let mut new_query_schema = old_from_schema.as_ref().clone();
219                new_query_schema.merge(old_query_schema);
220                Some(Arc::new(new_query_schema))
221            }
222            None => Some(Arc::clone(&old_from_schema)),
223        };
224        let old_query_schema = planner_context.set_outer_query_schema(new_query_schema);
225
226        let plan = self.create_relation(subquery, planner_context)?;
227        let outer_ref_columns = plan.all_out_ref_exprs();
228
229        planner_context.set_outer_query_schema(old_query_schema);
230        planner_context.set_outer_from_schema(Some(old_from_schema));
231
232        // We can omit the subquery wrapper if there are no columns
233        // referencing the outer scope.
234        if outer_ref_columns.is_empty() {
235            return Ok(plan);
236        }
237
238        match plan {
239            LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
240                subquery_alias(
241                    LogicalPlan::Subquery(Subquery {
242                        subquery: input,
243                        outer_ref_columns,
244                        spans: Spans::new(),
245                    }),
246                    alias,
247                )
248            }
249            plan => Ok(LogicalPlan::Subquery(Subquery {
250                subquery: Arc::new(plan),
251                outer_ref_columns,
252                spans: Spans::new(),
253            })),
254        }
255    }
256}
257
258fn optimize_subquery_sort(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
259    // When initializing subqueries, we examine sort options since they might be unnecessary.
260    // They are only important if the subquery result is affected by the ORDER BY statement,
261    // which can happen when we have:
262    // 1. DISTINCT ON / ARRAY_AGG ... => Handled by an `Aggregate` and its requirements.
263    // 2. RANK / ROW_NUMBER ... => Handled by a `WindowAggr` and its requirements.
264    // 3. LIMIT => Handled by a `Sort`, so we need to search for it.
265    let mut has_limit = false;
266    let new_plan = plan.transform_down(|c| {
267        if let LogicalPlan::Limit(_) = c {
268            has_limit = true;
269            return Ok(Transformed::no(c));
270        }
271        match c {
272            LogicalPlan::Sort(s) => {
273                if !has_limit {
274                    has_limit = false;
275                    return Ok(Transformed::yes(s.input.as_ref().clone()));
276                }
277                Ok(Transformed::no(LogicalPlan::Sort(s)))
278            }
279            _ => Ok(Transformed::no(c)),
280        }
281    });
282    new_plan
283}