datafusion_sql/unparser/
plan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use super::{
19    ast::{
20        BuilderError, DerivedRelationBuilder, QueryBuilder, RelationBuilder,
21        SelectBuilder, TableRelationBuilder, TableWithJoinsBuilder,
22    },
23    rewrite::{
24        inject_column_aliases_into_subquery, normalize_union_schema,
25        rewrite_plan_for_sort_on_non_projected_fields,
26        subquery_alias_inner_query_and_columns, TableAliasRewriter,
27    },
28    utils::{
29        find_agg_node_within_select, find_unnest_node_within_select,
30        find_window_nodes_within_select, try_transform_to_simple_table_scan_with_filters,
31        unproject_sort_expr, unproject_unnest_expr, unproject_window_exprs,
32    },
33    Unparser,
34};
35use crate::unparser::extension_unparser::{
36    UnparseToStatementResult, UnparseWithinStatementResult,
37};
38use crate::unparser::utils::{find_unnest_node_until_relation, unproject_agg_exprs};
39use crate::unparser::{ast::UnnestRelationBuilder, rewrite::rewrite_qualify};
40use crate::utils::UNNEST_PLACEHOLDER;
41use datafusion_common::{
42    internal_err, not_impl_err,
43    tree_node::TransformedResult,
44    Column, DataFusionError, Result, ScalarValue, TableReference,
45};
46use datafusion_expr::expr::OUTER_REFERENCE_COLUMN_PREFIX;
47use datafusion_expr::{
48    expr::Alias, BinaryExpr, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan,
49    LogicalPlanBuilder, Operator, Projection, SortExpr, TableScan, Unnest,
50    UserDefinedLogicalNode,
51};
52use sqlparser::ast::{self, Ident, OrderByKind, SetExpr, TableAliasColumnDef};
53use std::{sync::Arc, vec};
54
55/// Convert a DataFusion [`LogicalPlan`] to [`ast::Statement`]
56///
57/// This function is the opposite of [`SqlToRel::sql_statement_to_plan`] and can
58/// be used to, among other things, to convert `LogicalPlan`s to SQL strings.
59///
60/// # Errors
61///
62/// This function returns an error if the plan cannot be converted to SQL.
63///
64/// # See Also
65///
66/// * [`expr_to_sql`] for converting [`Expr`], a single expression to SQL
67///
68/// # Example
69/// ```
70/// use arrow::datatypes::{DataType, Field, Schema};
71/// use datafusion_expr::{col, logical_plan::table_scan};
72/// use datafusion_sql::unparser::plan_to_sql;
73/// let schema = Schema::new(vec![
74///     Field::new("id", DataType::Utf8, false),
75///     Field::new("value", DataType::Utf8, false),
76/// ]);
77/// // Scan 'table' and select columns 'id' and 'value'
78/// let plan = table_scan(Some("table"), &schema, None)
79///     .unwrap()
80///     .project(vec![col("id"), col("value")])
81///     .unwrap()
82///     .build()
83///     .unwrap();
84/// let sql = plan_to_sql(&plan).unwrap(); // convert to AST
85/// // use the Display impl to convert to SQL text
86/// assert_eq!(sql.to_string(), "SELECT \"table\".id, \"table\".\"value\" FROM \"table\"")
87/// ```
88///
89/// [`SqlToRel::sql_statement_to_plan`]: crate::planner::SqlToRel::sql_statement_to_plan
90/// [`expr_to_sql`]: crate::unparser::expr_to_sql
91pub fn plan_to_sql(plan: &LogicalPlan) -> Result<ast::Statement> {
92    let unparser = Unparser::default();
93    unparser.plan_to_sql(plan)
94}
95
96impl Unparser<'_> {
97    pub fn plan_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
98        let mut plan = normalize_union_schema(plan)?;
99        if !self.dialect.supports_qualify() {
100            plan = rewrite_qualify(plan)?;
101        }
102
103        match plan {
104            LogicalPlan::Projection(_)
105            | LogicalPlan::Filter(_)
106            | LogicalPlan::Window(_)
107            | LogicalPlan::Aggregate(_)
108            | LogicalPlan::Sort(_)
109            | LogicalPlan::Join(_)
110            | LogicalPlan::Repartition(_)
111            | LogicalPlan::Union(_)
112            | LogicalPlan::TableScan(_)
113            | LogicalPlan::EmptyRelation(_)
114            | LogicalPlan::Subquery(_)
115            | LogicalPlan::SubqueryAlias(_)
116            | LogicalPlan::Limit(_)
117            | LogicalPlan::Statement(_)
118            | LogicalPlan::Values(_)
119            | LogicalPlan::Distinct(_) => self.select_to_sql_statement(&plan),
120            LogicalPlan::Dml(_) => self.dml_to_sql(&plan),
121            LogicalPlan::Extension(extension) => {
122                self.extension_to_statement(extension.node.as_ref())
123            }
124            LogicalPlan::Explain(_)
125            | LogicalPlan::Analyze(_)
126            | LogicalPlan::Ddl(_)
127            | LogicalPlan::Copy(_)
128            | LogicalPlan::DescribeTable(_)
129            | LogicalPlan::RecursiveQuery(_)
130            | LogicalPlan::Unnest(_) => not_impl_err!("Unsupported plan: {plan:?}"),
131        }
132    }
133
134    /// Try to unparse a [UserDefinedLogicalNode] to a SQL statement.
135    /// If multiple unparsers are registered for the same [UserDefinedLogicalNode],
136    /// the first unparsing result will be returned.
137    fn extension_to_statement(
138        &self,
139        node: &dyn UserDefinedLogicalNode,
140    ) -> Result<ast::Statement> {
141        let mut statement = None;
142        for unparser in &self.extension_unparsers {
143            match unparser.unparse_to_statement(node, self)? {
144                UnparseToStatementResult::Modified(stmt) => {
145                    statement = Some(stmt);
146                    break;
147                }
148                UnparseToStatementResult::Unmodified => {}
149            }
150        }
151        if let Some(statement) = statement {
152            Ok(statement)
153        } else {
154            not_impl_err!("Unsupported extension node: {node:?}")
155        }
156    }
157
158    /// Try to unparse a [UserDefinedLogicalNode] to a SQL statement.
159    /// If multiple unparsers are registered for the same [UserDefinedLogicalNode],
160    /// the first unparser supporting the node will be used.
161    fn extension_to_sql(
162        &self,
163        node: &dyn UserDefinedLogicalNode,
164        query: &mut Option<&mut QueryBuilder>,
165        select: &mut Option<&mut SelectBuilder>,
166        relation: &mut Option<&mut RelationBuilder>,
167    ) -> Result<()> {
168        for unparser in &self.extension_unparsers {
169            match unparser.unparse(node, self, query, select, relation)? {
170                UnparseWithinStatementResult::Modified => return Ok(()),
171                UnparseWithinStatementResult::Unmodified => {}
172            }
173        }
174        not_impl_err!("Unsupported extension node: {node:?}")
175    }
176
177    fn select_to_sql_statement(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
178        let mut query_builder = Some(QueryBuilder::default());
179
180        let body = self.select_to_sql_expr(plan, &mut query_builder)?;
181
182        let query = query_builder.unwrap().body(Box::new(body)).build()?;
183
184        Ok(ast::Statement::Query(Box::new(query)))
185    }
186
187    fn select_to_sql_expr(
188        &self,
189        plan: &LogicalPlan,
190        query: &mut Option<QueryBuilder>,
191    ) -> Result<SetExpr> {
192        let mut select_builder = SelectBuilder::default();
193        select_builder.push_from(TableWithJoinsBuilder::default());
194        let mut relation_builder = RelationBuilder::default();
195        self.select_to_sql_recursively(
196            plan,
197            query,
198            &mut select_builder,
199            &mut relation_builder,
200        )?;
201
202        // If we were able to construct a full body (i.e. UNION ALL), return it
203        if let Some(body) = query.as_mut().and_then(|q| q.take_body()) {
204            return Ok(*body);
205        }
206
207        // If no projection is set, add a wildcard projection to the select
208        // which will be translated to `SELECT *` in the SQL statement
209        if !select_builder.already_projected() {
210            select_builder.projection(vec![ast::SelectItem::Wildcard(
211                ast::WildcardAdditionalOptions::default(),
212            )]);
213        }
214
215        let mut twj = select_builder.pop_from().unwrap();
216        twj.relation(relation_builder);
217        select_builder.push_from(twj);
218
219        Ok(SetExpr::Select(Box::new(select_builder.build()?)))
220    }
221
222    /// Reconstructs a SELECT SQL statement from a logical plan by unprojecting column expressions
223    /// found in a [Projection] node. This requires scanning the plan tree for relevant Aggregate
224    /// and Window nodes and matching column expressions to the appropriate agg or window expressions.
225    fn reconstruct_select_statement(
226        &self,
227        plan: &LogicalPlan,
228        p: &Projection,
229        select: &mut SelectBuilder,
230    ) -> Result<()> {
231        let mut exprs = p.expr.clone();
232
233        // If an Unnest node is found within the select, find and unproject the unnest column
234        if let Some(unnest) = find_unnest_node_within_select(plan) {
235            exprs = exprs
236                .into_iter()
237                .map(|e| unproject_unnest_expr(e, unnest))
238                .collect::<Result<Vec<_>>>()?;
239        };
240
241        match (
242            find_agg_node_within_select(plan, true),
243            find_window_nodes_within_select(plan, None, true),
244        ) {
245            (Some(agg), window) => {
246                let window_option = window.as_deref();
247                let items = exprs
248                    .into_iter()
249                    .map(|proj_expr| {
250                        let unproj = unproject_agg_exprs(proj_expr, agg, window_option)?;
251                        self.select_item_to_sql(&unproj)
252                    })
253                    .collect::<Result<Vec<_>>>()?;
254
255                select.projection(items);
256                select.group_by(ast::GroupByExpr::Expressions(
257                    agg.group_expr
258                        .iter()
259                        .map(|expr| self.expr_to_sql(expr))
260                        .collect::<Result<Vec<_>>>()?,
261                    vec![],
262                ));
263            }
264            (None, Some(window)) => {
265                let items = exprs
266                    .into_iter()
267                    .map(|proj_expr| {
268                        let unproj = unproject_window_exprs(proj_expr, &window)?;
269                        self.select_item_to_sql(&unproj)
270                    })
271                    .collect::<Result<Vec<_>>>()?;
272
273                select.projection(items);
274            }
275            _ => {
276                let items = exprs
277                    .iter()
278                    .map(|e| self.select_item_to_sql(e))
279                    .collect::<Result<Vec<_>>>()?;
280                select.projection(items);
281            }
282        }
283        Ok(())
284    }
285
286    fn derive(
287        &self,
288        plan: &LogicalPlan,
289        relation: &mut RelationBuilder,
290        alias: Option<ast::TableAlias>,
291        lateral: bool,
292    ) -> Result<()> {
293        let mut derived_builder = DerivedRelationBuilder::default();
294        derived_builder.lateral(lateral).alias(alias).subquery({
295            let inner_statement = self.plan_to_sql(plan)?;
296            if let ast::Statement::Query(inner_query) = inner_statement {
297                inner_query
298            } else {
299                return internal_err!(
300                    "Subquery must be a Query, but found {inner_statement:?}"
301                );
302            }
303        });
304        relation.derived(derived_builder);
305
306        Ok(())
307    }
308
309    fn derive_with_dialect_alias(
310        &self,
311        alias: &str,
312        plan: &LogicalPlan,
313        relation: &mut RelationBuilder,
314        lateral: bool,
315        columns: Vec<Ident>,
316    ) -> Result<()> {
317        if self.dialect.requires_derived_table_alias() || !columns.is_empty() {
318            self.derive(
319                plan,
320                relation,
321                Some(self.new_table_alias(alias.to_string(), columns)),
322                lateral,
323            )
324        } else {
325            self.derive(plan, relation, None, lateral)
326        }
327    }
328
329    #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
330    fn select_to_sql_recursively(
331        &self,
332        plan: &LogicalPlan,
333        query: &mut Option<QueryBuilder>,
334        select: &mut SelectBuilder,
335        relation: &mut RelationBuilder,
336    ) -> Result<()> {
337        match plan {
338            LogicalPlan::TableScan(scan) => {
339                if let Some(unparsed_table_scan) = Self::unparse_table_scan_pushdown(
340                    plan,
341                    None,
342                    select.already_projected(),
343                )? {
344                    return self.select_to_sql_recursively(
345                        &unparsed_table_scan,
346                        query,
347                        select,
348                        relation,
349                    );
350                }
351                let mut builder = TableRelationBuilder::default();
352                let mut table_parts = vec![];
353                if let Some(catalog_name) = scan.table_name.catalog() {
354                    table_parts
355                        .push(self.new_ident_quoted_if_needs(catalog_name.to_string()));
356                }
357                if let Some(schema_name) = scan.table_name.schema() {
358                    table_parts
359                        .push(self.new_ident_quoted_if_needs(schema_name.to_string()));
360                }
361                table_parts.push(
362                    self.new_ident_quoted_if_needs(scan.table_name.table().to_string()),
363                );
364                builder.name(ast::ObjectName::from(table_parts));
365                relation.table(builder);
366
367                Ok(())
368            }
369            LogicalPlan::Projection(p) => {
370                if let Some(new_plan) = rewrite_plan_for_sort_on_non_projected_fields(p) {
371                    return self
372                        .select_to_sql_recursively(&new_plan, query, select, relation);
373                }
374
375                // Projection can be top-level plan for unnest relation
376                // The projection generated by the `RecursiveUnnestRewriter` from a UNNEST relation will have
377                // only one expression, which is the placeholder column generated by the rewriter.
378                let unnest_input_type = if p.expr.len() == 1 {
379                    Self::check_unnest_placeholder_with_outer_ref(&p.expr[0])
380                } else {
381                    None
382                };
383                if self.dialect.unnest_as_table_factor() && unnest_input_type.is_some() {
384                    if let LogicalPlan::Unnest(unnest) = &p.input.as_ref() {
385                        if let Some(unnest_relation) =
386                            self.try_unnest_to_table_factor_sql(unnest)?
387                        {
388                            relation.unnest(unnest_relation);
389                            return self.select_to_sql_recursively(
390                                p.input.as_ref(),
391                                query,
392                                select,
393                                relation,
394                            );
395                        }
396                    }
397                }
398
399                // If it's a unnest projection, we should provide the table column alias
400                // to provide a column name for the unnest relation.
401                let columns = if unnest_input_type.is_some() {
402                    p.expr
403                        .iter()
404                        .map(|e| {
405                            self.new_ident_quoted_if_needs(e.schema_name().to_string())
406                        })
407                        .collect()
408                } else {
409                    vec![]
410                };
411                // Projection can be top-level plan for derived table
412                if select.already_projected() {
413                    return self.derive_with_dialect_alias(
414                        "derived_projection",
415                        plan,
416                        relation,
417                        unnest_input_type
418                            .filter(|t| matches!(t, UnnestInputType::OuterReference))
419                            .is_some(),
420                        columns,
421                    );
422                }
423                self.reconstruct_select_statement(plan, p, select)?;
424                self.select_to_sql_recursively(p.input.as_ref(), query, select, relation)
425            }
426            LogicalPlan::Filter(filter) => {
427                if let Some(agg) =
428                    find_agg_node_within_select(plan, select.already_projected())
429                {
430                    let unprojected =
431                        unproject_agg_exprs(filter.predicate.clone(), agg, None)?;
432                    let filter_expr = self.expr_to_sql(&unprojected)?;
433                    select.having(Some(filter_expr));
434                } else if let (Some(window), true) = (
435                    find_window_nodes_within_select(
436                        plan,
437                        None,
438                        select.already_projected(),
439                    ),
440                    self.dialect.supports_qualify(),
441                ) {
442                    let unprojected =
443                        unproject_window_exprs(filter.predicate.clone(), &window)?;
444                    let filter_expr = self.expr_to_sql(&unprojected)?;
445                    select.qualify(Some(filter_expr));
446                } else {
447                    let filter_expr = self.expr_to_sql(&filter.predicate)?;
448                    select.selection(Some(filter_expr));
449                }
450
451                self.select_to_sql_recursively(
452                    filter.input.as_ref(),
453                    query,
454                    select,
455                    relation,
456                )
457            }
458            LogicalPlan::Limit(limit) => {
459                // Limit can be top-level plan for derived table
460                if select.already_projected() {
461                    return self.derive_with_dialect_alias(
462                        "derived_limit",
463                        plan,
464                        relation,
465                        false,
466                        vec![],
467                    );
468                }
469                if let Some(fetch) = &limit.fetch {
470                    let Some(query) = query.as_mut() else {
471                        return internal_err!(
472                            "Limit operator only valid in a statement context."
473                        );
474                    };
475                    query.limit(Some(self.expr_to_sql(fetch)?));
476                }
477
478                if let Some(skip) = &limit.skip {
479                    let Some(query) = query.as_mut() else {
480                        return internal_err!(
481                            "Offset operator only valid in a statement context."
482                        );
483                    };
484
485                    query.offset(Some(ast::Offset {
486                        rows: ast::OffsetRows::None,
487                        value: self.expr_to_sql(skip)?,
488                    }));
489                }
490
491                self.select_to_sql_recursively(
492                    limit.input.as_ref(),
493                    query,
494                    select,
495                    relation,
496                )
497            }
498            LogicalPlan::Sort(sort) => {
499                // Sort can be top-level plan for derived table
500                if select.already_projected() {
501                    return self.derive_with_dialect_alias(
502                        "derived_sort",
503                        plan,
504                        relation,
505                        false,
506                        vec![],
507                    );
508                }
509                let Some(query_ref) = query else {
510                    return internal_err!(
511                        "Sort operator only valid in a statement context."
512                    );
513                };
514
515                if let Some(fetch) = sort.fetch {
516                    query_ref.limit(Some(ast::Expr::value(ast::Value::Number(
517                        fetch.to_string(),
518                        false,
519                    ))));
520                };
521
522                let agg = find_agg_node_within_select(plan, select.already_projected());
523                // unproject sort expressions
524                let sort_exprs: Vec<SortExpr> = sort
525                    .expr
526                    .iter()
527                    .map(|sort_expr| {
528                        unproject_sort_expr(sort_expr.clone(), agg, sort.input.as_ref())
529                    })
530                    .collect::<Result<Vec<_>>>()?;
531
532                query_ref.order_by(self.sorts_to_sql(&sort_exprs)?);
533
534                self.select_to_sql_recursively(
535                    sort.input.as_ref(),
536                    query,
537                    select,
538                    relation,
539                )
540            }
541            LogicalPlan::Aggregate(agg) => {
542                // Aggregation can be already handled in the projection case
543                if !select.already_projected() {
544                    // The query returns aggregate and group expressions. If that weren't the case,
545                    // the aggregate would have been placed inside a projection, making the check above^ false
546                    let exprs: Vec<_> = agg
547                        .aggr_expr
548                        .iter()
549                        .chain(agg.group_expr.iter())
550                        .map(|expr| self.select_item_to_sql(expr))
551                        .collect::<Result<Vec<_>>>()?;
552                    select.projection(exprs);
553
554                    select.group_by(ast::GroupByExpr::Expressions(
555                        agg.group_expr
556                            .iter()
557                            .map(|expr| self.expr_to_sql(expr))
558                            .collect::<Result<Vec<_>>>()?,
559                        vec![],
560                    ));
561                }
562
563                self.select_to_sql_recursively(
564                    agg.input.as_ref(),
565                    query,
566                    select,
567                    relation,
568                )
569            }
570            LogicalPlan::Distinct(distinct) => {
571                // Distinct can be top-level plan for derived table
572                if select.already_projected() {
573                    return self.derive_with_dialect_alias(
574                        "derived_distinct",
575                        plan,
576                        relation,
577                        false,
578                        vec![],
579                    );
580                }
581
582                // If this distinct is the parent of a Union and we're in a query context,
583                // then we need to unparse as a `UNION` rather than a `UNION ALL`.
584                if let Distinct::All(input) = distinct {
585                    if matches!(input.as_ref(), LogicalPlan::Union(_)) {
586                        if let Some(query_mut) = query.as_mut() {
587                            query_mut.distinct_union();
588                            return self.select_to_sql_recursively(
589                                input.as_ref(),
590                                query,
591                                select,
592                                relation,
593                            );
594                        }
595                    }
596                }
597
598                let (select_distinct, input) = match distinct {
599                    Distinct::All(input) => (ast::Distinct::Distinct, input.as_ref()),
600                    Distinct::On(on) => {
601                        let exprs = on
602                            .on_expr
603                            .iter()
604                            .map(|e| self.expr_to_sql(e))
605                            .collect::<Result<Vec<_>>>()?;
606                        let items = on
607                            .select_expr
608                            .iter()
609                            .map(|e| self.select_item_to_sql(e))
610                            .collect::<Result<Vec<_>>>()?;
611                        if let Some(sort_expr) = &on.sort_expr {
612                            if let Some(query_ref) = query {
613                                query_ref.order_by(self.sorts_to_sql(sort_expr)?);
614                            } else {
615                                return internal_err!(
616                                    "Sort operator only valid in a statement context."
617                                );
618                            }
619                        }
620                        select.projection(items);
621                        (ast::Distinct::On(exprs), on.input.as_ref())
622                    }
623                };
624                select.distinct(Some(select_distinct));
625                self.select_to_sql_recursively(input, query, select, relation)
626            }
627            LogicalPlan::Join(join) => {
628                let mut table_scan_filters = vec![];
629                let (left_plan, right_plan) = match join.join_type {
630                    JoinType::RightSemi | JoinType::RightAnti => {
631                        (&join.right, &join.left)
632                    }
633                    _ => (&join.left, &join.right),
634                };
635                // If there's an outer projection plan, it will already set up the projection.
636                // In that case, we don't need to worry about setting up the projection here.
637                // The outer projection plan will handle projecting the correct columns.
638                let already_projected = select.already_projected();
639
640                let left_plan =
641                    match try_transform_to_simple_table_scan_with_filters(left_plan)? {
642                        Some((plan, filters)) => {
643                            table_scan_filters.extend(filters);
644                            Arc::new(plan)
645                        }
646                        None => Arc::clone(left_plan),
647                    };
648
649                self.select_to_sql_recursively(
650                    left_plan.as_ref(),
651                    query,
652                    select,
653                    relation,
654                )?;
655
656                let left_projection: Option<Vec<ast::SelectItem>> = if !already_projected
657                {
658                    Some(select.pop_projections())
659                } else {
660                    None
661                };
662
663                let right_plan =
664                    match try_transform_to_simple_table_scan_with_filters(right_plan)? {
665                        Some((plan, filters)) => {
666                            table_scan_filters.extend(filters);
667                            Arc::new(plan)
668                        }
669                        None => Arc::clone(right_plan),
670                    };
671
672                let mut right_relation = RelationBuilder::default();
673
674                self.select_to_sql_recursively(
675                    right_plan.as_ref(),
676                    query,
677                    select,
678                    &mut right_relation,
679                )?;
680
681                let join_filters = if table_scan_filters.is_empty() {
682                    join.filter.clone()
683                } else {
684                    // Combine `table_scan_filters` into a single filter using `AND`
685                    let Some(combined_filters) =
686                        table_scan_filters.into_iter().reduce(|acc, filter| {
687                            Expr::BinaryExpr(BinaryExpr {
688                                left: Box::new(acc),
689                                op: Operator::And,
690                                right: Box::new(filter),
691                            })
692                        })
693                    else {
694                        return internal_err!("Failed to combine TableScan filters");
695                    };
696
697                    // Combine `join.filter` with `combined_filters` using `AND`
698                    match &join.filter {
699                        Some(filter) => Some(Expr::BinaryExpr(BinaryExpr {
700                            left: Box::new(filter.clone()),
701                            op: Operator::And,
702                            right: Box::new(combined_filters),
703                        })),
704                        None => Some(combined_filters),
705                    }
706                };
707
708                let join_constraint = self.join_constraint_to_sql(
709                    join.join_constraint,
710                    &join.on,
711                    join_filters.as_ref(),
712                )?;
713
714                let right_projection: Option<Vec<ast::SelectItem>> = if !already_projected
715                {
716                    Some(select.pop_projections())
717                } else {
718                    None
719                };
720
721                match join.join_type {
722                    JoinType::LeftSemi
723                    | JoinType::LeftAnti
724                    | JoinType::LeftMark
725                    | JoinType::RightSemi
726                    | JoinType::RightAnti
727                    | JoinType::RightMark => {
728                        let mut query_builder = QueryBuilder::default();
729                        let mut from = TableWithJoinsBuilder::default();
730                        let mut exists_select: SelectBuilder = SelectBuilder::default();
731                        from.relation(right_relation);
732                        exists_select.push_from(from);
733                        if let Some(filter) = &join.filter {
734                            exists_select.selection(Some(self.expr_to_sql(filter)?));
735                        }
736                        for (left, right) in &join.on {
737                            exists_select.selection(Some(
738                                self.expr_to_sql(&left.clone().eq(right.clone()))?,
739                            ));
740                        }
741                        exists_select.projection(vec![ast::SelectItem::UnnamedExpr(
742                            ast::Expr::value(ast::Value::Number("1".to_string(), false)),
743                        )]);
744                        query_builder.body(Box::new(SetExpr::Select(Box::new(
745                            exists_select.build()?,
746                        ))));
747
748                        let negated = match join.join_type {
749                            JoinType::LeftSemi
750                            | JoinType::RightSemi
751                            | JoinType::LeftMark
752                            | JoinType::RightMark => false,
753                            JoinType::LeftAnti | JoinType::RightAnti => true,
754                            _ => unreachable!(),
755                        };
756                        let exists_expr = ast::Expr::Exists {
757                            subquery: Box::new(query_builder.build()?),
758                            negated,
759                        };
760
761                        match join.join_type {
762                            JoinType::LeftMark | JoinType::RightMark => {
763                                let source_schema =
764                                    if join.join_type == JoinType::LeftMark {
765                                        right_plan.schema()
766                                    } else {
767                                        left_plan.schema()
768                                    };
769                                let (table_ref, _) = source_schema.qualified_field(0);
770                                let column = self.col_to_sql(&Column::new(
771                                    table_ref.cloned(),
772                                    "mark",
773                                ))?;
774                                select.replace_mark(&column, &exists_expr);
775                            }
776                            _ => {
777                                select.selection(Some(exists_expr));
778                            }
779                        }
780                        if let Some(projection) = left_projection {
781                            select.projection(projection);
782                        }
783                    }
784                    JoinType::Inner
785                    | JoinType::Left
786                    | JoinType::Right
787                    | JoinType::Full => {
788                        let Ok(Some(relation)) = right_relation.build() else {
789                            return internal_err!("Failed to build right relation");
790                        };
791                        let ast_join = ast::Join {
792                            relation,
793                            global: false,
794                            join_operator: self
795                                .join_operator_to_sql(join.join_type, join_constraint)?,
796                        };
797                        let mut from = select.pop_from().unwrap();
798                        from.push_join(ast_join);
799                        select.push_from(from);
800                        if !already_projected {
801                            let Some(left_projection) = left_projection else {
802                                return internal_err!("Left projection is missing");
803                            };
804
805                            let Some(right_projection) = right_projection else {
806                                return internal_err!("Right projection is missing");
807                            };
808
809                            let projection = left_projection
810                                .into_iter()
811                                .chain(right_projection)
812                                .collect();
813                            select.projection(projection);
814                        }
815                    }
816                };
817
818                Ok(())
819            }
820            LogicalPlan::SubqueryAlias(plan_alias) => {
821                let (plan, mut columns) =
822                    subquery_alias_inner_query_and_columns(plan_alias);
823                let unparsed_table_scan = Self::unparse_table_scan_pushdown(
824                    plan,
825                    Some(plan_alias.alias.clone()),
826                    select.already_projected(),
827                )?;
828                // if the child plan is a TableScan with pushdown operations, we don't need to
829                // create an additional subquery for it
830                if !select.already_projected() && unparsed_table_scan.is_none() {
831                    select.projection(vec![ast::SelectItem::Wildcard(
832                        ast::WildcardAdditionalOptions::default(),
833                    )]);
834                }
835                let plan = unparsed_table_scan.unwrap_or_else(|| plan.clone());
836                if !columns.is_empty()
837                    && !self.dialect.supports_column_alias_in_table_alias()
838                {
839                    // Instead of specifying column aliases as part of the outer table, inject them directly into the inner projection
840                    let rewritten_plan =
841                        match inject_column_aliases_into_subquery(plan, columns) {
842                            Ok(p) => p,
843                            Err(e) => {
844                                return internal_err!(
845                                    "Failed to transform SubqueryAlias plan: {e}"
846                                )
847                            }
848                        };
849
850                    columns = vec![];
851
852                    self.select_to_sql_recursively(
853                        &rewritten_plan,
854                        query,
855                        select,
856                        relation,
857                    )?;
858                } else {
859                    self.select_to_sql_recursively(&plan, query, select, relation)?;
860                }
861
862                relation.alias(Some(
863                    self.new_table_alias(plan_alias.alias.table().to_string(), columns),
864                ));
865
866                Ok(())
867            }
868            LogicalPlan::Union(union) => {
869                // Covers cases where the UNION is a subquery and the projection is at the top level
870                if select.already_projected() {
871                    return self.derive_with_dialect_alias(
872                        "derived_union",
873                        plan,
874                        relation,
875                        false,
876                        vec![],
877                    );
878                }
879
880                let input_exprs: Vec<SetExpr> = union
881                    .inputs
882                    .iter()
883                    .map(|input| self.select_to_sql_expr(input, query))
884                    .collect::<Result<Vec<_>>>()?;
885
886                if input_exprs.len() < 2 {
887                    return internal_err!("UNION operator requires at least 2 inputs");
888                }
889
890                let set_quantifier =
891                    if query.as_ref().is_some_and(|q| q.is_distinct_union()) {
892                        // Setting the SetQuantifier to None will unparse as a `UNION`
893                        // rather than a `UNION ALL`.
894                        ast::SetQuantifier::None
895                    } else {
896                        ast::SetQuantifier::All
897                    };
898
899                // Build the union expression tree bottom-up by reversing the order
900                // note that we are also swapping left and right inputs because of the rev
901                let union_expr = input_exprs
902                    .into_iter()
903                    .rev()
904                    .reduce(|a, b| SetExpr::SetOperation {
905                        op: ast::SetOperator::Union,
906                        set_quantifier,
907                        left: Box::new(b),
908                        right: Box::new(a),
909                    })
910                    .unwrap();
911
912                let Some(query) = query.as_mut() else {
913                    return internal_err!(
914                        "UNION ALL operator only valid in a statement context"
915                    );
916                };
917                query.body(Box::new(union_expr));
918
919                Ok(())
920            }
921            LogicalPlan::Window(window) => {
922                // Window nodes are handled simultaneously with Projection nodes
923                self.select_to_sql_recursively(
924                    window.input.as_ref(),
925                    query,
926                    select,
927                    relation,
928                )
929            }
930            LogicalPlan::EmptyRelation(_) => {
931                // An EmptyRelation could be behind an UNNEST node. If the dialect supports UNNEST as a table factor,
932                // a TableRelationBuilder will be created for the UNNEST node first.
933                if !relation.has_relation() {
934                    relation.empty();
935                }
936                Ok(())
937            }
938            LogicalPlan::Extension(extension) => {
939                if let Some(query) = query.as_mut() {
940                    self.extension_to_sql(
941                        extension.node.as_ref(),
942                        &mut Some(query),
943                        &mut Some(select),
944                        &mut Some(relation),
945                    )
946                } else {
947                    self.extension_to_sql(
948                        extension.node.as_ref(),
949                        &mut None,
950                        &mut Some(select),
951                        &mut Some(relation),
952                    )
953                }
954            }
955            LogicalPlan::Unnest(unnest) => {
956                if !unnest.struct_type_columns.is_empty() {
957                    return internal_err!(
958                        "Struct type columns are not currently supported in UNNEST: {:?}",
959                        unnest.struct_type_columns
960                    );
961                }
962
963                // In the case of UNNEST, the Unnest node is followed by a duplicate Projection node that we should skip.
964                // Otherwise, there will be a duplicate SELECT clause.
965                // | Projection: table.col1, UNNEST(table.col2)
966                // |   Unnest: UNNEST(table.col2)
967                // |     Projection: table.col1, table.col2 AS UNNEST(table.col2)
968                // |       Filter: table.col3 = Int64(3)
969                // |         TableScan: table projection=None
970                if let LogicalPlan::Projection(p) = unnest.input.as_ref() {
971                    // continue with projection input
972                    self.select_to_sql_recursively(&p.input, query, select, relation)
973                } else {
974                    internal_err!("Unnest input is not a Projection: {unnest:?}")
975                }
976            }
977            LogicalPlan::Subquery(subquery)
978                if find_unnest_node_until_relation(subquery.subquery.as_ref())
979                    .is_some() =>
980            {
981                if self.dialect.unnest_as_table_factor() {
982                    self.select_to_sql_recursively(
983                        subquery.subquery.as_ref(),
984                        query,
985                        select,
986                        relation,
987                    )
988                } else {
989                    self.derive_with_dialect_alias(
990                        "derived_unnest",
991                        subquery.subquery.as_ref(),
992                        relation,
993                        true,
994                        vec![],
995                    )
996                }
997            }
998            _ => {
999                not_impl_err!("Unsupported operator: {plan:?}")
1000            }
1001        }
1002    }
1003
1004    /// Try to find the placeholder column name generated by `RecursiveUnnestRewriter`.
1005    ///
1006    /// - If the column is a placeholder column match the pattern `Expr::Alias(Expr::Column("__unnest_placeholder(...)"))`,
1007    ///   it means it is a scalar column, return [UnnestInputType::Scalar].
1008    /// - If the column is a placeholder column match the pattern `Expr::Alias(Expr::Column("__unnest_placeholder(outer_ref(...)))")`,
1009    ///   it means it is an outer reference column, return [UnnestInputType::OuterReference].
1010    /// - If the column is not a placeholder column, return [None].
1011    ///
1012    /// `outer_ref` is the display result of [Expr::OuterReferenceColumn]
1013    fn check_unnest_placeholder_with_outer_ref(expr: &Expr) -> Option<UnnestInputType> {
1014        if let Expr::Alias(Alias { expr, .. }) = expr {
1015            if let Expr::Column(Column { name, .. }) = expr.as_ref() {
1016                if let Some(prefix) = name.strip_prefix(UNNEST_PLACEHOLDER) {
1017                    if prefix.starts_with(&format!("({OUTER_REFERENCE_COLUMN_PREFIX}(")) {
1018                        return Some(UnnestInputType::OuterReference);
1019                    }
1020                    return Some(UnnestInputType::Scalar);
1021                }
1022            }
1023        }
1024        None
1025    }
1026
1027    fn try_unnest_to_table_factor_sql(
1028        &self,
1029        unnest: &Unnest,
1030    ) -> Result<Option<UnnestRelationBuilder>> {
1031        let mut unnest_relation = UnnestRelationBuilder::default();
1032        let LogicalPlan::Projection(projection) = unnest.input.as_ref() else {
1033            return Ok(None);
1034        };
1035
1036        if !matches!(projection.input.as_ref(), LogicalPlan::EmptyRelation(_)) {
1037            // It may be possible that UNNEST is used as a source for the query.
1038            // However, at this point, we don't yet know if it is just a single expression
1039            // from another source or if it's from UNNEST.
1040            //
1041            // Unnest(Projection(EmptyRelation)) denotes a case with `UNNEST([...])`,
1042            // which is normally safe to unnest as a table factor.
1043            // However, in the future, more comprehensive checks can be added here.
1044            return Ok(None);
1045        };
1046
1047        let exprs = projection
1048            .expr
1049            .iter()
1050            .map(|e| self.expr_to_sql(e))
1051            .collect::<Result<Vec<_>>>()?;
1052        unnest_relation.array_exprs(exprs);
1053
1054        Ok(Some(unnest_relation))
1055    }
1056
1057    fn is_scan_with_pushdown(scan: &TableScan) -> bool {
1058        scan.projection.is_some() || !scan.filters.is_empty() || scan.fetch.is_some()
1059    }
1060
1061    /// Try to unparse a table scan with pushdown operations into a new subquery plan.
1062    /// If the table scan is without any pushdown operations, return None.
1063    fn unparse_table_scan_pushdown(
1064        plan: &LogicalPlan,
1065        alias: Option<TableReference>,
1066        already_projected: bool,
1067    ) -> Result<Option<LogicalPlan>> {
1068        match plan {
1069            LogicalPlan::TableScan(table_scan) => {
1070                if !Self::is_scan_with_pushdown(table_scan) {
1071                    return Ok(None);
1072                }
1073                let table_schema = table_scan.source.schema();
1074                let mut filter_alias_rewriter =
1075                    alias.as_ref().map(|alias_name| TableAliasRewriter {
1076                        table_schema: &table_schema,
1077                        alias_name: alias_name.clone(),
1078                    });
1079
1080                let mut builder = LogicalPlanBuilder::scan(
1081                    table_scan.table_name.clone(),
1082                    Arc::clone(&table_scan.source),
1083                    None,
1084                )?;
1085                // We will rebase the column references to the new alias if it exists.
1086                // If the projection or filters are empty, we will append alias to the table scan.
1087                //
1088                // Example:
1089                //   select t1.c1 from t1 where t1.c1 > 1 -> select a.c1 from t1 as a where a.c1 > 1
1090                if let Some(ref alias) = alias {
1091                    if table_scan.projection.is_some() || !table_scan.filters.is_empty() {
1092                        builder = builder.alias(alias.clone())?;
1093                    }
1094                }
1095
1096                // Avoid creating a duplicate Projection node, which would result in an additional subquery if a projection already exists.
1097                // For example, if the `optimize_projection` rule is applied, there will be a Projection node, and duplicate projection
1098                // information included in the TableScan node.
1099                if !already_projected {
1100                    if let Some(project_vec) = &table_scan.projection {
1101                        if project_vec.is_empty() {
1102                            builder = builder.project(vec![Expr::Literal(
1103                                ScalarValue::Int64(Some(1)),
1104                                None,
1105                            )])?;
1106                        } else {
1107                            let project_columns = project_vec
1108                                .iter()
1109                                .cloned()
1110                                .map(|i| {
1111                                    let schema = table_scan.source.schema();
1112                                    let field = schema.field(i);
1113                                    if alias.is_some() {
1114                                        Column::new(alias.clone(), field.name().clone())
1115                                    } else {
1116                                        Column::new(
1117                                            Some(table_scan.table_name.clone()),
1118                                            field.name().clone(),
1119                                        )
1120                                    }
1121                                })
1122                                .collect::<Vec<_>>();
1123                            builder = builder.project(project_columns)?;
1124                        };
1125                    }
1126                }
1127
1128                let filter_expr: Result<Option<Expr>> = table_scan
1129                    .filters
1130                    .iter()
1131                    .cloned()
1132                    .map(|expr| {
1133                        if let Some(ref mut rewriter) = filter_alias_rewriter {
1134                            expr.rewrite_with_lambdas_params(rewriter).data()
1135                        } else {
1136                            Ok(expr)
1137                        }
1138                    })
1139                    .reduce(|acc, expr_result| {
1140                        acc.and_then(|acc_expr| {
1141                            expr_result.map(|expr| acc_expr.and(expr))
1142                        })
1143                    })
1144                    .transpose();
1145
1146                if let Some(filter) = filter_expr? {
1147                    builder = builder.filter(filter)?;
1148                }
1149
1150                if let Some(fetch) = table_scan.fetch {
1151                    builder = builder.limit(0, Some(fetch))?;
1152                }
1153
1154                // If the table scan has an alias but no projection or filters, it means no column references are rebased.
1155                // So we will append the alias to this subquery.
1156                // Example:
1157                //   select * from t1 limit 10 -> (select * from t1 limit 10) as a
1158                if let Some(alias) = alias {
1159                    if table_scan.projection.is_none() && table_scan.filters.is_empty() {
1160                        builder = builder.alias(alias)?;
1161                    }
1162                }
1163
1164                Ok(Some(builder.build()?))
1165            }
1166            LogicalPlan::SubqueryAlias(subquery_alias) => {
1167                let ret = Self::unparse_table_scan_pushdown(
1168                    &subquery_alias.input,
1169                    Some(subquery_alias.alias.clone()),
1170                    already_projected,
1171                )?;
1172                if let Some(alias) = alias {
1173                    if let Some(plan) = ret {
1174                        let plan = LogicalPlanBuilder::new(plan).alias(alias)?.build()?;
1175                        return Ok(Some(plan));
1176                    }
1177                }
1178                Ok(ret)
1179            }
1180            // SubqueryAlias could be rewritten to a plan with a projection as the top node by [rewrite::subquery_alias_inner_query_and_columns].
1181            // The inner table scan could be a scan with pushdown operations.
1182            LogicalPlan::Projection(projection) => {
1183                if let Some(plan) = Self::unparse_table_scan_pushdown(
1184                    &projection.input,
1185                    alias.clone(),
1186                    already_projected,
1187                )? {
1188                    let exprs = if alias.is_some() {
1189                        let mut alias_rewriter =
1190                            alias.as_ref().map(|alias_name| TableAliasRewriter {
1191                                table_schema: plan.schema().as_arrow(),
1192                                alias_name: alias_name.clone(),
1193                            });
1194                        projection
1195                            .expr
1196                            .iter()
1197                            .cloned()
1198                            .map(|expr| {
1199                                if let Some(ref mut rewriter) = alias_rewriter {
1200                                    expr.rewrite_with_lambdas_params(rewriter).data()
1201                                } else {
1202                                    Ok(expr)
1203                                }
1204                            })
1205                            .collect::<Result<Vec<_>>>()?
1206                    } else {
1207                        projection.expr.clone()
1208                    };
1209                    Ok(Some(
1210                        LogicalPlanBuilder::from(plan).project(exprs)?.build()?,
1211                    ))
1212                } else {
1213                    Ok(None)
1214                }
1215            }
1216            _ => Ok(None),
1217        }
1218    }
1219
1220    fn select_item_to_sql(&self, expr: &Expr) -> Result<ast::SelectItem> {
1221        match expr {
1222            Expr::Alias(Alias { expr, name, .. }) => {
1223                let inner = self.expr_to_sql(expr)?;
1224
1225                // Determine the alias name to use
1226                let col_name = if let Some(rewritten_name) =
1227                    self.dialect.col_alias_overrides(name)?
1228                {
1229                    rewritten_name.to_string()
1230                } else {
1231                    name.to_string()
1232                };
1233
1234                Ok(ast::SelectItem::ExprWithAlias {
1235                    expr: inner,
1236                    alias: self.new_ident_quoted_if_needs(col_name),
1237                })
1238            }
1239            _ => {
1240                let inner = self.expr_to_sql(expr)?;
1241
1242                Ok(ast::SelectItem::UnnamedExpr(inner))
1243            }
1244        }
1245    }
1246
1247    fn sorts_to_sql(&self, sort_exprs: &[SortExpr]) -> Result<OrderByKind> {
1248        Ok(OrderByKind::Expressions(
1249            sort_exprs
1250                .iter()
1251                .map(|sort_expr| self.sort_to_sql(sort_expr))
1252                .collect::<Result<Vec<_>>>()?,
1253        ))
1254    }
1255
1256    fn join_operator_to_sql(
1257        &self,
1258        join_type: JoinType,
1259        constraint: ast::JoinConstraint,
1260    ) -> Result<ast::JoinOperator> {
1261        Ok(match join_type {
1262            JoinType::Inner => match &constraint {
1263                ast::JoinConstraint::On(_)
1264                | ast::JoinConstraint::Using(_)
1265                | ast::JoinConstraint::Natural => ast::JoinOperator::Inner(constraint),
1266                ast::JoinConstraint::None => {
1267                    // Inner joins with no conditions or filters are not valid SQL in most systems,
1268                    // return a CROSS JOIN instead
1269                    ast::JoinOperator::CrossJoin(constraint)
1270                }
1271            },
1272            JoinType::Left => ast::JoinOperator::LeftOuter(constraint),
1273            JoinType::Right => ast::JoinOperator::RightOuter(constraint),
1274            JoinType::Full => ast::JoinOperator::FullOuter(constraint),
1275            JoinType::LeftAnti => ast::JoinOperator::LeftAnti(constraint),
1276            JoinType::LeftSemi => ast::JoinOperator::LeftSemi(constraint),
1277            JoinType::RightAnti => ast::JoinOperator::RightAnti(constraint),
1278            JoinType::RightSemi => ast::JoinOperator::RightSemi(constraint),
1279            JoinType::LeftMark | JoinType::RightMark => {
1280                unimplemented!("Unparsing of Mark join type")
1281            }
1282        })
1283    }
1284
1285    /// Convert the components of a USING clause to the USING AST. Returns
1286    /// 'None' if the conditions are not compatible with a USING expression,
1287    /// e.g. non-column expressions or non-matching names.
1288    fn join_using_to_sql(
1289        &self,
1290        join_conditions: &[(Expr, Expr)],
1291    ) -> Option<ast::JoinConstraint> {
1292        let mut object_names = Vec::with_capacity(join_conditions.len());
1293        for (left, right) in join_conditions {
1294            match (left, right) {
1295                (
1296                    Expr::Column(Column {
1297                        relation: _,
1298                        name: left_name,
1299                        spans: _,
1300                    }),
1301                    Expr::Column(Column {
1302                        relation: _,
1303                        name: right_name,
1304                        spans: _,
1305                    }),
1306                ) if left_name == right_name => {
1307                    // For example, if the join condition `t1.id = t2.id`
1308                    // this is represented as two columns like `[t1.id, t2.id]`
1309                    // This code forms `id` (without relation name)
1310                    let ident = self.new_ident_quoted_if_needs(left_name.to_string());
1311                    object_names.push(ast::ObjectName::from(vec![ident]));
1312                }
1313                // USING is only valid with matching column names; arbitrary expressions
1314                // are not allowed
1315                _ => return None,
1316            }
1317        }
1318        Some(ast::JoinConstraint::Using(object_names))
1319    }
1320
1321    /// Convert a join constraint and associated conditions and filter to a SQL AST node
1322    fn join_constraint_to_sql(
1323        &self,
1324        constraint: JoinConstraint,
1325        conditions: &[(Expr, Expr)],
1326        filter: Option<&Expr>,
1327    ) -> Result<ast::JoinConstraint> {
1328        match (constraint, conditions, filter) {
1329            // No constraints
1330            (JoinConstraint::On | JoinConstraint::Using, [], None) => {
1331                Ok(ast::JoinConstraint::None)
1332            }
1333
1334            (JoinConstraint::Using, conditions, None) => {
1335                match self.join_using_to_sql(conditions) {
1336                    Some(using) => Ok(using),
1337                    // As above, this should not be reachable from parsed SQL,
1338                    // but a user could create this; we "downgrade" to ON.
1339                    None => self.join_conditions_to_sql_on(conditions, None),
1340                }
1341            }
1342
1343            // Two cases here:
1344            // 1. Straightforward ON case, with possible equi-join conditions
1345            //    and additional filters
1346            // 2. USING with additional filters; we "downgrade" to ON, because
1347            //    you can't use USING with arbitrary filters. (This should not
1348            //    be accessible from parsed SQL, but may have been a
1349            //    custom-built JOIN by a user.)
1350            (JoinConstraint::On | JoinConstraint::Using, conditions, filter) => {
1351                self.join_conditions_to_sql_on(conditions, filter)
1352            }
1353        }
1354    }
1355
1356    // Convert a list of equi0join conditions and an optional filter to a SQL ON
1357    // AST node, with the equi-join conditions and the filter merged into a
1358    // single conditional expression
1359    fn join_conditions_to_sql_on(
1360        &self,
1361        join_conditions: &[(Expr, Expr)],
1362        filter: Option<&Expr>,
1363    ) -> Result<ast::JoinConstraint> {
1364        let mut condition = None;
1365        // AND the join conditions together to create the overall condition
1366        for (left, right) in join_conditions {
1367            // Parse left and right
1368            let l = self.expr_to_sql(left)?;
1369            let r = self.expr_to_sql(right)?;
1370            let e = self.binary_op_to_sql(l, r, ast::BinaryOperator::Eq);
1371            condition = match condition {
1372                Some(expr) => Some(self.and_op_to_sql(expr, e)),
1373                None => Some(e),
1374            };
1375        }
1376
1377        // Then AND the non-equijoin filter condition as well
1378        condition = match (condition, filter) {
1379            (Some(expr), Some(filter)) => {
1380                Some(self.and_op_to_sql(expr, self.expr_to_sql(filter)?))
1381            }
1382            (Some(expr), None) => Some(expr),
1383            (None, Some(filter)) => Some(self.expr_to_sql(filter)?),
1384            (None, None) => None,
1385        };
1386
1387        let constraint = match condition {
1388            Some(filter) => ast::JoinConstraint::On(filter),
1389            None => ast::JoinConstraint::None,
1390        };
1391
1392        Ok(constraint)
1393    }
1394
1395    fn and_op_to_sql(&self, lhs: ast::Expr, rhs: ast::Expr) -> ast::Expr {
1396        self.binary_op_to_sql(lhs, rhs, ast::BinaryOperator::And)
1397    }
1398
1399    fn new_table_alias(&self, alias: String, columns: Vec<Ident>) -> ast::TableAlias {
1400        let columns = columns
1401            .into_iter()
1402            .map(|ident| TableAliasColumnDef {
1403                name: ident,
1404                data_type: None,
1405            })
1406            .collect();
1407        ast::TableAlias {
1408            name: self.new_ident_quoted_if_needs(alias),
1409            columns,
1410        }
1411    }
1412
1413    fn dml_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
1414        not_impl_err!("Unsupported plan: {plan:?}")
1415    }
1416}
1417
1418impl From<BuilderError> for DataFusionError {
1419    fn from(e: BuilderError) -> Self {
1420        DataFusionError::External(Box::new(e))
1421    }
1422}
1423
/// Kind of input that is fed to the UNNEST table factor.
#[derive(Debug)]
enum UnnestInputType {
    /// The input refers to a column of an outer query. It is displayed as
    /// `outer_ref(column_name)`.
    OuterReference,
    /// The input is a scalar value, displayed as a scalar array or struct.
    Scalar,
}