1use std::sync::Arc;
19
20use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
21
22use datafusion_common::tree_node::{Transformed, TreeNode};
23use datafusion_common::{
24 not_impl_err, plan_err, DFSchema, Diagnostic, Result, Span, Spans, TableReference,
25};
26use datafusion_expr::builder::subquery_alias;
27use datafusion_expr::{expr::Unnest, Expr, LogicalPlan, LogicalPlanBuilder};
28use datafusion_expr::{Subquery, SubqueryAlias};
29use sqlparser::ast::{FunctionArg, FunctionArgExpr, Spanned, TableFactor};
30
31mod join;
32
33impl<S: ContextProvider> SqlToRel<'_, S> {
34 fn create_relation(
36 &self,
37 relation: TableFactor,
38 planner_context: &mut PlannerContext,
39 ) -> Result<LogicalPlan> {
40 let relation_span = relation.span();
41 let (plan, alias) = match relation {
42 TableFactor::Table {
43 name, alias, args, ..
44 } => {
45 if let Some(func_args) = args {
46 let tbl_func_name =
47 name.0.first().unwrap().as_ident().unwrap().to_string();
48 let args = func_args
49 .args
50 .into_iter()
51 .flat_map(|arg| {
52 if let FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) = arg
53 {
54 self.sql_expr_to_logical_expr(
55 expr,
56 &DFSchema::empty(),
57 planner_context,
58 )
59 } else {
60 plan_err!("Unsupported function argument type: {}", arg)
61 }
62 })
63 .collect::<Vec<_>>();
64 let provider = self
65 .context_provider
66 .get_table_function_source(&tbl_func_name, args)?;
67 let plan = LogicalPlanBuilder::scan(
68 TableReference::Bare {
69 table: format!("{tbl_func_name}()").into(),
70 },
71 provider,
72 None,
73 )?
74 .build()?;
75 (plan, alias)
76 } else {
77 let table_ref = self.object_name_to_table_reference(name)?;
79 let table_name = table_ref.to_string();
80 let cte = planner_context.get_cte(&table_name);
81 (
82 match (
83 cte,
84 self.context_provider.get_table_source(table_ref.clone()),
85 ) {
86 (Some(cte_plan), _) => Ok(cte_plan.clone()),
87 (_, Ok(provider)) => LogicalPlanBuilder::scan(
88 table_ref.clone(),
89 provider,
90 None,
91 )?
92 .build(),
93 (None, Err(e)) => {
94 let e = e.with_diagnostic(Diagnostic::new_error(
95 format!("table '{table_ref}' not found"),
96 Span::try_from_sqlparser_span(relation_span),
97 ));
98 Err(e)
99 }
100 }?,
101 alias,
102 )
103 }
104 }
105 TableFactor::Derived {
106 subquery, alias, ..
107 } => {
108 let logical_plan = self.query_to_plan(*subquery, planner_context)?;
109 (logical_plan, alias)
110 }
111 TableFactor::NestedJoin {
112 table_with_joins,
113 alias,
114 } => (
115 self.plan_table_with_joins(*table_with_joins, planner_context)?,
116 alias,
117 ),
118 TableFactor::UNNEST {
119 alias,
120 array_exprs,
121 with_offset: false,
122 with_offset_alias: None,
123 with_ordinality,
124 } => {
125 if with_ordinality {
126 return not_impl_err!("UNNEST with ordinality is not supported yet");
127 }
128
129 let schema = DFSchema::empty();
131 let input = LogicalPlanBuilder::empty(true).build()?;
132 let unnest_exprs = array_exprs
135 .into_iter()
136 .map(|sql_expr| {
137 let expr = self.sql_expr_to_logical_expr(
138 sql_expr,
139 &schema,
140 planner_context,
141 )?;
142 Self::check_unnest_arg(&expr, &schema)?;
143 Ok(Expr::Unnest(Unnest::new(expr)))
144 })
145 .collect::<Result<Vec<_>>>()?;
146 if unnest_exprs.is_empty() {
147 return plan_err!("UNNEST must have at least one argument");
148 }
149 let logical_plan = self.try_process_unnest(input, unnest_exprs)?;
150 (logical_plan, alias)
151 }
152 TableFactor::UNNEST { .. } => {
153 return not_impl_err!(
154 "UNNEST table factor with offset is not supported yet"
155 );
156 }
157 TableFactor::Function {
158 name, args, alias, ..
159 } => {
160 let tbl_func_ref = self.object_name_to_table_reference(name)?;
161 let schema = planner_context
162 .outer_query_schema()
163 .cloned()
164 .unwrap_or_else(DFSchema::empty);
165 let func_args = args
166 .into_iter()
167 .map(|arg| match arg {
168 FunctionArg::Unnamed(FunctionArgExpr::Expr(expr))
169 | FunctionArg::Named {
170 arg: FunctionArgExpr::Expr(expr),
171 ..
172 } => {
173 self.sql_expr_to_logical_expr(expr, &schema, planner_context)
174 }
175 _ => plan_err!("Unsupported function argument: {arg:?}"),
176 })
177 .collect::<Result<Vec<Expr>>>()?;
178 let provider = self
179 .context_provider
180 .get_table_function_source(tbl_func_ref.table(), func_args)?;
181 let plan =
182 LogicalPlanBuilder::scan(tbl_func_ref.table(), provider, None)?
183 .build()?;
184 (plan, alias)
185 }
186 _ => {
188 return not_impl_err!(
189 "Unsupported ast node {relation:?} in create_relation"
190 );
191 }
192 };
193
194 let optimized_plan = optimize_subquery_sort(plan)?.data;
195 if let Some(alias) = alias {
196 self.apply_table_alias(optimized_plan, alias)
197 } else {
198 Ok(optimized_plan)
199 }
200 }
201
202 pub(crate) fn create_relation_subquery(
203 &self,
204 subquery: TableFactor,
205 planner_context: &mut PlannerContext,
206 ) -> Result<LogicalPlan> {
207 let old_from_schema = planner_context
214 .set_outer_from_schema(None)
215 .unwrap_or_else(|| Arc::new(DFSchema::empty()));
216 let new_query_schema = match planner_context.outer_query_schema() {
217 Some(old_query_schema) => {
218 let mut new_query_schema = old_from_schema.as_ref().clone();
219 new_query_schema.merge(old_query_schema);
220 Some(Arc::new(new_query_schema))
221 }
222 None => Some(Arc::clone(&old_from_schema)),
223 };
224 let old_query_schema = planner_context.set_outer_query_schema(new_query_schema);
225
226 let plan = self.create_relation(subquery, planner_context)?;
227 let outer_ref_columns = plan.all_out_ref_exprs();
228
229 planner_context.set_outer_query_schema(old_query_schema);
230 planner_context.set_outer_from_schema(Some(old_from_schema));
231
232 if outer_ref_columns.is_empty() {
235 return Ok(plan);
236 }
237
238 match plan {
239 LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
240 subquery_alias(
241 LogicalPlan::Subquery(Subquery {
242 subquery: input,
243 outer_ref_columns,
244 spans: Spans::new(),
245 }),
246 alias,
247 )
248 }
249 plan => Ok(LogicalPlan::Subquery(Subquery {
250 subquery: Arc::new(plan),
251 outer_ref_columns,
252 spans: Spans::new(),
253 })),
254 }
255 }
256}
257
258fn optimize_subquery_sort(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
259 let mut has_limit = false;
266 let new_plan = plan.transform_down(|c| {
267 if let LogicalPlan::Limit(_) = c {
268 has_limit = true;
269 return Ok(Transformed::no(c));
270 }
271 match c {
272 LogicalPlan::Sort(s) => {
273 if !has_limit {
274 has_limit = false;
275 return Ok(Transformed::yes(s.input.as_ref().clone()));
276 }
277 Ok(Transformed::no(LogicalPlan::Sort(s)))
278 }
279 _ => Ok(Transformed::no(c)),
280 }
281 });
282 new_plan
283}