1use super::{
19 ast::{
20 BuilderError, DerivedRelationBuilder, QueryBuilder, RelationBuilder,
21 SelectBuilder, TableRelationBuilder, TableWithJoinsBuilder,
22 },
23 rewrite::{
24 inject_column_aliases_into_subquery, normalize_union_schema,
25 rewrite_plan_for_sort_on_non_projected_fields,
26 subquery_alias_inner_query_and_columns, TableAliasRewriter,
27 },
28 utils::{
29 find_agg_node_within_select, find_unnest_node_within_select,
30 find_window_nodes_within_select, try_transform_to_simple_table_scan_with_filters,
31 unproject_sort_expr, unproject_unnest_expr, unproject_window_exprs,
32 },
33 Unparser,
34};
35use crate::unparser::extension_unparser::{
36 UnparseToStatementResult, UnparseWithinStatementResult,
37};
38use crate::unparser::utils::{find_unnest_node_until_relation, unproject_agg_exprs};
39use crate::unparser::{ast::UnnestRelationBuilder, rewrite::rewrite_qualify};
40use crate::utils::UNNEST_PLACEHOLDER;
41use datafusion_common::{
42 internal_err, not_impl_err,
43 tree_node::TransformedResult,
44 Column, DataFusionError, Result, ScalarValue, TableReference,
45};
46use datafusion_expr::expr::OUTER_REFERENCE_COLUMN_PREFIX;
47use datafusion_expr::{
48 expr::Alias, BinaryExpr, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan,
49 LogicalPlanBuilder, Operator, Projection, SortExpr, TableScan, Unnest,
50 UserDefinedLogicalNode,
51};
52use sqlparser::ast::{self, Ident, OrderByKind, SetExpr, TableAliasColumnDef};
53use std::{sync::Arc, vec};
54
55pub fn plan_to_sql(plan: &LogicalPlan) -> Result<ast::Statement> {
92 let unparser = Unparser::default();
93 unparser.plan_to_sql(plan)
94}
95
96impl Unparser<'_> {
97 pub fn plan_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
98 let mut plan = normalize_union_schema(plan)?;
99 if !self.dialect.supports_qualify() {
100 plan = rewrite_qualify(plan)?;
101 }
102
103 match plan {
104 LogicalPlan::Projection(_)
105 | LogicalPlan::Filter(_)
106 | LogicalPlan::Window(_)
107 | LogicalPlan::Aggregate(_)
108 | LogicalPlan::Sort(_)
109 | LogicalPlan::Join(_)
110 | LogicalPlan::Repartition(_)
111 | LogicalPlan::Union(_)
112 | LogicalPlan::TableScan(_)
113 | LogicalPlan::EmptyRelation(_)
114 | LogicalPlan::Subquery(_)
115 | LogicalPlan::SubqueryAlias(_)
116 | LogicalPlan::Limit(_)
117 | LogicalPlan::Statement(_)
118 | LogicalPlan::Values(_)
119 | LogicalPlan::Distinct(_) => self.select_to_sql_statement(&plan),
120 LogicalPlan::Dml(_) => self.dml_to_sql(&plan),
121 LogicalPlan::Extension(extension) => {
122 self.extension_to_statement(extension.node.as_ref())
123 }
124 LogicalPlan::Explain(_)
125 | LogicalPlan::Analyze(_)
126 | LogicalPlan::Ddl(_)
127 | LogicalPlan::Copy(_)
128 | LogicalPlan::DescribeTable(_)
129 | LogicalPlan::RecursiveQuery(_)
130 | LogicalPlan::Unnest(_) => not_impl_err!("Unsupported plan: {plan:?}"),
131 }
132 }
133
134 fn extension_to_statement(
138 &self,
139 node: &dyn UserDefinedLogicalNode,
140 ) -> Result<ast::Statement> {
141 let mut statement = None;
142 for unparser in &self.extension_unparsers {
143 match unparser.unparse_to_statement(node, self)? {
144 UnparseToStatementResult::Modified(stmt) => {
145 statement = Some(stmt);
146 break;
147 }
148 UnparseToStatementResult::Unmodified => {}
149 }
150 }
151 if let Some(statement) = statement {
152 Ok(statement)
153 } else {
154 not_impl_err!("Unsupported extension node: {node:?}")
155 }
156 }
157
158 fn extension_to_sql(
162 &self,
163 node: &dyn UserDefinedLogicalNode,
164 query: &mut Option<&mut QueryBuilder>,
165 select: &mut Option<&mut SelectBuilder>,
166 relation: &mut Option<&mut RelationBuilder>,
167 ) -> Result<()> {
168 for unparser in &self.extension_unparsers {
169 match unparser.unparse(node, self, query, select, relation)? {
170 UnparseWithinStatementResult::Modified => return Ok(()),
171 UnparseWithinStatementResult::Unmodified => {}
172 }
173 }
174 not_impl_err!("Unsupported extension node: {node:?}")
175 }
176
177 fn select_to_sql_statement(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
178 let mut query_builder = Some(QueryBuilder::default());
179
180 let body = self.select_to_sql_expr(plan, &mut query_builder)?;
181
182 let query = query_builder.unwrap().body(Box::new(body)).build()?;
183
184 Ok(ast::Statement::Query(Box::new(query)))
185 }
186
187 fn select_to_sql_expr(
188 &self,
189 plan: &LogicalPlan,
190 query: &mut Option<QueryBuilder>,
191 ) -> Result<SetExpr> {
192 let mut select_builder = SelectBuilder::default();
193 select_builder.push_from(TableWithJoinsBuilder::default());
194 let mut relation_builder = RelationBuilder::default();
195 self.select_to_sql_recursively(
196 plan,
197 query,
198 &mut select_builder,
199 &mut relation_builder,
200 )?;
201
202 if let Some(body) = query.as_mut().and_then(|q| q.take_body()) {
204 return Ok(*body);
205 }
206
207 if !select_builder.already_projected() {
210 select_builder.projection(vec![ast::SelectItem::Wildcard(
211 ast::WildcardAdditionalOptions::default(),
212 )]);
213 }
214
215 let mut twj = select_builder.pop_from().unwrap();
216 twj.relation(relation_builder);
217 select_builder.push_from(twj);
218
219 Ok(SetExpr::Select(Box::new(select_builder.build()?)))
220 }
221
222 fn reconstruct_select_statement(
226 &self,
227 plan: &LogicalPlan,
228 p: &Projection,
229 select: &mut SelectBuilder,
230 ) -> Result<()> {
231 let mut exprs = p.expr.clone();
232
233 if let Some(unnest) = find_unnest_node_within_select(plan) {
235 exprs = exprs
236 .into_iter()
237 .map(|e| unproject_unnest_expr(e, unnest))
238 .collect::<Result<Vec<_>>>()?;
239 };
240
241 match (
242 find_agg_node_within_select(plan, true),
243 find_window_nodes_within_select(plan, None, true),
244 ) {
245 (Some(agg), window) => {
246 let window_option = window.as_deref();
247 let items = exprs
248 .into_iter()
249 .map(|proj_expr| {
250 let unproj = unproject_agg_exprs(proj_expr, agg, window_option)?;
251 self.select_item_to_sql(&unproj)
252 })
253 .collect::<Result<Vec<_>>>()?;
254
255 select.projection(items);
256 select.group_by(ast::GroupByExpr::Expressions(
257 agg.group_expr
258 .iter()
259 .map(|expr| self.expr_to_sql(expr))
260 .collect::<Result<Vec<_>>>()?,
261 vec![],
262 ));
263 }
264 (None, Some(window)) => {
265 let items = exprs
266 .into_iter()
267 .map(|proj_expr| {
268 let unproj = unproject_window_exprs(proj_expr, &window)?;
269 self.select_item_to_sql(&unproj)
270 })
271 .collect::<Result<Vec<_>>>()?;
272
273 select.projection(items);
274 }
275 _ => {
276 let items = exprs
277 .iter()
278 .map(|e| self.select_item_to_sql(e))
279 .collect::<Result<Vec<_>>>()?;
280 select.projection(items);
281 }
282 }
283 Ok(())
284 }
285
286 fn derive(
287 &self,
288 plan: &LogicalPlan,
289 relation: &mut RelationBuilder,
290 alias: Option<ast::TableAlias>,
291 lateral: bool,
292 ) -> Result<()> {
293 let mut derived_builder = DerivedRelationBuilder::default();
294 derived_builder.lateral(lateral).alias(alias).subquery({
295 let inner_statement = self.plan_to_sql(plan)?;
296 if let ast::Statement::Query(inner_query) = inner_statement {
297 inner_query
298 } else {
299 return internal_err!(
300 "Subquery must be a Query, but found {inner_statement:?}"
301 );
302 }
303 });
304 relation.derived(derived_builder);
305
306 Ok(())
307 }
308
309 fn derive_with_dialect_alias(
310 &self,
311 alias: &str,
312 plan: &LogicalPlan,
313 relation: &mut RelationBuilder,
314 lateral: bool,
315 columns: Vec<Ident>,
316 ) -> Result<()> {
317 if self.dialect.requires_derived_table_alias() || !columns.is_empty() {
318 self.derive(
319 plan,
320 relation,
321 Some(self.new_table_alias(alias.to_string(), columns)),
322 lateral,
323 )
324 } else {
325 self.derive(plan, relation, None, lateral)
326 }
327 }
328
329 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
330 fn select_to_sql_recursively(
331 &self,
332 plan: &LogicalPlan,
333 query: &mut Option<QueryBuilder>,
334 select: &mut SelectBuilder,
335 relation: &mut RelationBuilder,
336 ) -> Result<()> {
337 match plan {
338 LogicalPlan::TableScan(scan) => {
339 if let Some(unparsed_table_scan) = Self::unparse_table_scan_pushdown(
340 plan,
341 None,
342 select.already_projected(),
343 )? {
344 return self.select_to_sql_recursively(
345 &unparsed_table_scan,
346 query,
347 select,
348 relation,
349 );
350 }
351 let mut builder = TableRelationBuilder::default();
352 let mut table_parts = vec![];
353 if let Some(catalog_name) = scan.table_name.catalog() {
354 table_parts
355 .push(self.new_ident_quoted_if_needs(catalog_name.to_string()));
356 }
357 if let Some(schema_name) = scan.table_name.schema() {
358 table_parts
359 .push(self.new_ident_quoted_if_needs(schema_name.to_string()));
360 }
361 table_parts.push(
362 self.new_ident_quoted_if_needs(scan.table_name.table().to_string()),
363 );
364 builder.name(ast::ObjectName::from(table_parts));
365 relation.table(builder);
366
367 Ok(())
368 }
369 LogicalPlan::Projection(p) => {
370 if let Some(new_plan) = rewrite_plan_for_sort_on_non_projected_fields(p) {
371 return self
372 .select_to_sql_recursively(&new_plan, query, select, relation);
373 }
374
375 let unnest_input_type = if p.expr.len() == 1 {
379 Self::check_unnest_placeholder_with_outer_ref(&p.expr[0])
380 } else {
381 None
382 };
383 if self.dialect.unnest_as_table_factor() && unnest_input_type.is_some() {
384 if let LogicalPlan::Unnest(unnest) = &p.input.as_ref() {
385 if let Some(unnest_relation) =
386 self.try_unnest_to_table_factor_sql(unnest)?
387 {
388 relation.unnest(unnest_relation);
389 return self.select_to_sql_recursively(
390 p.input.as_ref(),
391 query,
392 select,
393 relation,
394 );
395 }
396 }
397 }
398
399 let columns = if unnest_input_type.is_some() {
402 p.expr
403 .iter()
404 .map(|e| {
405 self.new_ident_quoted_if_needs(e.schema_name().to_string())
406 })
407 .collect()
408 } else {
409 vec![]
410 };
411 if select.already_projected() {
413 return self.derive_with_dialect_alias(
414 "derived_projection",
415 plan,
416 relation,
417 unnest_input_type
418 .filter(|t| matches!(t, UnnestInputType::OuterReference))
419 .is_some(),
420 columns,
421 );
422 }
423 self.reconstruct_select_statement(plan, p, select)?;
424 self.select_to_sql_recursively(p.input.as_ref(), query, select, relation)
425 }
426 LogicalPlan::Filter(filter) => {
427 if let Some(agg) =
428 find_agg_node_within_select(plan, select.already_projected())
429 {
430 let unprojected =
431 unproject_agg_exprs(filter.predicate.clone(), agg, None)?;
432 let filter_expr = self.expr_to_sql(&unprojected)?;
433 select.having(Some(filter_expr));
434 } else if let (Some(window), true) = (
435 find_window_nodes_within_select(
436 plan,
437 None,
438 select.already_projected(),
439 ),
440 self.dialect.supports_qualify(),
441 ) {
442 let unprojected =
443 unproject_window_exprs(filter.predicate.clone(), &window)?;
444 let filter_expr = self.expr_to_sql(&unprojected)?;
445 select.qualify(Some(filter_expr));
446 } else {
447 let filter_expr = self.expr_to_sql(&filter.predicate)?;
448 select.selection(Some(filter_expr));
449 }
450
451 self.select_to_sql_recursively(
452 filter.input.as_ref(),
453 query,
454 select,
455 relation,
456 )
457 }
458 LogicalPlan::Limit(limit) => {
459 if select.already_projected() {
461 return self.derive_with_dialect_alias(
462 "derived_limit",
463 plan,
464 relation,
465 false,
466 vec![],
467 );
468 }
469 if let Some(fetch) = &limit.fetch {
470 let Some(query) = query.as_mut() else {
471 return internal_err!(
472 "Limit operator only valid in a statement context."
473 );
474 };
475 query.limit(Some(self.expr_to_sql(fetch)?));
476 }
477
478 if let Some(skip) = &limit.skip {
479 let Some(query) = query.as_mut() else {
480 return internal_err!(
481 "Offset operator only valid in a statement context."
482 );
483 };
484
485 query.offset(Some(ast::Offset {
486 rows: ast::OffsetRows::None,
487 value: self.expr_to_sql(skip)?,
488 }));
489 }
490
491 self.select_to_sql_recursively(
492 limit.input.as_ref(),
493 query,
494 select,
495 relation,
496 )
497 }
498 LogicalPlan::Sort(sort) => {
499 if select.already_projected() {
501 return self.derive_with_dialect_alias(
502 "derived_sort",
503 plan,
504 relation,
505 false,
506 vec![],
507 );
508 }
509 let Some(query_ref) = query else {
510 return internal_err!(
511 "Sort operator only valid in a statement context."
512 );
513 };
514
515 if let Some(fetch) = sort.fetch {
516 query_ref.limit(Some(ast::Expr::value(ast::Value::Number(
517 fetch.to_string(),
518 false,
519 ))));
520 };
521
522 let agg = find_agg_node_within_select(plan, select.already_projected());
523 let sort_exprs: Vec<SortExpr> = sort
525 .expr
526 .iter()
527 .map(|sort_expr| {
528 unproject_sort_expr(sort_expr.clone(), agg, sort.input.as_ref())
529 })
530 .collect::<Result<Vec<_>>>()?;
531
532 query_ref.order_by(self.sorts_to_sql(&sort_exprs)?);
533
534 self.select_to_sql_recursively(
535 sort.input.as_ref(),
536 query,
537 select,
538 relation,
539 )
540 }
541 LogicalPlan::Aggregate(agg) => {
542 if !select.already_projected() {
544 let exprs: Vec<_> = agg
547 .aggr_expr
548 .iter()
549 .chain(agg.group_expr.iter())
550 .map(|expr| self.select_item_to_sql(expr))
551 .collect::<Result<Vec<_>>>()?;
552 select.projection(exprs);
553
554 select.group_by(ast::GroupByExpr::Expressions(
555 agg.group_expr
556 .iter()
557 .map(|expr| self.expr_to_sql(expr))
558 .collect::<Result<Vec<_>>>()?,
559 vec![],
560 ));
561 }
562
563 self.select_to_sql_recursively(
564 agg.input.as_ref(),
565 query,
566 select,
567 relation,
568 )
569 }
570 LogicalPlan::Distinct(distinct) => {
571 if select.already_projected() {
573 return self.derive_with_dialect_alias(
574 "derived_distinct",
575 plan,
576 relation,
577 false,
578 vec![],
579 );
580 }
581
582 if let Distinct::All(input) = distinct {
585 if matches!(input.as_ref(), LogicalPlan::Union(_)) {
586 if let Some(query_mut) = query.as_mut() {
587 query_mut.distinct_union();
588 return self.select_to_sql_recursively(
589 input.as_ref(),
590 query,
591 select,
592 relation,
593 );
594 }
595 }
596 }
597
598 let (select_distinct, input) = match distinct {
599 Distinct::All(input) => (ast::Distinct::Distinct, input.as_ref()),
600 Distinct::On(on) => {
601 let exprs = on
602 .on_expr
603 .iter()
604 .map(|e| self.expr_to_sql(e))
605 .collect::<Result<Vec<_>>>()?;
606 let items = on
607 .select_expr
608 .iter()
609 .map(|e| self.select_item_to_sql(e))
610 .collect::<Result<Vec<_>>>()?;
611 if let Some(sort_expr) = &on.sort_expr {
612 if let Some(query_ref) = query {
613 query_ref.order_by(self.sorts_to_sql(sort_expr)?);
614 } else {
615 return internal_err!(
616 "Sort operator only valid in a statement context."
617 );
618 }
619 }
620 select.projection(items);
621 (ast::Distinct::On(exprs), on.input.as_ref())
622 }
623 };
624 select.distinct(Some(select_distinct));
625 self.select_to_sql_recursively(input, query, select, relation)
626 }
627 LogicalPlan::Join(join) => {
628 let mut table_scan_filters = vec![];
629 let (left_plan, right_plan) = match join.join_type {
630 JoinType::RightSemi | JoinType::RightAnti => {
631 (&join.right, &join.left)
632 }
633 _ => (&join.left, &join.right),
634 };
635 let already_projected = select.already_projected();
639
640 let left_plan =
641 match try_transform_to_simple_table_scan_with_filters(left_plan)? {
642 Some((plan, filters)) => {
643 table_scan_filters.extend(filters);
644 Arc::new(plan)
645 }
646 None => Arc::clone(left_plan),
647 };
648
649 self.select_to_sql_recursively(
650 left_plan.as_ref(),
651 query,
652 select,
653 relation,
654 )?;
655
656 let left_projection: Option<Vec<ast::SelectItem>> = if !already_projected
657 {
658 Some(select.pop_projections())
659 } else {
660 None
661 };
662
663 let right_plan =
664 match try_transform_to_simple_table_scan_with_filters(right_plan)? {
665 Some((plan, filters)) => {
666 table_scan_filters.extend(filters);
667 Arc::new(plan)
668 }
669 None => Arc::clone(right_plan),
670 };
671
672 let mut right_relation = RelationBuilder::default();
673
674 self.select_to_sql_recursively(
675 right_plan.as_ref(),
676 query,
677 select,
678 &mut right_relation,
679 )?;
680
681 let join_filters = if table_scan_filters.is_empty() {
682 join.filter.clone()
683 } else {
684 let Some(combined_filters) =
686 table_scan_filters.into_iter().reduce(|acc, filter| {
687 Expr::BinaryExpr(BinaryExpr {
688 left: Box::new(acc),
689 op: Operator::And,
690 right: Box::new(filter),
691 })
692 })
693 else {
694 return internal_err!("Failed to combine TableScan filters");
695 };
696
697 match &join.filter {
699 Some(filter) => Some(Expr::BinaryExpr(BinaryExpr {
700 left: Box::new(filter.clone()),
701 op: Operator::And,
702 right: Box::new(combined_filters),
703 })),
704 None => Some(combined_filters),
705 }
706 };
707
708 let join_constraint = self.join_constraint_to_sql(
709 join.join_constraint,
710 &join.on,
711 join_filters.as_ref(),
712 )?;
713
714 let right_projection: Option<Vec<ast::SelectItem>> = if !already_projected
715 {
716 Some(select.pop_projections())
717 } else {
718 None
719 };
720
721 match join.join_type {
722 JoinType::LeftSemi
723 | JoinType::LeftAnti
724 | JoinType::LeftMark
725 | JoinType::RightSemi
726 | JoinType::RightAnti
727 | JoinType::RightMark => {
728 let mut query_builder = QueryBuilder::default();
729 let mut from = TableWithJoinsBuilder::default();
730 let mut exists_select: SelectBuilder = SelectBuilder::default();
731 from.relation(right_relation);
732 exists_select.push_from(from);
733 if let Some(filter) = &join.filter {
734 exists_select.selection(Some(self.expr_to_sql(filter)?));
735 }
736 for (left, right) in &join.on {
737 exists_select.selection(Some(
738 self.expr_to_sql(&left.clone().eq(right.clone()))?,
739 ));
740 }
741 exists_select.projection(vec![ast::SelectItem::UnnamedExpr(
742 ast::Expr::value(ast::Value::Number("1".to_string(), false)),
743 )]);
744 query_builder.body(Box::new(SetExpr::Select(Box::new(
745 exists_select.build()?,
746 ))));
747
748 let negated = match join.join_type {
749 JoinType::LeftSemi
750 | JoinType::RightSemi
751 | JoinType::LeftMark
752 | JoinType::RightMark => false,
753 JoinType::LeftAnti | JoinType::RightAnti => true,
754 _ => unreachable!(),
755 };
756 let exists_expr = ast::Expr::Exists {
757 subquery: Box::new(query_builder.build()?),
758 negated,
759 };
760
761 match join.join_type {
762 JoinType::LeftMark | JoinType::RightMark => {
763 let source_schema =
764 if join.join_type == JoinType::LeftMark {
765 right_plan.schema()
766 } else {
767 left_plan.schema()
768 };
769 let (table_ref, _) = source_schema.qualified_field(0);
770 let column = self.col_to_sql(&Column::new(
771 table_ref.cloned(),
772 "mark",
773 ))?;
774 select.replace_mark(&column, &exists_expr);
775 }
776 _ => {
777 select.selection(Some(exists_expr));
778 }
779 }
780 if let Some(projection) = left_projection {
781 select.projection(projection);
782 }
783 }
784 JoinType::Inner
785 | JoinType::Left
786 | JoinType::Right
787 | JoinType::Full => {
788 let Ok(Some(relation)) = right_relation.build() else {
789 return internal_err!("Failed to build right relation");
790 };
791 let ast_join = ast::Join {
792 relation,
793 global: false,
794 join_operator: self
795 .join_operator_to_sql(join.join_type, join_constraint)?,
796 };
797 let mut from = select.pop_from().unwrap();
798 from.push_join(ast_join);
799 select.push_from(from);
800 if !already_projected {
801 let Some(left_projection) = left_projection else {
802 return internal_err!("Left projection is missing");
803 };
804
805 let Some(right_projection) = right_projection else {
806 return internal_err!("Right projection is missing");
807 };
808
809 let projection = left_projection
810 .into_iter()
811 .chain(right_projection)
812 .collect();
813 select.projection(projection);
814 }
815 }
816 };
817
818 Ok(())
819 }
820 LogicalPlan::SubqueryAlias(plan_alias) => {
821 let (plan, mut columns) =
822 subquery_alias_inner_query_and_columns(plan_alias);
823 let unparsed_table_scan = Self::unparse_table_scan_pushdown(
824 plan,
825 Some(plan_alias.alias.clone()),
826 select.already_projected(),
827 )?;
828 if !select.already_projected() && unparsed_table_scan.is_none() {
831 select.projection(vec![ast::SelectItem::Wildcard(
832 ast::WildcardAdditionalOptions::default(),
833 )]);
834 }
835 let plan = unparsed_table_scan.unwrap_or_else(|| plan.clone());
836 if !columns.is_empty()
837 && !self.dialect.supports_column_alias_in_table_alias()
838 {
839 let rewritten_plan =
841 match inject_column_aliases_into_subquery(plan, columns) {
842 Ok(p) => p,
843 Err(e) => {
844 return internal_err!(
845 "Failed to transform SubqueryAlias plan: {e}"
846 )
847 }
848 };
849
850 columns = vec![];
851
852 self.select_to_sql_recursively(
853 &rewritten_plan,
854 query,
855 select,
856 relation,
857 )?;
858 } else {
859 self.select_to_sql_recursively(&plan, query, select, relation)?;
860 }
861
862 relation.alias(Some(
863 self.new_table_alias(plan_alias.alias.table().to_string(), columns),
864 ));
865
866 Ok(())
867 }
868 LogicalPlan::Union(union) => {
869 if select.already_projected() {
871 return self.derive_with_dialect_alias(
872 "derived_union",
873 plan,
874 relation,
875 false,
876 vec![],
877 );
878 }
879
880 let input_exprs: Vec<SetExpr> = union
881 .inputs
882 .iter()
883 .map(|input| self.select_to_sql_expr(input, query))
884 .collect::<Result<Vec<_>>>()?;
885
886 if input_exprs.len() < 2 {
887 return internal_err!("UNION operator requires at least 2 inputs");
888 }
889
890 let set_quantifier =
891 if query.as_ref().is_some_and(|q| q.is_distinct_union()) {
892 ast::SetQuantifier::None
895 } else {
896 ast::SetQuantifier::All
897 };
898
899 let union_expr = input_exprs
902 .into_iter()
903 .rev()
904 .reduce(|a, b| SetExpr::SetOperation {
905 op: ast::SetOperator::Union,
906 set_quantifier,
907 left: Box::new(b),
908 right: Box::new(a),
909 })
910 .unwrap();
911
912 let Some(query) = query.as_mut() else {
913 return internal_err!(
914 "UNION ALL operator only valid in a statement context"
915 );
916 };
917 query.body(Box::new(union_expr));
918
919 Ok(())
920 }
921 LogicalPlan::Window(window) => {
922 self.select_to_sql_recursively(
924 window.input.as_ref(),
925 query,
926 select,
927 relation,
928 )
929 }
930 LogicalPlan::EmptyRelation(_) => {
931 if !relation.has_relation() {
934 relation.empty();
935 }
936 Ok(())
937 }
938 LogicalPlan::Extension(extension) => {
939 if let Some(query) = query.as_mut() {
940 self.extension_to_sql(
941 extension.node.as_ref(),
942 &mut Some(query),
943 &mut Some(select),
944 &mut Some(relation),
945 )
946 } else {
947 self.extension_to_sql(
948 extension.node.as_ref(),
949 &mut None,
950 &mut Some(select),
951 &mut Some(relation),
952 )
953 }
954 }
955 LogicalPlan::Unnest(unnest) => {
956 if !unnest.struct_type_columns.is_empty() {
957 return internal_err!(
958 "Struct type columns are not currently supported in UNNEST: {:?}",
959 unnest.struct_type_columns
960 );
961 }
962
963 if let LogicalPlan::Projection(p) = unnest.input.as_ref() {
971 self.select_to_sql_recursively(&p.input, query, select, relation)
973 } else {
974 internal_err!("Unnest input is not a Projection: {unnest:?}")
975 }
976 }
977 LogicalPlan::Subquery(subquery)
978 if find_unnest_node_until_relation(subquery.subquery.as_ref())
979 .is_some() =>
980 {
981 if self.dialect.unnest_as_table_factor() {
982 self.select_to_sql_recursively(
983 subquery.subquery.as_ref(),
984 query,
985 select,
986 relation,
987 )
988 } else {
989 self.derive_with_dialect_alias(
990 "derived_unnest",
991 subquery.subquery.as_ref(),
992 relation,
993 true,
994 vec![],
995 )
996 }
997 }
998 _ => {
999 not_impl_err!("Unsupported operator: {plan:?}")
1000 }
1001 }
1002 }
1003
1004 fn check_unnest_placeholder_with_outer_ref(expr: &Expr) -> Option<UnnestInputType> {
1014 if let Expr::Alias(Alias { expr, .. }) = expr {
1015 if let Expr::Column(Column { name, .. }) = expr.as_ref() {
1016 if let Some(prefix) = name.strip_prefix(UNNEST_PLACEHOLDER) {
1017 if prefix.starts_with(&format!("({OUTER_REFERENCE_COLUMN_PREFIX}(")) {
1018 return Some(UnnestInputType::OuterReference);
1019 }
1020 return Some(UnnestInputType::Scalar);
1021 }
1022 }
1023 }
1024 None
1025 }
1026
1027 fn try_unnest_to_table_factor_sql(
1028 &self,
1029 unnest: &Unnest,
1030 ) -> Result<Option<UnnestRelationBuilder>> {
1031 let mut unnest_relation = UnnestRelationBuilder::default();
1032 let LogicalPlan::Projection(projection) = unnest.input.as_ref() else {
1033 return Ok(None);
1034 };
1035
1036 if !matches!(projection.input.as_ref(), LogicalPlan::EmptyRelation(_)) {
1037 return Ok(None);
1045 };
1046
1047 let exprs = projection
1048 .expr
1049 .iter()
1050 .map(|e| self.expr_to_sql(e))
1051 .collect::<Result<Vec<_>>>()?;
1052 unnest_relation.array_exprs(exprs);
1053
1054 Ok(Some(unnest_relation))
1055 }
1056
1057 fn is_scan_with_pushdown(scan: &TableScan) -> bool {
1058 scan.projection.is_some() || !scan.filters.is_empty() || scan.fetch.is_some()
1059 }
1060
1061 fn unparse_table_scan_pushdown(
1064 plan: &LogicalPlan,
1065 alias: Option<TableReference>,
1066 already_projected: bool,
1067 ) -> Result<Option<LogicalPlan>> {
1068 match plan {
1069 LogicalPlan::TableScan(table_scan) => {
1070 if !Self::is_scan_with_pushdown(table_scan) {
1071 return Ok(None);
1072 }
1073 let table_schema = table_scan.source.schema();
1074 let mut filter_alias_rewriter =
1075 alias.as_ref().map(|alias_name| TableAliasRewriter {
1076 table_schema: &table_schema,
1077 alias_name: alias_name.clone(),
1078 });
1079
1080 let mut builder = LogicalPlanBuilder::scan(
1081 table_scan.table_name.clone(),
1082 Arc::clone(&table_scan.source),
1083 None,
1084 )?;
1085 if let Some(ref alias) = alias {
1091 if table_scan.projection.is_some() || !table_scan.filters.is_empty() {
1092 builder = builder.alias(alias.clone())?;
1093 }
1094 }
1095
1096 if !already_projected {
1100 if let Some(project_vec) = &table_scan.projection {
1101 if project_vec.is_empty() {
1102 builder = builder.project(vec![Expr::Literal(
1103 ScalarValue::Int64(Some(1)),
1104 None,
1105 )])?;
1106 } else {
1107 let project_columns = project_vec
1108 .iter()
1109 .cloned()
1110 .map(|i| {
1111 let schema = table_scan.source.schema();
1112 let field = schema.field(i);
1113 if alias.is_some() {
1114 Column::new(alias.clone(), field.name().clone())
1115 } else {
1116 Column::new(
1117 Some(table_scan.table_name.clone()),
1118 field.name().clone(),
1119 )
1120 }
1121 })
1122 .collect::<Vec<_>>();
1123 builder = builder.project(project_columns)?;
1124 };
1125 }
1126 }
1127
1128 let filter_expr: Result<Option<Expr>> = table_scan
1129 .filters
1130 .iter()
1131 .cloned()
1132 .map(|expr| {
1133 if let Some(ref mut rewriter) = filter_alias_rewriter {
1134 expr.rewrite_with_lambdas_params(rewriter).data()
1135 } else {
1136 Ok(expr)
1137 }
1138 })
1139 .reduce(|acc, expr_result| {
1140 acc.and_then(|acc_expr| {
1141 expr_result.map(|expr| acc_expr.and(expr))
1142 })
1143 })
1144 .transpose();
1145
1146 if let Some(filter) = filter_expr? {
1147 builder = builder.filter(filter)?;
1148 }
1149
1150 if let Some(fetch) = table_scan.fetch {
1151 builder = builder.limit(0, Some(fetch))?;
1152 }
1153
1154 if let Some(alias) = alias {
1159 if table_scan.projection.is_none() && table_scan.filters.is_empty() {
1160 builder = builder.alias(alias)?;
1161 }
1162 }
1163
1164 Ok(Some(builder.build()?))
1165 }
1166 LogicalPlan::SubqueryAlias(subquery_alias) => {
1167 let ret = Self::unparse_table_scan_pushdown(
1168 &subquery_alias.input,
1169 Some(subquery_alias.alias.clone()),
1170 already_projected,
1171 )?;
1172 if let Some(alias) = alias {
1173 if let Some(plan) = ret {
1174 let plan = LogicalPlanBuilder::new(plan).alias(alias)?.build()?;
1175 return Ok(Some(plan));
1176 }
1177 }
1178 Ok(ret)
1179 }
1180 LogicalPlan::Projection(projection) => {
1183 if let Some(plan) = Self::unparse_table_scan_pushdown(
1184 &projection.input,
1185 alias.clone(),
1186 already_projected,
1187 )? {
1188 let exprs = if alias.is_some() {
1189 let mut alias_rewriter =
1190 alias.as_ref().map(|alias_name| TableAliasRewriter {
1191 table_schema: plan.schema().as_arrow(),
1192 alias_name: alias_name.clone(),
1193 });
1194 projection
1195 .expr
1196 .iter()
1197 .cloned()
1198 .map(|expr| {
1199 if let Some(ref mut rewriter) = alias_rewriter {
1200 expr.rewrite_with_lambdas_params(rewriter).data()
1201 } else {
1202 Ok(expr)
1203 }
1204 })
1205 .collect::<Result<Vec<_>>>()?
1206 } else {
1207 projection.expr.clone()
1208 };
1209 Ok(Some(
1210 LogicalPlanBuilder::from(plan).project(exprs)?.build()?,
1211 ))
1212 } else {
1213 Ok(None)
1214 }
1215 }
1216 _ => Ok(None),
1217 }
1218 }
1219
1220 fn select_item_to_sql(&self, expr: &Expr) -> Result<ast::SelectItem> {
1221 match expr {
1222 Expr::Alias(Alias { expr, name, .. }) => {
1223 let inner = self.expr_to_sql(expr)?;
1224
1225 let col_name = if let Some(rewritten_name) =
1227 self.dialect.col_alias_overrides(name)?
1228 {
1229 rewritten_name.to_string()
1230 } else {
1231 name.to_string()
1232 };
1233
1234 Ok(ast::SelectItem::ExprWithAlias {
1235 expr: inner,
1236 alias: self.new_ident_quoted_if_needs(col_name),
1237 })
1238 }
1239 _ => {
1240 let inner = self.expr_to_sql(expr)?;
1241
1242 Ok(ast::SelectItem::UnnamedExpr(inner))
1243 }
1244 }
1245 }
1246
1247 fn sorts_to_sql(&self, sort_exprs: &[SortExpr]) -> Result<OrderByKind> {
1248 Ok(OrderByKind::Expressions(
1249 sort_exprs
1250 .iter()
1251 .map(|sort_expr| self.sort_to_sql(sort_expr))
1252 .collect::<Result<Vec<_>>>()?,
1253 ))
1254 }
1255
1256 fn join_operator_to_sql(
1257 &self,
1258 join_type: JoinType,
1259 constraint: ast::JoinConstraint,
1260 ) -> Result<ast::JoinOperator> {
1261 Ok(match join_type {
1262 JoinType::Inner => match &constraint {
1263 ast::JoinConstraint::On(_)
1264 | ast::JoinConstraint::Using(_)
1265 | ast::JoinConstraint::Natural => ast::JoinOperator::Inner(constraint),
1266 ast::JoinConstraint::None => {
1267 ast::JoinOperator::CrossJoin(constraint)
1270 }
1271 },
1272 JoinType::Left => ast::JoinOperator::LeftOuter(constraint),
1273 JoinType::Right => ast::JoinOperator::RightOuter(constraint),
1274 JoinType::Full => ast::JoinOperator::FullOuter(constraint),
1275 JoinType::LeftAnti => ast::JoinOperator::LeftAnti(constraint),
1276 JoinType::LeftSemi => ast::JoinOperator::LeftSemi(constraint),
1277 JoinType::RightAnti => ast::JoinOperator::RightAnti(constraint),
1278 JoinType::RightSemi => ast::JoinOperator::RightSemi(constraint),
1279 JoinType::LeftMark | JoinType::RightMark => {
1280 unimplemented!("Unparsing of Mark join type")
1281 }
1282 })
1283 }
1284
1285 fn join_using_to_sql(
1289 &self,
1290 join_conditions: &[(Expr, Expr)],
1291 ) -> Option<ast::JoinConstraint> {
1292 let mut object_names = Vec::with_capacity(join_conditions.len());
1293 for (left, right) in join_conditions {
1294 match (left, right) {
1295 (
1296 Expr::Column(Column {
1297 relation: _,
1298 name: left_name,
1299 spans: _,
1300 }),
1301 Expr::Column(Column {
1302 relation: _,
1303 name: right_name,
1304 spans: _,
1305 }),
1306 ) if left_name == right_name => {
1307 let ident = self.new_ident_quoted_if_needs(left_name.to_string());
1311 object_names.push(ast::ObjectName::from(vec![ident]));
1312 }
1313 _ => return None,
1316 }
1317 }
1318 Some(ast::JoinConstraint::Using(object_names))
1319 }
1320
1321 fn join_constraint_to_sql(
1323 &self,
1324 constraint: JoinConstraint,
1325 conditions: &[(Expr, Expr)],
1326 filter: Option<&Expr>,
1327 ) -> Result<ast::JoinConstraint> {
1328 match (constraint, conditions, filter) {
1329 (JoinConstraint::On | JoinConstraint::Using, [], None) => {
1331 Ok(ast::JoinConstraint::None)
1332 }
1333
1334 (JoinConstraint::Using, conditions, None) => {
1335 match self.join_using_to_sql(conditions) {
1336 Some(using) => Ok(using),
1337 None => self.join_conditions_to_sql_on(conditions, None),
1340 }
1341 }
1342
1343 (JoinConstraint::On | JoinConstraint::Using, conditions, filter) => {
1351 self.join_conditions_to_sql_on(conditions, filter)
1352 }
1353 }
1354 }
1355
1356 fn join_conditions_to_sql_on(
1360 &self,
1361 join_conditions: &[(Expr, Expr)],
1362 filter: Option<&Expr>,
1363 ) -> Result<ast::JoinConstraint> {
1364 let mut condition = None;
1365 for (left, right) in join_conditions {
1367 let l = self.expr_to_sql(left)?;
1369 let r = self.expr_to_sql(right)?;
1370 let e = self.binary_op_to_sql(l, r, ast::BinaryOperator::Eq);
1371 condition = match condition {
1372 Some(expr) => Some(self.and_op_to_sql(expr, e)),
1373 None => Some(e),
1374 };
1375 }
1376
1377 condition = match (condition, filter) {
1379 (Some(expr), Some(filter)) => {
1380 Some(self.and_op_to_sql(expr, self.expr_to_sql(filter)?))
1381 }
1382 (Some(expr), None) => Some(expr),
1383 (None, Some(filter)) => Some(self.expr_to_sql(filter)?),
1384 (None, None) => None,
1385 };
1386
1387 let constraint = match condition {
1388 Some(filter) => ast::JoinConstraint::On(filter),
1389 None => ast::JoinConstraint::None,
1390 };
1391
1392 Ok(constraint)
1393 }
1394
1395 fn and_op_to_sql(&self, lhs: ast::Expr, rhs: ast::Expr) -> ast::Expr {
1396 self.binary_op_to_sql(lhs, rhs, ast::BinaryOperator::And)
1397 }
1398
1399 fn new_table_alias(&self, alias: String, columns: Vec<Ident>) -> ast::TableAlias {
1400 let columns = columns
1401 .into_iter()
1402 .map(|ident| TableAliasColumnDef {
1403 name: ident,
1404 data_type: None,
1405 })
1406 .collect();
1407 ast::TableAlias {
1408 name: self.new_ident_quoted_if_needs(alias),
1409 columns,
1410 }
1411 }
1412
    /// Placeholder for DML plans: always returns a "not implemented" error
    /// that includes the debug representation of `plan`.
    fn dml_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
        not_impl_err!("Unsupported plan: {plan:?}")
    }
1416}
1417
1418impl From<BuilderError> for DataFusionError {
1419 fn from(e: BuilderError) -> Self {
1420 DataFusionError::External(Box::new(e))
1421 }
1422}
1423
/// Classification of an unnest placeholder column, as produced by
/// `Unparser::check_unnest_placeholder_with_outer_ref`.
#[derive(Debug)]
enum UnnestInputType {
    /// The text after the unnest placeholder starts with an outer-reference
    /// marker (`(` + `OUTER_REFERENCE_COLUMN_PREFIX` + `(`).
    OuterReference,
    /// Any other unnest placeholder column.
    Scalar,
}