datafusion_sql/
statement.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::{BTreeMap, HashMap, HashSet};
19use std::path::Path;
20use std::str::FromStr;
21use std::sync::Arc;
22
23use crate::parser::{
24    CopyToSource, CopyToStatement, CreateExternalTable, DFParser, ExplainStatement,
25    LexOrdering, Statement as DFStatement,
26};
27use crate::planner::{
28    object_name_to_qualifier, ContextProvider, PlannerContext, SqlToRel,
29};
30use crate::utils::normalize_ident;
31
32use arrow::datatypes::{Field, FieldRef, Fields};
33use datafusion_common::error::_plan_err;
34use datafusion_common::parsers::CompressionTypeVariant;
35use datafusion_common::{
36    exec_err, internal_err, not_impl_err, plan_datafusion_err, plan_err, schema_err,
37    unqualified_field_not_found, Column, Constraint, Constraints, DFSchema, DFSchemaRef,
38    DataFusionError, Result, ScalarValue, SchemaError, SchemaReference, TableReference,
39    ToDFSchema,
40};
41use datafusion_expr::dml::{CopyTo, InsertOp};
42use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check;
43use datafusion_expr::logical_plan::builder::project;
44use datafusion_expr::logical_plan::DdlStatement;
45use datafusion_expr::utils::expr_to_columns;
46use datafusion_expr::{
47    cast, col, Analyze, CreateCatalog, CreateCatalogSchema,
48    CreateExternalTable as PlanCreateExternalTable, CreateFunction, CreateFunctionBody,
49    CreateIndex as PlanCreateIndex, CreateMemoryTable, CreateView, Deallocate,
50    DescribeTable, DmlStatement, DropCatalogSchema, DropFunction, DropTable, DropView,
51    EmptyRelation, Execute, Explain, ExplainFormat, Expr, ExprSchemable, Filter,
52    LogicalPlan, LogicalPlanBuilder, OperateFunctionArg, PlanType, Prepare, SetVariable,
53    SortExpr, Statement as PlanStatement, ToStringifiedPlan, TransactionAccessMode,
54    TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart,
55    Volatility, WriteOp,
56};
57use sqlparser::ast::{
58    self, BeginTransactionKind, IndexColumn, IndexType, NullsDistinctOption, OrderByExpr,
59    OrderByOptions, Set, ShowStatementIn, ShowStatementOptions, SqliteOnConflict,
60    TableObject, UpdateTableFromKind, ValueWithSpan,
61};
62use sqlparser::ast::{
63    Assignment, AssignmentTarget, ColumnDef, CreateIndex, CreateTable,
64    CreateTableOptions, Delete, DescribeAlias, Expr as SQLExpr, FromTable, Ident, Insert,
65    ObjectName, ObjectType, Query, SchemaName, SetExpr, ShowCreateObject,
66    ShowStatementFilter, Statement, TableConstraint, TableFactor, TableWithJoins,
67    TransactionMode, UnaryOperator, Value,
68};
69use sqlparser::parser::ParserError::ParserError;
70
71fn ident_to_string(ident: &Ident) -> String {
72    normalize_ident(ident.to_owned())
73}
74
75fn object_name_to_string(object_name: &ObjectName) -> String {
76    object_name
77        .0
78        .iter()
79        .map(|object_name_part| {
80            object_name_part
81                .as_ident()
82                // TODO: It might be better to return an error
83                // than to silently use a default value.
84                .map_or_else(String::new, ident_to_string)
85        })
86        .collect::<Vec<String>>()
87        .join(".")
88}
89
90fn get_schema_name(schema_name: &SchemaName) -> String {
91    match schema_name {
92        SchemaName::Simple(schema_name) => object_name_to_string(schema_name),
93        SchemaName::UnnamedAuthorization(auth) => ident_to_string(auth),
94        SchemaName::NamedAuthorization(schema_name, auth) => format!(
95            "{}.{}",
96            object_name_to_string(schema_name),
97            ident_to_string(auth)
98        ),
99    }
100}
101
102/// Construct `TableConstraint`(s) for the given columns by iterating over
103/// `columns` and extracting individual inline constraint definitions.
104fn calc_inline_constraints_from_columns(columns: &[ColumnDef]) -> Vec<TableConstraint> {
105    let mut constraints = vec![];
106    for column in columns {
107        for ast::ColumnOptionDef { name, option } in &column.options {
108            match option {
109                ast::ColumnOption::Unique {
110                    is_primary: false,
111                    characteristics,
112                } => constraints.push(TableConstraint::Unique {
113                    name: name.clone(),
114                    columns: vec![IndexColumn {
115                        column: OrderByExpr {
116                            expr: SQLExpr::Identifier(column.name.clone()),
117                            options: OrderByOptions {
118                                asc: None,
119                                nulls_first: None,
120                            },
121                            with_fill: None,
122                        },
123                        operator_class: None,
124                    }],
125                    characteristics: *characteristics,
126                    index_name: None,
127                    index_type_display: ast::KeyOrIndexDisplay::None,
128                    index_type: None,
129                    index_options: vec![],
130                    nulls_distinct: NullsDistinctOption::None,
131                }),
132                ast::ColumnOption::Unique {
133                    is_primary: true,
134                    characteristics,
135                } => constraints.push(TableConstraint::PrimaryKey {
136                    name: name.clone(),
137                    columns: vec![IndexColumn {
138                        column: OrderByExpr {
139                            expr: SQLExpr::Identifier(column.name.clone()),
140                            options: OrderByOptions {
141                                asc: None,
142                                nulls_first: None,
143                            },
144                            with_fill: None,
145                        },
146                        operator_class: None,
147                    }],
148                    characteristics: *characteristics,
149                    index_name: None,
150                    index_type: None,
151                    index_options: vec![],
152                }),
153                ast::ColumnOption::ForeignKey {
154                    foreign_table,
155                    referred_columns,
156                    on_delete,
157                    on_update,
158                    characteristics,
159                } => constraints.push(TableConstraint::ForeignKey {
160                    name: name.clone(),
161                    columns: vec![],
162                    foreign_table: foreign_table.clone(),
163                    referred_columns: referred_columns.to_vec(),
164                    on_delete: *on_delete,
165                    on_update: *on_update,
166                    characteristics: *characteristics,
167                    index_name: None,
168                }),
169                ast::ColumnOption::Check(expr) => {
170                    constraints.push(TableConstraint::Check {
171                        name: name.clone(),
172                        expr: Box::new(expr.clone()),
173                        enforced: None,
174                    })
175                }
176                // Other options are not constraint related.
177                ast::ColumnOption::Default(_)
178                | ast::ColumnOption::Null
179                | ast::ColumnOption::NotNull
180                | ast::ColumnOption::DialectSpecific(_)
181                | ast::ColumnOption::CharacterSet(_)
182                | ast::ColumnOption::Generated { .. }
183                | ast::ColumnOption::Comment(_)
184                | ast::ColumnOption::Options(_)
185                | ast::ColumnOption::OnUpdate(_)
186                | ast::ColumnOption::Materialized(_)
187                | ast::ColumnOption::Ephemeral(_)
188                | ast::ColumnOption::Identity(_)
189                | ast::ColumnOption::OnConflict(_)
190                | ast::ColumnOption::Policy(_)
191                | ast::ColumnOption::Tags(_)
192                | ast::ColumnOption::Alias(_)
193                | ast::ColumnOption::Srid(_)
194                | ast::ColumnOption::Collation(_) => {}
195            }
196        }
197    }
198    constraints
199}
200
201impl<S: ContextProvider> SqlToRel<'_, S> {
202    /// Generate a logical plan from an DataFusion SQL statement
203    pub fn statement_to_plan(&self, statement: DFStatement) -> Result<LogicalPlan> {
204        match statement {
205            DFStatement::CreateExternalTable(s) => self.external_table_to_plan(s),
206            DFStatement::Statement(s) => self.sql_statement_to_plan(*s),
207            DFStatement::CopyTo(s) => self.copy_to_plan(s),
208            DFStatement::Explain(ExplainStatement {
209                verbose,
210                analyze,
211                format,
212                statement,
213            }) => self.explain_to_plan(verbose, analyze, format, *statement),
214        }
215    }
216
217    /// Generate a logical plan from an SQL statement
218    pub fn sql_statement_to_plan(&self, statement: Statement) -> Result<LogicalPlan> {
219        self.sql_statement_to_plan_with_context_impl(
220            statement,
221            &mut PlannerContext::new(),
222        )
223    }
224
225    /// Generate a logical plan from an SQL statement
226    pub fn sql_statement_to_plan_with_context(
227        &self,
228        statement: Statement,
229        planner_context: &mut PlannerContext,
230    ) -> Result<LogicalPlan> {
231        self.sql_statement_to_plan_with_context_impl(statement, planner_context)
232    }
233
234    fn sql_statement_to_plan_with_context_impl(
235        &self,
236        statement: Statement,
237        planner_context: &mut PlannerContext,
238    ) -> Result<LogicalPlan> {
239        match statement {
240            Statement::ExplainTable {
241                describe_alias: DescribeAlias::Describe | DescribeAlias::Desc, // only parse 'DESCRIBE table_name' or 'DESC table_name' and not 'EXPLAIN table_name'
242                table_name,
243                ..
244            } => self.describe_table_to_plan(table_name),
245            Statement::Explain {
246                describe_alias: DescribeAlias::Describe | DescribeAlias::Desc, // only parse 'DESCRIBE statement' or 'DESC statement' and not 'EXPLAIN statement'
247                statement,
248                ..
249            } => match *statement {
250                Statement::Query(query) => self.describe_query_to_plan(*query),
251                _ => {
252                    not_impl_err!("Describing statements other than SELECT not supported")
253                }
254            },
255            Statement::Explain {
256                verbose,
257                statement,
258                analyze,
259                format,
260                describe_alias: _,
261                ..
262            } => {
263                let format = format.map(|format| format.to_string());
264                let statement = DFStatement::Statement(statement);
265                self.explain_to_plan(verbose, analyze, format, statement)
266            }
267            Statement::Query(query) => self.query_to_plan(*query, planner_context),
268            Statement::ShowVariable { variable } => self.show_variable_to_plan(&variable),
269            Statement::Set(statement) => self.set_statement_to_plan(statement),
270            Statement::CreateTable(CreateTable {
271                temporary,
272                external,
273                global,
274                transient,
275                volatile,
276                hive_distribution,
277                hive_formats,
278                file_format,
279                location,
280                query,
281                name,
282                columns,
283                constraints,
284                if_not_exists,
285                or_replace,
286                without_rowid,
287                like,
288                clone,
289                comment,
290                on_commit,
291                on_cluster,
292                primary_key,
293                order_by,
294                partition_by,
295                cluster_by,
296                clustered_by,
297                strict,
298                copy_grants,
299                enable_schema_evolution,
300                change_tracking,
301                data_retention_time_in_days,
302                max_data_extension_time_in_days,
303                default_ddl_collation,
304                with_aggregation_policy,
305                with_row_access_policy,
306                with_tags,
307                iceberg,
308                external_volume,
309                base_location,
310                catalog,
311                catalog_sync,
312                storage_serialization_policy,
313                inherits,
314                table_options: CreateTableOptions::None,
315                dynamic,
316                version,
317                target_lag,
318                warehouse,
319                refresh_mode,
320                initialize,
321                require_user,
322            }) => {
323                if temporary {
324                    return not_impl_err!("Temporary tables not supported")?;
325                }
326                if external {
327                    return not_impl_err!("External tables not supported")?;
328                }
329                if global.is_some() {
330                    return not_impl_err!("Global tables not supported")?;
331                }
332                if transient {
333                    return not_impl_err!("Transient tables not supported")?;
334                }
335                if volatile {
336                    return not_impl_err!("Volatile tables not supported")?;
337                }
338                if hive_distribution != ast::HiveDistributionStyle::NONE {
339                    return not_impl_err!(
340                        "Hive distribution not supported: {hive_distribution:?}"
341                    )?;
342                }
343                if !matches!(
344                    hive_formats,
345                    Some(ast::HiveFormat {
346                        row_format: None,
347                        serde_properties: None,
348                        storage: None,
349                        location: None,
350                    })
351                ) {
352                    return not_impl_err!(
353                        "Hive formats not supported: {hive_formats:?}"
354                    )?;
355                }
356                if file_format.is_some() {
357                    return not_impl_err!("File format not supported")?;
358                }
359                if location.is_some() {
360                    return not_impl_err!("Location not supported")?;
361                }
362                if without_rowid {
363                    return not_impl_err!("Without rowid not supported")?;
364                }
365                if like.is_some() {
366                    return not_impl_err!("Like not supported")?;
367                }
368                if clone.is_some() {
369                    return not_impl_err!("Clone not supported")?;
370                }
371                if comment.is_some() {
372                    return not_impl_err!("Comment not supported")?;
373                }
374                if on_commit.is_some() {
375                    return not_impl_err!("On commit not supported")?;
376                }
377                if on_cluster.is_some() {
378                    return not_impl_err!("On cluster not supported")?;
379                }
380                if primary_key.is_some() {
381                    return not_impl_err!("Primary key not supported")?;
382                }
383                if order_by.is_some() {
384                    return not_impl_err!("Order by not supported")?;
385                }
386                if partition_by.is_some() {
387                    return not_impl_err!("Partition by not supported")?;
388                }
389                if cluster_by.is_some() {
390                    return not_impl_err!("Cluster by not supported")?;
391                }
392                if clustered_by.is_some() {
393                    return not_impl_err!("Clustered by not supported")?;
394                }
395                if strict {
396                    return not_impl_err!("Strict not supported")?;
397                }
398                if copy_grants {
399                    return not_impl_err!("Copy grants not supported")?;
400                }
401                if enable_schema_evolution.is_some() {
402                    return not_impl_err!("Enable schema evolution not supported")?;
403                }
404                if change_tracking.is_some() {
405                    return not_impl_err!("Change tracking not supported")?;
406                }
407                if data_retention_time_in_days.is_some() {
408                    return not_impl_err!("Data retention time in days not supported")?;
409                }
410                if max_data_extension_time_in_days.is_some() {
411                    return not_impl_err!(
412                        "Max data extension time in days not supported"
413                    )?;
414                }
415                if default_ddl_collation.is_some() {
416                    return not_impl_err!("Default DDL collation not supported")?;
417                }
418                if with_aggregation_policy.is_some() {
419                    return not_impl_err!("With aggregation policy not supported")?;
420                }
421                if with_row_access_policy.is_some() {
422                    return not_impl_err!("With row access policy not supported")?;
423                }
424                if with_tags.is_some() {
425                    return not_impl_err!("With tags not supported")?;
426                }
427                if iceberg {
428                    return not_impl_err!("Iceberg not supported")?;
429                }
430                if external_volume.is_some() {
431                    return not_impl_err!("External volume not supported")?;
432                }
433                if base_location.is_some() {
434                    return not_impl_err!("Base location not supported")?;
435                }
436                if catalog.is_some() {
437                    return not_impl_err!("Catalog not supported")?;
438                }
439                if catalog_sync.is_some() {
440                    return not_impl_err!("Catalog sync not supported")?;
441                }
442                if storage_serialization_policy.is_some() {
443                    return not_impl_err!("Storage serialization policy not supported")?;
444                }
445                if inherits.is_some() {
446                    return not_impl_err!("Table inheritance not supported")?;
447                }
448                if dynamic {
449                    return not_impl_err!("Dynamic tables not supported")?;
450                }
451                if version.is_some() {
452                    return not_impl_err!("Version not supported")?;
453                }
454                if target_lag.is_some() {
455                    return not_impl_err!("Target lag not supported")?;
456                }
457                if warehouse.is_some() {
458                    return not_impl_err!("Warehouse not supported")?;
459                }
460                if refresh_mode.is_some() {
461                    return not_impl_err!("Refresh mode not supported")?;
462                }
463                if initialize.is_some() {
464                    return not_impl_err!("Initialize not supported")?;
465                }
466                if require_user {
467                    return not_impl_err!("Require user not supported")?;
468                }
469                // Merge inline constraints and existing constraints
470                let mut all_constraints = constraints;
471                let inline_constraints = calc_inline_constraints_from_columns(&columns);
472                all_constraints.extend(inline_constraints);
473                // Build column default values
474                let column_defaults =
475                    self.build_column_defaults(&columns, planner_context)?;
476
477                let has_columns = !columns.is_empty();
478                let schema = self.build_schema(columns)?.to_dfschema_ref()?;
479                if has_columns {
480                    planner_context.set_table_schema(Some(Arc::clone(&schema)));
481                }
482
483                match query {
484                    Some(query) => {
485                        let plan = self.query_to_plan(*query, planner_context)?;
486                        let input_schema = plan.schema();
487
488                        let plan = if has_columns {
489                            if schema.fields().len() != input_schema.fields().len() {
490                                return plan_err!(
491                                    "Mismatch: {} columns specified, but result has {} columns",
492                                    schema.fields().len(),
493                                    input_schema.fields().len()
494                                );
495                            }
496                            let input_fields = input_schema.fields();
497                            let project_exprs = schema
498                                .fields()
499                                .iter()
500                                .zip(input_fields)
501                                .map(|(field, input_field)| {
502                                    cast(
503                                        col(input_field.name()),
504                                        field.data_type().clone(),
505                                    )
506                                    .alias(field.name())
507                                })
508                                .collect::<Vec<_>>();
509
510                            LogicalPlanBuilder::from(plan.clone())
511                                .project(project_exprs)?
512                                .build()?
513                        } else {
514                            plan
515                        };
516
517                        let constraints = self.new_constraint_from_table_constraints(
518                            &all_constraints,
519                            plan.schema(),
520                        )?;
521
522                        Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
523                            CreateMemoryTable {
524                                name: self.object_name_to_table_reference(name)?,
525                                constraints,
526                                input: Arc::new(plan),
527                                if_not_exists,
528                                or_replace,
529                                column_defaults,
530                                temporary,
531                            },
532                        )))
533                    }
534
535                    None => {
536                        let plan = EmptyRelation {
537                            produce_one_row: false,
538                            schema,
539                        };
540                        let plan = LogicalPlan::EmptyRelation(plan);
541                        let constraints = self.new_constraint_from_table_constraints(
542                            &all_constraints,
543                            plan.schema(),
544                        )?;
545                        Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
546                            CreateMemoryTable {
547                                name: self.object_name_to_table_reference(name)?,
548                                constraints,
549                                input: Arc::new(plan),
550                                if_not_exists,
551                                or_replace,
552                                column_defaults,
553                                temporary,
554                            },
555                        )))
556                    }
557                }
558            }
559            Statement::CreateView {
560                or_replace,
561                materialized,
562                name,
563                columns,
564                query,
565                options: CreateTableOptions::None,
566                cluster_by,
567                comment,
568                with_no_schema_binding,
569                if_not_exists,
570                temporary,
571                to,
572                params,
573                or_alter,
574                secure,
575                name_before_not_exists,
576            } => {
577                if materialized {
578                    return not_impl_err!("Materialized views not supported")?;
579                }
580                if !cluster_by.is_empty() {
581                    return not_impl_err!("Cluster by not supported")?;
582                }
583                if comment.is_some() {
584                    return not_impl_err!("Comment not supported")?;
585                }
586                if with_no_schema_binding {
587                    return not_impl_err!("With no schema binding not supported")?;
588                }
589                if if_not_exists {
590                    return not_impl_err!("If not exists not supported")?;
591                }
592                if to.is_some() {
593                    return not_impl_err!("To not supported")?;
594                }
595
596                // put the statement back together temporarily to get the SQL
597                // string representation
598                let stmt = Statement::CreateView {
599                    or_replace,
600                    materialized,
601                    name,
602                    columns,
603                    query,
604                    options: CreateTableOptions::None,
605                    cluster_by,
606                    comment,
607                    with_no_schema_binding,
608                    if_not_exists,
609                    temporary,
610                    to,
611                    params,
612                    or_alter,
613                    secure,
614                    name_before_not_exists,
615                };
616                let sql = stmt.to_string();
617                let Statement::CreateView {
618                    name,
619                    columns,
620                    query,
621                    or_replace,
622                    temporary,
623                    ..
624                } = stmt
625                else {
626                    return internal_err!("Unreachable code in create view");
627                };
628
629                let columns = columns
630                    .into_iter()
631                    .map(|view_column_def| {
632                        if let Some(options) = view_column_def.options {
633                            plan_err!(
634                                "Options not supported for view columns: {options:?}"
635                            )
636                        } else {
637                            Ok(view_column_def.name)
638                        }
639                    })
640                    .collect::<Result<Vec<_>>>()?;
641
642                let mut plan = self.query_to_plan(*query, &mut PlannerContext::new())?;
643                plan = self.apply_expr_alias(plan, columns)?;
644
645                Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
646                    name: self.object_name_to_table_reference(name)?,
647                    input: Arc::new(plan),
648                    or_replace,
649                    definition: Some(sql),
650                    temporary,
651                })))
652            }
653            Statement::ShowCreate { obj_type, obj_name } => match obj_type {
654                ShowCreateObject::Table => self.show_create_table_to_plan(obj_name),
655                _ => {
656                    not_impl_err!("Only `SHOW CREATE TABLE  ...` statement is supported")
657                }
658            },
659            Statement::CreateSchema {
660                schema_name,
661                if_not_exists,
662                ..
663            } => Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
664                CreateCatalogSchema {
665                    schema_name: get_schema_name(&schema_name),
666                    if_not_exists,
667                    schema: Arc::new(DFSchema::empty()),
668                },
669            ))),
670            Statement::CreateDatabase {
671                db_name,
672                if_not_exists,
673                ..
674            } => Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalog(
675                CreateCatalog {
676                    catalog_name: object_name_to_string(&db_name),
677                    if_not_exists,
678                    schema: Arc::new(DFSchema::empty()),
679                },
680            ))),
681            Statement::Drop {
682                object_type,
683                if_exists,
684                mut names,
685                cascade,
686                restrict: _,
687                purge: _,
688                temporary: _,
689                table: _,
690            } => {
                // `restrict`, `purge`, `temporary` and `table` are ignored for now, and
                // `cascade` is only forwarded for `DROP SCHEMA`.
                // Multiple object names are not supported either.
693                let name = match names.len() {
694                    0 => Err(ParserError("Missing table name.".to_string()).into()),
695                    1 => self.object_name_to_table_reference(names.pop().unwrap()),
696                    _ => {
697                        Err(ParserError("Multiple objects not supported".to_string())
698                            .into())
699                    }
700                }?;
701
702                match object_type {
703                    ObjectType::Table => {
704                        Ok(LogicalPlan::Ddl(DdlStatement::DropTable(DropTable {
705                            name,
706                            if_exists,
707                            schema: DFSchemaRef::new(DFSchema::empty()),
708                        })))
709                    }
710                    ObjectType::View => {
711                        Ok(LogicalPlan::Ddl(DdlStatement::DropView(DropView {
712                            name,
713                            if_exists,
714                            schema: DFSchemaRef::new(DFSchema::empty()),
715                        })))
716                    }
717                    ObjectType::Schema => {
718                        let name = match name {
719                            TableReference::Bare { table } => Ok(SchemaReference::Bare { schema: table }),
720                            TableReference::Partial { schema, table } => Ok(SchemaReference::Full { schema: table, catalog: schema }),
721                            TableReference::Full { catalog: _, schema: _, table: _ } => {
722                                Err(ParserError("Invalid schema specifier (has 3 parts)".to_string()))
723                            }
724                        }?;
725                        Ok(LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(DropCatalogSchema {
726                            name,
727                            if_exists,
728                            cascade,
729                            schema: DFSchemaRef::new(DFSchema::empty()),
730                        })))
731                    }
732                    _ => not_impl_err!(
733                        "Only `DROP TABLE/VIEW/SCHEMA  ...` statement is supported currently"
734                    ),
735                }
736            }
737            Statement::Prepare {
738                name,
739                data_types,
740                statement,
741            } => {
742                // Convert parser data types to DataFusion data types
743                let mut fields: Vec<FieldRef> = data_types
744                    .into_iter()
745                    .map(|t| self.convert_data_type_to_field(&t))
746                    .collect::<Result<_>>()?;
747
748                // Create planner context with parameters
749                let mut planner_context =
750                    PlannerContext::new().with_prepare_param_data_types(fields.clone());
751
752                // Build logical plan for inner statement of the prepare statement
753                let plan = self.sql_statement_to_plan_with_context_impl(
754                    *statement,
755                    &mut planner_context,
756                )?;
757
758                if fields.is_empty() {
759                    let map_types = plan.get_parameter_fields()?;
760                    let param_types: Vec<_> = (1..=map_types.len())
761                        .filter_map(|i| {
762                            let key = format!("${i}");
763                            map_types.get(&key).and_then(|opt| opt.clone())
764                        })
765                        .collect();
766                    fields.extend(param_types.iter().cloned());
767                    planner_context.with_prepare_param_data_types(param_types);
768                }
769
770                Ok(LogicalPlan::Statement(PlanStatement::Prepare(Prepare {
771                    name: ident_to_string(&name),
772                    fields,
773                    input: Arc::new(plan),
774                })))
775            }
776            Statement::Execute {
777                name,
778                parameters,
779                using,
780                // has_parentheses specifies the syntax, but the plan is the
781                // same no matter the syntax used, so ignore it
782                has_parentheses: _,
783                immediate,
784                into,
785                output,
786                default,
787            } => {
788                // `USING` is a MySQL-specific syntax and currently not supported.
789                if !using.is_empty() {
790                    return not_impl_err!(
791                        "Execute statement with USING is not supported"
792                    );
793                }
794                if immediate {
795                    return not_impl_err!(
796                        "Execute statement with IMMEDIATE is not supported"
797                    );
798                }
799                if !into.is_empty() {
800                    return not_impl_err!("Execute statement with INTO is not supported");
801                }
802                if output {
803                    return not_impl_err!(
804                        "Execute statement with OUTPUT is not supported"
805                    );
806                }
807                if default {
808                    return not_impl_err!(
809                        "Execute statement with DEFAULT is not supported"
810                    );
811                }
812                let empty_schema = DFSchema::empty();
813                let parameters = parameters
814                    .into_iter()
815                    .map(|expr| self.sql_to_expr(expr, &empty_schema, planner_context))
816                    .collect::<Result<Vec<Expr>>>()?;
817
818                Ok(LogicalPlan::Statement(PlanStatement::Execute(Execute {
819                    name: object_name_to_string(&name.unwrap()),
820                    parameters,
821                })))
822            }
823            Statement::Deallocate {
824                name,
825                // Similar to PostgreSQL, the PREPARE keyword is ignored
826                prepare: _,
827            } => Ok(LogicalPlan::Statement(PlanStatement::Deallocate(
828                Deallocate {
829                    name: ident_to_string(&name),
830                },
831            ))),
832
833            Statement::ShowTables {
834                extended,
835                full,
836                terse,
837                history,
838                external,
839                show_options,
840            } => {
841                // We only support the basic "SHOW TABLES"
842                // https://github.com/apache/datafusion/issues/3188
843                if extended {
844                    return not_impl_err!("SHOW TABLES EXTENDED not supported")?;
845                }
846                if full {
847                    return not_impl_err!("SHOW FULL TABLES not supported")?;
848                }
849                if terse {
850                    return not_impl_err!("SHOW TERSE TABLES not supported")?;
851                }
852                if history {
853                    return not_impl_err!("SHOW TABLES HISTORY not supported")?;
854                }
855                if external {
856                    return not_impl_err!("SHOW EXTERNAL TABLES not supported")?;
857                }
858                let ShowStatementOptions {
859                    show_in,
860                    starts_with,
861                    limit,
862                    limit_from,
863                    filter_position,
864                } = show_options;
865                if show_in.is_some() {
866                    return not_impl_err!("SHOW TABLES IN not supported")?;
867                }
868                if starts_with.is_some() {
869                    return not_impl_err!("SHOW TABLES LIKE not supported")?;
870                }
871                if limit.is_some() {
872                    return not_impl_err!("SHOW TABLES LIMIT not supported")?;
873                }
874                if limit_from.is_some() {
875                    return not_impl_err!("SHOW TABLES LIMIT FROM not supported")?;
876                }
877                if filter_position.is_some() {
878                    return not_impl_err!("SHOW TABLES FILTER not supported")?;
879                }
880                self.show_tables_to_plan()
881            }
882
883            Statement::ShowColumns {
884                extended,
885                full,
886                show_options,
887            } => {
888                let ShowStatementOptions {
889                    show_in,
890                    starts_with,
891                    limit,
892                    limit_from,
893                    filter_position,
894                } = show_options;
895                if starts_with.is_some() {
896                    return not_impl_err!("SHOW COLUMNS LIKE not supported")?;
897                }
898                if limit.is_some() {
899                    return not_impl_err!("SHOW COLUMNS LIMIT not supported")?;
900                }
901                if limit_from.is_some() {
902                    return not_impl_err!("SHOW COLUMNS LIMIT FROM not supported")?;
903                }
904                if filter_position.is_some() {
905                    return not_impl_err!(
906                        "SHOW COLUMNS with WHERE or LIKE is not supported"
907                    )?;
908                }
909                let Some(ShowStatementIn {
910                    // specifies if the syntax was `SHOW COLUMNS IN` or `SHOW
911                    // COLUMNS FROM` which is not different in DataFusion
912                    clause: _,
913                    parent_type,
914                    parent_name,
915                }) = show_in
916                else {
917                    return plan_err!("SHOW COLUMNS requires a table name");
918                };
919
920                if let Some(parent_type) = parent_type {
921                    return not_impl_err!("SHOW COLUMNS IN {parent_type} not supported");
922                }
923                let Some(table_name) = parent_name else {
924                    return plan_err!("SHOW COLUMNS requires a table name");
925                };
926
927                self.show_columns_to_plan(extended, full, table_name)
928            }
929
930            Statement::ShowFunctions { filter, .. } => {
931                self.show_functions_to_plan(filter)
932            }
933
934            Statement::Insert(Insert {
935                or,
936                into,
937                columns,
938                overwrite,
939                source,
940                partitioned,
941                after_columns,
942                table,
943                on,
944                returning,
945                ignore,
946                table_alias,
947                mut replace_into,
948                priority,
949                insert_alias,
950                assignments,
951                has_table_keyword,
952                settings,
953                format_clause,
954            }) => {
955                let table_name = match table {
956                    TableObject::TableName(table_name) => table_name,
957                    TableObject::TableFunction(_) => {
958                        return not_impl_err!("INSERT INTO Table functions not supported")
959                    }
960                };
961                if let Some(or) = or {
962                    match or {
963                        SqliteOnConflict::Replace => replace_into = true,
964                        _ => plan_err!("Inserts with {or} clause is not supported")?,
965                    }
966                }
967                if partitioned.is_some() {
968                    plan_err!("Partitioned inserts not yet supported")?;
969                }
970                if !after_columns.is_empty() {
971                    plan_err!("After-columns clause not supported")?;
972                }
973                if on.is_some() {
974                    plan_err!("Insert-on clause not supported")?;
975                }
976                if returning.is_some() {
977                    plan_err!("Insert-returning clause not supported")?;
978                }
979                if ignore {
980                    plan_err!("Insert-ignore clause not supported")?;
981                }
982                let Some(source) = source else {
983                    plan_err!("Inserts without a source not supported")?
984                };
985                if let Some(table_alias) = table_alias {
986                    plan_err!(
987                        "Inserts with a table alias not supported: {table_alias:?}"
988                    )?
989                };
990                if let Some(priority) = priority {
991                    plan_err!(
992                        "Inserts with a `PRIORITY` clause not supported: {priority:?}"
993                    )?
994                };
995                if insert_alias.is_some() {
996                    plan_err!("Inserts with an alias not supported")?;
997                }
998                if !assignments.is_empty() {
999                    plan_err!("Inserts with assignments not supported")?;
1000                }
1001                if settings.is_some() {
1002                    plan_err!("Inserts with settings not supported")?;
1003                }
1004                if format_clause.is_some() {
1005                    plan_err!("Inserts with format clause not supported")?;
1006                }
1007                // optional keywords don't change behavior
1008                let _ = into;
1009                let _ = has_table_keyword;
1010                self.insert_to_plan(table_name, columns, source, overwrite, replace_into)
1011            }
1012            Statement::Update {
1013                table,
1014                assignments,
1015                from,
1016                selection,
1017                returning,
1018                or,
1019                limit,
1020            } => {
1021                let from_clauses =
1022                    from.map(|update_table_from_kind| match update_table_from_kind {
1023                        UpdateTableFromKind::BeforeSet(from_clauses) => from_clauses,
1024                        UpdateTableFromKind::AfterSet(from_clauses) => from_clauses,
1025                    });
1026                // TODO: support multiple tables in UPDATE SET FROM
1027                if from_clauses.as_ref().is_some_and(|f| f.len() > 1) {
1028                    plan_err!("Multiple tables in UPDATE SET FROM not yet supported")?;
1029                }
1030                let update_from = from_clauses.and_then(|mut f| f.pop());
1031                if returning.is_some() {
1032                    plan_err!("Update-returning clause not yet supported")?;
1033                }
1034                if or.is_some() {
1035                    plan_err!("ON conflict not supported")?;
1036                }
1037                if limit.is_some() {
1038                    return not_impl_err!("Update-limit clause not supported")?;
1039                }
1040                self.update_to_plan(table, assignments, update_from, selection)
1041            }
1042
1043            Statement::Delete(Delete {
1044                tables,
1045                using,
1046                selection,
1047                returning,
1048                from,
1049                order_by,
1050                limit,
1051            }) => {
1052                if !tables.is_empty() {
1053                    plan_err!("DELETE <TABLE> not supported")?;
1054                }
1055
1056                if using.is_some() {
1057                    plan_err!("Using clause not supported")?;
1058                }
1059
1060                if returning.is_some() {
1061                    plan_err!("Delete-returning clause not yet supported")?;
1062                }
1063
1064                if !order_by.is_empty() {
1065                    plan_err!("Delete-order-by clause not yet supported")?;
1066                }
1067
1068                if limit.is_some() {
1069                    plan_err!("Delete-limit clause not yet supported")?;
1070                }
1071
1072                let table_name = self.get_delete_target(from)?;
1073                self.delete_to_plan(table_name, selection)
1074            }
1075
1076            Statement::StartTransaction {
1077                modes,
1078                begin: false,
1079                modifier,
1080                transaction,
1081                statements,
1082                has_end_keyword,
1083                exception,
1084            } => {
1085                if let Some(modifier) = modifier {
1086                    return not_impl_err!(
1087                        "Transaction modifier not supported: {modifier}"
1088                    );
1089                }
1090                if !statements.is_empty() {
1091                    return not_impl_err!(
1092                        "Transaction with multiple statements not supported"
1093                    );
1094                }
1095                if exception.is_some() {
1096                    return not_impl_err!(
1097                        "Transaction with exception statements not supported"
1098                    );
1099                }
1100                if has_end_keyword {
1101                    return not_impl_err!("Transaction with END keyword not supported");
1102                }
1103                self.validate_transaction_kind(transaction)?;
1104                let isolation_level: ast::TransactionIsolationLevel = modes
1105                    .iter()
1106                    .filter_map(|m: &TransactionMode| match m {
1107                        TransactionMode::AccessMode(_) => None,
1108                        TransactionMode::IsolationLevel(level) => Some(level),
1109                    })
1110                    .next_back()
1111                    .copied()
1112                    .unwrap_or(ast::TransactionIsolationLevel::Serializable);
1113                let access_mode: ast::TransactionAccessMode = modes
1114                    .iter()
1115                    .filter_map(|m: &TransactionMode| match m {
1116                        TransactionMode::AccessMode(mode) => Some(mode),
1117                        TransactionMode::IsolationLevel(_) => None,
1118                    })
1119                    .next_back()
1120                    .copied()
1121                    .unwrap_or(ast::TransactionAccessMode::ReadWrite);
1122                let isolation_level = match isolation_level {
1123                    ast::TransactionIsolationLevel::ReadUncommitted => {
1124                        TransactionIsolationLevel::ReadUncommitted
1125                    }
1126                    ast::TransactionIsolationLevel::ReadCommitted => {
1127                        TransactionIsolationLevel::ReadCommitted
1128                    }
1129                    ast::TransactionIsolationLevel::RepeatableRead => {
1130                        TransactionIsolationLevel::RepeatableRead
1131                    }
1132                    ast::TransactionIsolationLevel::Serializable => {
1133                        TransactionIsolationLevel::Serializable
1134                    }
1135                    ast::TransactionIsolationLevel::Snapshot => {
1136                        TransactionIsolationLevel::Snapshot
1137                    }
1138                };
1139                let access_mode = match access_mode {
1140                    ast::TransactionAccessMode::ReadOnly => {
1141                        TransactionAccessMode::ReadOnly
1142                    }
1143                    ast::TransactionAccessMode::ReadWrite => {
1144                        TransactionAccessMode::ReadWrite
1145                    }
1146                };
1147                let statement = PlanStatement::TransactionStart(TransactionStart {
1148                    access_mode,
1149                    isolation_level,
1150                });
1151                Ok(LogicalPlan::Statement(statement))
1152            }
1153            Statement::Commit {
1154                chain,
1155                end,
1156                modifier,
1157            } => {
1158                if end {
1159                    return not_impl_err!("COMMIT AND END not supported");
1160                };
1161                if let Some(modifier) = modifier {
1162                    return not_impl_err!("COMMIT {modifier} not supported");
1163                };
1164                let statement = PlanStatement::TransactionEnd(TransactionEnd {
1165                    conclusion: TransactionConclusion::Commit,
1166                    chain,
1167                });
1168                Ok(LogicalPlan::Statement(statement))
1169            }
1170            Statement::Rollback { chain, savepoint } => {
1171                if savepoint.is_some() {
1172                    plan_err!("Savepoints not supported")?;
1173                }
1174                let statement = PlanStatement::TransactionEnd(TransactionEnd {
1175                    conclusion: TransactionConclusion::Rollback,
1176                    chain,
1177                });
1178                Ok(LogicalPlan::Statement(statement))
1179            }
1180            Statement::CreateFunction(ast::CreateFunction {
1181                or_replace,
1182                temporary,
1183                name,
1184                args,
1185                return_type,
1186                function_body,
1187                behavior,
1188                language,
1189                ..
1190            }) => {
1191                let return_type = match return_type {
1192                    Some(t) => Some(self.convert_data_type_to_field(&t)?),
1193                    None => None,
1194                };
1195                let mut planner_context = PlannerContext::new();
1196                let empty_schema = &DFSchema::empty();
1197
1198                let args = match args {
1199                    Some(function_args) => {
1200                        let function_args = function_args
1201                            .into_iter()
1202                            .map(|arg| {
1203                                let data_type =
1204                                    self.convert_data_type_to_field(&arg.data_type)?;
1205
1206                                let default_expr = match arg.default_expr {
1207                                    Some(expr) => Some(self.sql_to_expr(
1208                                        expr,
1209                                        empty_schema,
1210                                        &mut planner_context,
1211                                    )?),
1212                                    None => None,
1213                                };
1214                                Ok(OperateFunctionArg {
1215                                    name: arg.name,
1216                                    default_expr,
1217                                    data_type: data_type.data_type().clone(),
1218                                })
1219                            })
1220                            .collect::<Result<Vec<OperateFunctionArg>>>();
1221                        Some(function_args?)
1222                    }
1223                    None => None,
1224                };
1225                // At the moment functions can't be qualified `schema.name`
1226                let name = match &name.0[..] {
1227                    [] => exec_err!("Function should have name")?,
1228                    [n] => n.as_ident().unwrap().value.clone(),
1229                    [..] => not_impl_err!("Qualified functions are not supported")?,
1230                };
1231                //
1232                // Convert resulting expression to data fusion expression
1233                //
1234                let arg_types = args.as_ref().map(|arg| {
1235                    arg.iter()
1236                        .map(|t| Arc::new(Field::new("", t.data_type.clone(), true)))
1237                        .collect::<Vec<_>>()
1238                });
1239                let mut planner_context = PlannerContext::new()
1240                    .with_prepare_param_data_types(arg_types.unwrap_or_default());
1241
1242                let function_body = match function_body {
1243                    Some(r) => Some(self.sql_to_expr(
1244                        match r {
1245                            ast::CreateFunctionBody::AsBeforeOptions(expr) => expr,
1246                            ast::CreateFunctionBody::AsAfterOptions(expr) => expr,
1247                            ast::CreateFunctionBody::Return(expr) => expr,
1248                            ast::CreateFunctionBody::AsBeginEnd(_) => {
1249                                return not_impl_err!(
1250                                    "BEGIN/END enclosed function body syntax is not supported"
1251                                )?;
1252                            }
1253                            ast::CreateFunctionBody::AsReturnExpr(_)
1254                            | ast::CreateFunctionBody::AsReturnSelect(_) => {
1255                                return not_impl_err!(
1256                                    "AS RETURN function syntax is not supported"
1257                                )?
1258                            }
1259                        },
1260                        &DFSchema::empty(),
1261                        &mut planner_context,
1262                    )?),
1263                    None => None,
1264                };
1265
1266                let params = CreateFunctionBody {
1267                    language,
1268                    behavior: behavior.map(|b| match b {
1269                        ast::FunctionBehavior::Immutable => Volatility::Immutable,
1270                        ast::FunctionBehavior::Stable => Volatility::Stable,
1271                        ast::FunctionBehavior::Volatile => Volatility::Volatile,
1272                    }),
1273                    function_body,
1274                };
1275
1276                let statement = DdlStatement::CreateFunction(CreateFunction {
1277                    or_replace,
1278                    temporary,
1279                    name,
1280                    return_type: return_type.map(|f| f.data_type().clone()),
1281                    args,
1282                    params,
1283                    schema: DFSchemaRef::new(DFSchema::empty()),
1284                });
1285
1286                Ok(LogicalPlan::Ddl(statement))
1287            }
1288            Statement::DropFunction {
1289                if_exists,
1290                func_desc,
1291                ..
1292            } => {
1293                // According to postgresql documentation it can be only one function
1294                // specified in drop statement
1295                if let Some(desc) = func_desc.first() {
1296                    // At the moment functions can't be qualified `schema.name`
1297                    let name = match &desc.name.0[..] {
1298                        [] => exec_err!("Function should have name")?,
1299                        [n] => n.as_ident().unwrap().value.clone(),
1300                        [..] => not_impl_err!("Qualified functions are not supported")?,
1301                    };
1302                    let statement = DdlStatement::DropFunction(DropFunction {
1303                        if_exists,
1304                        name,
1305                        schema: DFSchemaRef::new(DFSchema::empty()),
1306                    });
1307                    Ok(LogicalPlan::Ddl(statement))
1308                } else {
1309                    exec_err!("Function name not provided")
1310                }
1311            }
1312            Statement::CreateIndex(CreateIndex {
1313                name,
1314                table_name,
1315                using,
1316                columns,
1317                unique,
1318                if_not_exists,
1319                ..
1320            }) => {
1321                let name: Option<String> = name.as_ref().map(object_name_to_string);
1322                let table = self.object_name_to_table_reference(table_name)?;
1323                let table_schema = self
1324                    .context_provider
1325                    .get_table_source(table.clone())?
1326                    .schema()
1327                    .to_dfschema_ref()?;
1328                let using: Option<String> =
1329                    using.as_ref().map(|index_type| match index_type {
1330                        IndexType::Custom(ident) => ident_to_string(ident),
1331                        _ => index_type.to_string().to_ascii_lowercase(),
1332                    });
1333                let order_by_exprs: Vec<OrderByExpr> =
1334                    columns.into_iter().map(|col| col.column).collect();
1335                let columns = self.order_by_to_sort_expr(
1336                    order_by_exprs,
1337                    &table_schema,
1338                    planner_context,
1339                    false,
1340                    None,
1341                )?;
1342                Ok(LogicalPlan::Ddl(DdlStatement::CreateIndex(
1343                    PlanCreateIndex {
1344                        name,
1345                        table,
1346                        using,
1347                        columns,
1348                        unique,
1349                        if_not_exists,
1350                        schema: DFSchemaRef::new(DFSchema::empty()),
1351                    },
1352                )))
1353            }
1354            stmt => {
1355                not_impl_err!("Unsupported SQL statement: {stmt}")
1356            }
1357        }
1358    }
1359
1360    fn get_delete_target(&self, from: FromTable) -> Result<ObjectName> {
1361        let mut from = match from {
1362            FromTable::WithFromKeyword(v) => v,
1363            FromTable::WithoutKeyword(v) => v,
1364        };
1365
1366        if from.len() != 1 {
1367            return not_impl_err!(
1368                "DELETE FROM only supports single table, got {}: {from:?}",
1369                from.len()
1370            );
1371        }
1372        let table_factor = from.pop().unwrap();
1373        if !table_factor.joins.is_empty() {
1374            return not_impl_err!("DELETE FROM only supports single table, got: joins");
1375        }
1376        let TableFactor::Table { name, .. } = table_factor.relation else {
1377            return not_impl_err!(
1378                "DELETE FROM only supports single table, got: {table_factor:?}"
1379            );
1380        };
1381
1382        Ok(name)
1383    }
1384
1385    /// Generate a logical plan from a "SHOW TABLES" query
1386    fn show_tables_to_plan(&self) -> Result<LogicalPlan> {
1387        if self.has_table("information_schema", "tables") {
1388            let query = "SELECT * FROM information_schema.tables;";
1389            let mut rewrite = DFParser::parse_sql(query)?;
1390            assert_eq!(rewrite.len(), 1);
1391            self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
1392        } else {
1393            plan_err!("SHOW TABLES is not supported unless information_schema is enabled")
1394        }
1395    }
1396
1397    fn describe_table_to_plan(&self, table_name: ObjectName) -> Result<LogicalPlan> {
1398        let table_ref = self.object_name_to_table_reference(table_name)?;
1399
1400        let table_source = self.context_provider.get_table_source(table_ref)?;
1401
1402        let schema = table_source.schema();
1403
1404        let output_schema = DFSchema::try_from(LogicalPlan::describe_schema()).unwrap();
1405
1406        Ok(LogicalPlan::DescribeTable(DescribeTable {
1407            schema,
1408            output_schema: Arc::new(output_schema),
1409        }))
1410    }
1411
1412    fn describe_query_to_plan(&self, query: Query) -> Result<LogicalPlan> {
1413        let plan = self.query_to_plan(query, &mut PlannerContext::new())?;
1414
1415        let schema = Arc::new(plan.schema().as_arrow().clone());
1416
1417        let output_schema = DFSchema::try_from(LogicalPlan::describe_schema()).unwrap();
1418
1419        Ok(LogicalPlan::DescribeTable(DescribeTable {
1420            schema,
1421            output_schema: Arc::new(output_schema),
1422        }))
1423    }
1424
1425    fn copy_to_plan(&self, statement: CopyToStatement) -> Result<LogicalPlan> {
1426        // Determine if source is table or query and handle accordingly
1427        let copy_source = statement.source;
1428        let (input, input_schema, table_ref) = match copy_source {
1429            CopyToSource::Relation(object_name) => {
1430                let table_name = object_name_to_string(&object_name);
1431                let table_ref = self.object_name_to_table_reference(object_name)?;
1432                let table_source =
1433                    self.context_provider.get_table_source(table_ref.clone())?;
1434                let plan =
1435                    LogicalPlanBuilder::scan(table_name, table_source, None)?.build()?;
1436                let input_schema = Arc::clone(plan.schema());
1437                (plan, input_schema, Some(table_ref))
1438            }
1439            CopyToSource::Query(query) => {
1440                let plan = self.query_to_plan(*query, &mut PlannerContext::new())?;
1441                let input_schema = Arc::clone(plan.schema());
1442                (plan, input_schema, None)
1443            }
1444        };
1445
1446        let options_map = self.parse_options_map(statement.options, true)?;
1447
1448        let maybe_file_type = if let Some(stored_as) = &statement.stored_as {
1449            self.context_provider.get_file_type(stored_as).ok()
1450        } else {
1451            None
1452        };
1453
1454        let file_type = match maybe_file_type {
1455            Some(ft) => ft,
1456            None => {
1457                let e = || {
1458                    DataFusionError::Configuration(
1459                        "Format not explicitly set and unable to get file extension! Use STORED AS to define file format."
1460                            .to_string(),
1461                    )
1462                };
1463                // Try to infer file format from file extension
1464                let extension: &str = &Path::new(&statement.target)
1465                    .extension()
1466                    .ok_or_else(e)?
1467                    .to_str()
1468                    .ok_or_else(e)?
1469                    .to_lowercase();
1470
1471                self.context_provider.get_file_type(extension)?
1472            }
1473        };
1474
1475        let partition_by = statement
1476            .partitioned_by
1477            .iter()
1478            .map(|col| input_schema.field_with_name(table_ref.as_ref(), col))
1479            .collect::<Result<Vec<_>>>()?
1480            .into_iter()
1481            .map(|f| f.name().to_owned())
1482            .collect();
1483
1484        Ok(LogicalPlan::Copy(CopyTo::new(
1485            Arc::new(input),
1486            statement.target,
1487            partition_by,
1488            file_type,
1489            options_map,
1490        )))
1491    }
1492
1493    fn build_order_by(
1494        &self,
1495        order_exprs: Vec<LexOrdering>,
1496        schema: &DFSchemaRef,
1497        planner_context: &mut PlannerContext,
1498    ) -> Result<Vec<Vec<SortExpr>>> {
1499        if !order_exprs.is_empty() && schema.fields().is_empty() {
1500            let results = order_exprs
1501                .iter()
1502                .map(|lex_order| {
1503                    let result = lex_order
1504                        .iter()
1505                        .map(|order_by_expr| {
1506                            let ordered_expr = &order_by_expr.expr;
1507                            let ordered_expr = ordered_expr.to_owned();
1508                            let ordered_expr = self.sql_expr_to_logical_expr(
1509                                ordered_expr,
1510                                schema,
1511                                planner_context,
1512                            )?;
1513                            let asc = order_by_expr.options.asc.unwrap_or(true);
1514                            let nulls_first =
1515                                order_by_expr.options.nulls_first.unwrap_or_else(|| {
1516                                    self.options.default_null_ordering.nulls_first(asc)
1517                                });
1518
1519                            Ok(SortExpr::new(ordered_expr, asc, nulls_first))
1520                        })
1521                        .collect::<Result<Vec<SortExpr>>>()?;
1522                    Ok(result)
1523                })
1524                .collect::<Result<Vec<Vec<SortExpr>>>>()?;
1525
1526            return Ok(results);
1527        }
1528
1529        let mut all_results = vec![];
1530        for expr in order_exprs {
1531            // Convert each OrderByExpr to a SortExpr:
1532            let expr_vec =
1533                self.order_by_to_sort_expr(expr, schema, planner_context, true, None)?;
1534            // Verify that columns of all SortExprs exist in the schema:
1535            for sort in expr_vec.iter() {
1536                for column in sort.expr.column_refs().iter() {
1537                    if !schema.has_column(column) {
1538                        // Return an error if any column is not in the schema:
1539                        return plan_err!("Column {column} is not in schema");
1540                    }
1541                }
1542            }
1543            // If all SortExprs are valid, return them as an expression vector
1544            all_results.push(expr_vec)
1545        }
1546        Ok(all_results)
1547    }
1548
1549    /// Generate a logical plan from a CREATE EXTERNAL TABLE statement
1550    fn external_table_to_plan(
1551        &self,
1552        statement: CreateExternalTable,
1553    ) -> Result<LogicalPlan> {
1554        let definition = Some(statement.to_string());
1555        let CreateExternalTable {
1556            name,
1557            columns,
1558            file_type,
1559            location,
1560            table_partition_cols,
1561            if_not_exists,
1562            temporary,
1563            order_exprs,
1564            unbounded,
1565            options,
1566            constraints,
1567            or_replace,
1568        } = statement;
1569
1570        // Merge inline constraints and existing constraints
1571        let mut all_constraints = constraints;
1572        let inline_constraints = calc_inline_constraints_from_columns(&columns);
1573        all_constraints.extend(inline_constraints);
1574
1575        let options_map = self.parse_options_map(options, false)?;
1576
1577        let compression = options_map
1578            .get("format.compression")
1579            .map(|c| CompressionTypeVariant::from_str(c))
1580            .transpose()?;
1581        if (file_type == "PARQUET" || file_type == "AVRO" || file_type == "ARROW")
1582            && compression
1583                .map(|c| c != CompressionTypeVariant::UNCOMPRESSED)
1584                .unwrap_or(false)
1585        {
1586            plan_err!(
1587                "File compression type cannot be set for PARQUET, AVRO, or ARROW files."
1588            )?;
1589        }
1590
1591        let mut planner_context = PlannerContext::new();
1592
1593        let column_defaults = self
1594            .build_column_defaults(&columns, &mut planner_context)?
1595            .into_iter()
1596            .collect();
1597
1598        let schema = self.build_schema(columns)?;
1599        let df_schema = schema.to_dfschema_ref()?;
1600        df_schema.check_names()?;
1601
1602        let ordered_exprs =
1603            self.build_order_by(order_exprs, &df_schema, &mut planner_context)?;
1604
1605        let name = self.object_name_to_table_reference(name)?;
1606        let constraints =
1607            self.new_constraint_from_table_constraints(&all_constraints, &df_schema)?;
1608        Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
1609            PlanCreateExternalTable {
1610                schema: df_schema,
1611                name,
1612                location,
1613                file_type,
1614                table_partition_cols,
1615                if_not_exists,
1616                or_replace,
1617                temporary,
1618                definition,
1619                order_exprs: ordered_exprs,
1620                unbounded,
1621                options: options_map,
1622                constraints,
1623                column_defaults,
1624            },
1625        )))
1626    }
1627
1628    /// Get the indices of the constraint columns in the schema.
1629    /// If any column is not found, return an error.
1630    fn get_constraint_column_indices(
1631        &self,
1632        df_schema: &DFSchemaRef,
1633        columns: &[IndexColumn],
1634        constraint_name: &str,
1635    ) -> Result<Vec<usize>> {
1636        let field_names = df_schema.field_names();
1637        columns
1638            .iter()
1639            .map(|index_column| {
1640                let expr = &index_column.column.expr;
1641                let ident = if let SQLExpr::Identifier(ident) = expr {
1642                    ident
1643                } else {
1644                    return Err(plan_datafusion_err!(
1645                        "Column name for {constraint_name} must be an identifier: {expr}"
1646                    ));
1647                };
1648                let column = self.ident_normalizer.normalize(ident.clone());
1649                field_names
1650                    .iter()
1651                    .position(|item| *item == column)
1652                    .ok_or_else(|| {
1653                        plan_datafusion_err!(
1654                            "Column for {constraint_name} not found in schema: {column}"
1655                        )
1656                    })
1657            })
1658            .collect::<Result<Vec<_>>>()
1659    }
1660
1661    /// Convert each [TableConstraint] to corresponding [Constraint]
1662    pub fn new_constraint_from_table_constraints(
1663        &self,
1664        constraints: &[TableConstraint],
1665        df_schema: &DFSchemaRef,
1666    ) -> Result<Constraints> {
1667        let constraints = constraints
1668            .iter()
1669            .map(|c: &TableConstraint| match c {
1670                TableConstraint::Unique { name, columns, .. } => {
1671                    let constraint_name = match name {
1672                        Some(name) => &format!("unique constraint with name '{name}'"),
1673                        None => "unique constraint",
1674                    };
1675                    // Get unique constraint indices in the schema
1676                    let indices = self.get_constraint_column_indices(
1677                        df_schema,
1678                        columns,
1679                        constraint_name,
1680                    )?;
1681                    Ok(Constraint::Unique(indices))
1682                }
1683                TableConstraint::PrimaryKey { columns, .. } => {
1684                    // Get primary key indices in the schema
1685                    let indices = self.get_constraint_column_indices(
1686                        df_schema,
1687                        columns,
1688                        "primary key",
1689                    )?;
1690                    Ok(Constraint::PrimaryKey(indices))
1691                }
1692                TableConstraint::ForeignKey { .. } => {
1693                    _plan_err!("Foreign key constraints are not currently supported")
1694                }
1695                TableConstraint::Check { .. } => {
1696                    _plan_err!("Check constraints are not currently supported")
1697                }
1698                TableConstraint::Index { .. } => {
1699                    _plan_err!("Indexes are not currently supported")
1700                }
1701                TableConstraint::FulltextOrSpatial { .. } => {
1702                    _plan_err!("Indexes are not currently supported")
1703                }
1704            })
1705            .collect::<Result<Vec<_>>>()?;
1706        Ok(Constraints::new_unverified(constraints))
1707    }
1708
1709    fn parse_options_map(
1710        &self,
1711        options: Vec<(String, Value)>,
1712        allow_duplicates: bool,
1713    ) -> Result<HashMap<String, String>> {
1714        let mut options_map = HashMap::new();
1715        for (key, value) in options {
1716            if !allow_duplicates && options_map.contains_key(&key) {
1717                return plan_err!("Option {key} is specified multiple times");
1718            }
1719
1720            let Some(value_string) = crate::utils::value_to_string(&value) else {
1721                return plan_err!("Unsupported Value {}", value);
1722            };
1723
1724            if !(&key.contains('.')) {
1725                // If config does not belong to any namespace, assume it is
1726                // a format option and apply the format prefix for backwards
1727                // compatibility.
1728                let renamed_key = format!("format.{key}");
1729                options_map.insert(renamed_key.to_lowercase(), value_string);
1730            } else {
1731                options_map.insert(key.to_lowercase(), value_string);
1732            }
1733        }
1734
1735        Ok(options_map)
1736    }
1737
1738    /// Generate a plan for EXPLAIN ... that will print out a plan
1739    ///
1740    /// Note this is the sqlparser explain statement, not the
1741    /// datafusion `EXPLAIN` statement.
1742    fn explain_to_plan(
1743        &self,
1744        verbose: bool,
1745        analyze: bool,
1746        format: Option<String>,
1747        statement: DFStatement,
1748    ) -> Result<LogicalPlan> {
1749        let plan = self.statement_to_plan(statement)?;
1750        if matches!(plan, LogicalPlan::Explain(_)) {
1751            return plan_err!("Nested EXPLAINs are not supported");
1752        }
1753
1754        let plan = Arc::new(plan);
1755        let schema = LogicalPlan::explain_schema();
1756        let schema = schema.to_dfschema_ref()?;
1757
1758        if verbose && format.is_some() {
1759            return plan_err!("EXPLAIN VERBOSE with FORMAT is not supported");
1760        }
1761
1762        if analyze {
1763            if format.is_some() {
1764                return plan_err!("EXPLAIN ANALYZE with FORMAT is not supported");
1765            }
1766            Ok(LogicalPlan::Analyze(Analyze {
1767                verbose,
1768                input: plan,
1769                schema,
1770            }))
1771        } else {
1772            let stringified_plans =
1773                vec![plan.to_stringified(PlanType::InitialLogicalPlan)];
1774
1775            // default to configuration value
1776            // verbose mode only supports indent format
1777            let options = self.context_provider.options();
1778            let format = if verbose {
1779                ExplainFormat::Indent
1780            } else if let Some(format) = format {
1781                ExplainFormat::from_str(&format)?
1782            } else {
1783                options.explain.format.clone()
1784            };
1785
1786            Ok(LogicalPlan::Explain(Explain {
1787                verbose,
1788                explain_format: format,
1789                plan,
1790                stringified_plans,
1791                schema,
1792                logical_optimization_succeeded: false,
1793            }))
1794        }
1795    }
1796
1797    fn show_variable_to_plan(&self, variable: &[Ident]) -> Result<LogicalPlan> {
1798        if !self.has_table("information_schema", "df_settings") {
1799            return plan_err!(
1800                "SHOW [VARIABLE] is not supported unless information_schema is enabled"
1801            );
1802        }
1803
1804        let verbose = variable
1805            .last()
1806            .map(|s| ident_to_string(s) == "verbose")
1807            .unwrap_or(false);
1808        let mut variable_vec = variable.to_vec();
1809        let mut columns: String = "name, value".to_owned();
1810
1811        if verbose {
1812            columns = format!("{columns}, description");
1813            variable_vec = variable_vec.split_at(variable_vec.len() - 1).0.to_vec();
1814        }
1815
1816        let variable = object_name_to_string(&ObjectName::from(variable_vec));
1817        let base_query = format!("SELECT {columns} FROM information_schema.df_settings");
1818        let query = if variable == "all" {
1819            // Add an ORDER BY so the output comes out in a consistent order
1820            format!("{base_query} ORDER BY name")
1821        } else if variable == "timezone" || variable == "time.zone" {
1822            // we could introduce alias in OptionDefinition if this string matching thing grows
1823            format!("{base_query} WHERE name = 'datafusion.execution.time_zone'")
1824        } else {
1825            // These values are what are used to make the information_schema table, so we just
1826            // check here, before actually planning or executing the query, if it would produce no
1827            // results, and error preemptively if it would (for a better UX)
1828            let is_valid_variable = self
1829                .context_provider
1830                .options()
1831                .entries()
1832                .iter()
1833                .any(|opt| opt.key == variable);
1834
1835            if !is_valid_variable {
1836                return plan_err!(
1837                    "'{variable}' is not a variable which can be viewed with 'SHOW'"
1838                );
1839            }
1840
1841            format!("{base_query} WHERE name = '{variable}'")
1842        };
1843
1844        let mut rewrite = DFParser::parse_sql(&query)?;
1845        assert_eq!(rewrite.len(), 1);
1846
1847        self.statement_to_plan(rewrite.pop_front().unwrap())
1848    }
1849
1850    fn set_statement_to_plan(&self, statement: Set) -> Result<LogicalPlan> {
1851        match statement {
1852            Set::SingleAssignment {
1853                scope,
1854                hivevar,
1855                variable,
1856                values,
1857            } => {
1858                if scope.is_some() {
1859                    return not_impl_err!("SET with scope modifiers is not supported");
1860                }
1861
1862                if hivevar {
1863                    return not_impl_err!("SET HIVEVAR is not supported");
1864                }
1865
1866                let variable = object_name_to_string(&variable);
1867                let mut variable_lower = variable.to_lowercase();
1868
1869                if variable_lower == "timezone" || variable_lower == "time.zone" {
1870                    variable_lower = "datafusion.execution.time_zone".to_string();
1871                }
1872
1873                if values.len() != 1 {
1874                    return plan_err!("SET only supports single value assignment");
1875                }
1876
1877                let value_string = match &values[0] {
1878                    SQLExpr::Identifier(i) => ident_to_string(i),
1879                    SQLExpr::Value(v) => match crate::utils::value_to_string(&v.value) {
1880                        None => {
1881                            return plan_err!("Unsupported value {:?}", v.value);
1882                        }
1883                        Some(s) => s,
1884                    },
1885                    SQLExpr::UnaryOp { op, expr } => match op {
1886                        UnaryOperator::Plus => format!("+{expr}"),
1887                        UnaryOperator::Minus => format!("-{expr}"),
1888                        _ => return plan_err!("Unsupported unary op {:?}", op),
1889                    },
1890                    _ => return plan_err!("Unsupported expr {:?}", values[0]),
1891                };
1892
1893                Ok(LogicalPlan::Statement(PlanStatement::SetVariable(
1894                    SetVariable {
1895                        variable: variable_lower,
1896                        value: value_string,
1897                    },
1898                )))
1899            }
1900            other => not_impl_err!("SET variant not implemented yet: {other:?}"),
1901        }
1902    }
1903
1904    fn delete_to_plan(
1905        &self,
1906        table_name: ObjectName,
1907        predicate_expr: Option<SQLExpr>,
1908    ) -> Result<LogicalPlan> {
1909        // Do a table lookup to verify the table exists
1910        let table_ref = self.object_name_to_table_reference(table_name.clone())?;
1911        let table_source = self.context_provider.get_table_source(table_ref.clone())?;
1912        let schema = DFSchema::try_from_qualified_schema(
1913            table_ref.clone(),
1914            &table_source.schema(),
1915        )?;
1916        let scan =
1917            LogicalPlanBuilder::scan(table_ref.clone(), Arc::clone(&table_source), None)?
1918                .build()?;
1919        let mut planner_context = PlannerContext::new();
1920
1921        let source = match predicate_expr {
1922            None => scan,
1923            Some(predicate_expr) => {
1924                let filter_expr =
1925                    self.sql_to_expr(predicate_expr, &schema, &mut planner_context)?;
1926                let schema = Arc::new(schema);
1927                let mut using_columns = HashSet::new();
1928                expr_to_columns(&filter_expr, &mut using_columns)?;
1929                let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
1930                    filter_expr,
1931                    &[&[&schema]],
1932                    &[using_columns],
1933                )?;
1934                LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(scan))?)
1935            }
1936        };
1937
1938        let plan = LogicalPlan::Dml(DmlStatement::new(
1939            table_ref,
1940            table_source,
1941            WriteOp::Delete,
1942            Arc::new(source),
1943        ));
1944        Ok(plan)
1945    }
1946
1947    fn update_to_plan(
1948        &self,
1949        table: TableWithJoins,
1950        assignments: Vec<Assignment>,
1951        from: Option<TableWithJoins>,
1952        predicate_expr: Option<SQLExpr>,
1953    ) -> Result<LogicalPlan> {
1954        let (table_name, table_alias) = match &table.relation {
1955            TableFactor::Table { name, alias, .. } => (name.clone(), alias.clone()),
1956            _ => plan_err!("Cannot update non-table relation!")?,
1957        };
1958
1959        // Do a table lookup to verify the table exists
1960        let table_name = self.object_name_to_table_reference(table_name)?;
1961        let table_source = self.context_provider.get_table_source(table_name.clone())?;
1962        let table_schema = Arc::new(DFSchema::try_from_qualified_schema(
1963            table_name.clone(),
1964            &table_source.schema(),
1965        )?);
1966
1967        // Overwrite with assignment expressions
1968        let mut planner_context = PlannerContext::new();
1969        let mut assign_map = assignments
1970            .iter()
1971            .map(|assign| {
1972                let cols = match &assign.target {
1973                    AssignmentTarget::ColumnName(cols) => cols,
1974                    _ => plan_err!("Tuples are not supported")?,
1975                };
1976                let col_name: &Ident = cols
1977                    .0
1978                    .iter()
1979                    .last()
1980                    .ok_or_else(|| plan_datafusion_err!("Empty column id"))?
1981                    .as_ident()
1982                    .unwrap();
1983                // Validate that the assignment target column exists
1984                table_schema.field_with_unqualified_name(&col_name.value)?;
1985                Ok((col_name.value.clone(), assign.value.clone()))
1986            })
1987            .collect::<Result<HashMap<String, SQLExpr>>>()?;
1988
1989        // Build scan, join with from table if it exists.
1990        let mut input_tables = vec![table];
1991        input_tables.extend(from);
1992        let scan = self.plan_from_tables(input_tables, &mut planner_context)?;
1993
1994        // Filter
1995        let source = match predicate_expr {
1996            None => scan,
1997            Some(predicate_expr) => {
1998                let filter_expr = self.sql_to_expr(
1999                    predicate_expr,
2000                    scan.schema(),
2001                    &mut planner_context,
2002                )?;
2003                let mut using_columns = HashSet::new();
2004                expr_to_columns(&filter_expr, &mut using_columns)?;
2005                let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
2006                    filter_expr,
2007                    &[&[scan.schema()]],
2008                    &[using_columns],
2009                )?;
2010                LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(scan))?)
2011            }
2012        };
2013
2014        // Build updated values for each column, using the previous value if not modified
2015        let exprs = table_schema
2016            .iter()
2017            .map(|(qualifier, field)| {
2018                let expr = match assign_map.remove(field.name()) {
2019                    Some(new_value) => {
2020                        let mut expr = self.sql_to_expr(
2021                            new_value,
2022                            source.schema(),
2023                            &mut planner_context,
2024                        )?;
2025                        // Update placeholder's datatype to the type of the target column
2026                        if let Expr::Placeholder(placeholder) = &mut expr {
2027                            placeholder.field = placeholder
2028                                .field
2029                                .take()
2030                                .or_else(|| Some(Arc::clone(field)));
2031                        }
2032                        // Cast to target column type, if necessary
2033                        expr.cast_to(field.data_type(), source.schema())?
2034                    }
2035                    None => {
2036                        // If the target table has an alias, use it to qualify the column name
2037                        if let Some(alias) = &table_alias {
2038                            Expr::Column(Column::new(
2039                                Some(self.ident_normalizer.normalize(alias.name.clone())),
2040                                field.name(),
2041                            ))
2042                        } else {
2043                            Expr::Column(Column::from((qualifier, field)))
2044                        }
2045                    }
2046                };
2047                Ok(expr.alias(field.name()))
2048            })
2049            .collect::<Result<Vec<_>>>()?;
2050
2051        let source = project(source, exprs)?;
2052
2053        let plan = LogicalPlan::Dml(DmlStatement::new(
2054            table_name,
2055            table_source,
2056            WriteOp::Update,
2057            Arc::new(source),
2058        ));
2059        Ok(plan)
2060    }
2061
2062    fn insert_to_plan(
2063        &self,
2064        table_name: ObjectName,
2065        columns: Vec<Ident>,
2066        source: Box<Query>,
2067        overwrite: bool,
2068        replace_into: bool,
2069    ) -> Result<LogicalPlan> {
2070        // Do a table lookup to verify the table exists
2071        let table_name = self.object_name_to_table_reference(table_name)?;
2072        let table_source = self.context_provider.get_table_source(table_name.clone())?;
2073        let table_schema = DFSchema::try_from(table_source.schema())?;
2074
2075        // Get insert fields and target table's value indices
2076        //
2077        // If value_indices[i] = Some(j), it means that the value of the i-th target table's column is
2078        // derived from the j-th output of the source.
2079        //
2080        // If value_indices[i] = None, it means that the value of the i-th target table's column is
2081        // not provided, and should be filled with a default value later.
2082        let (fields, value_indices) = if columns.is_empty() {
2083            // Empty means we're inserting into all columns of the table
2084            (
2085                table_schema.fields().clone(),
2086                (0..table_schema.fields().len())
2087                    .map(Some)
2088                    .collect::<Vec<_>>(),
2089            )
2090        } else {
2091            let mut value_indices = vec![None; table_schema.fields().len()];
2092            let fields = columns
2093                .into_iter()
2094                .enumerate()
2095                .map(|(i, c)| {
2096                    let c = self.ident_normalizer.normalize(c);
2097                    let column_index = table_schema
2098                        .index_of_column_by_name(None, &c)
2099                        .ok_or_else(|| unqualified_field_not_found(&c, &table_schema))?;
2100
2101                    if value_indices[column_index].is_some() {
2102                        return schema_err!(SchemaError::DuplicateUnqualifiedField {
2103                            name: c,
2104                        });
2105                    } else {
2106                        value_indices[column_index] = Some(i);
2107                    }
2108                    Ok(table_schema.field(column_index).clone())
2109                })
2110                .collect::<Result<Vec<_>>>()?;
2111            (Fields::from(fields), value_indices)
2112        };
2113
2114        // infer types for Values clause... other types should be resolvable the regular way
2115        let mut prepare_param_data_types = BTreeMap::new();
2116        if let SetExpr::Values(ast::Values { rows, .. }) = (*source.body).clone() {
2117            for row in rows.iter() {
2118                for (idx, val) in row.iter().enumerate() {
2119                    if let SQLExpr::Value(ValueWithSpan {
2120                        value: Value::Placeholder(name),
2121                        span: _,
2122                    }) = val
2123                    {
2124                        let name =
2125                            name.replace('$', "").parse::<usize>().map_err(|_| {
2126                                plan_datafusion_err!("Can't parse placeholder: {name}")
2127                            })? - 1;
2128                        let field = fields.get(idx).ok_or_else(|| {
2129                            plan_datafusion_err!(
2130                                "Placeholder ${} refers to a non existent column",
2131                                idx + 1
2132                            )
2133                        })?;
2134                        let _ = prepare_param_data_types.insert(name, Arc::clone(field));
2135                    }
2136                }
2137            }
2138        }
2139        let prepare_param_data_types = prepare_param_data_types.into_values().collect();
2140
2141        // Projection
2142        let mut planner_context =
2143            PlannerContext::new().with_prepare_param_data_types(prepare_param_data_types);
2144        planner_context.set_table_schema(Some(DFSchemaRef::new(
2145            DFSchema::from_unqualified_fields(fields.clone(), Default::default())?,
2146        )));
2147        let source = self.query_to_plan(*source, &mut planner_context)?;
2148        if fields.len() != source.schema().fields().len() {
2149            plan_err!("Column count doesn't match insert query!")?;
2150        }
2151
2152        let exprs = value_indices
2153            .into_iter()
2154            .enumerate()
2155            .map(|(i, value_index)| {
2156                let target_field = table_schema.field(i);
2157                let expr = match value_index {
2158                    Some(v) => {
2159                        Expr::Column(Column::from(source.schema().qualified_field(v)))
2160                            .cast_to(target_field.data_type(), source.schema())?
2161                    }
2162                    // The value is not specified. Fill in the default value for the column.
2163                    None => table_source
2164                        .get_column_default(target_field.name())
2165                        .cloned()
2166                        .unwrap_or_else(|| {
2167                            // If there is no default for the column, then the default is NULL
2168                            Expr::Literal(ScalarValue::Null, None)
2169                        })
2170                        .cast_to(target_field.data_type(), &DFSchema::empty())?,
2171                };
2172                Ok(expr.alias(target_field.name()))
2173            })
2174            .collect::<Result<Vec<Expr>>>()?;
2175        let source = project(source, exprs)?;
2176
2177        let insert_op = match (overwrite, replace_into) {
2178            (false, false) => InsertOp::Append,
2179            (true, false) => InsertOp::Overwrite,
2180            (false, true) => InsertOp::Replace,
2181            (true, true) => plan_err!("Conflicting insert operations: `overwrite` and `replace_into` cannot both be true")?,
2182        };
2183
2184        let plan = LogicalPlan::Dml(DmlStatement::new(
2185            table_name,
2186            Arc::clone(&table_source),
2187            WriteOp::Insert(insert_op),
2188            Arc::new(source),
2189        ));
2190        Ok(plan)
2191    }
2192
2193    fn show_columns_to_plan(
2194        &self,
2195        extended: bool,
2196        full: bool,
2197        sql_table_name: ObjectName,
2198    ) -> Result<LogicalPlan> {
2199        // Figure out the where clause
2200        let where_clause = object_name_to_qualifier(
2201            &sql_table_name,
2202            self.options.enable_ident_normalization,
2203        )?;
2204
2205        if !self.has_table("information_schema", "columns") {
2206            return plan_err!(
2207                "SHOW COLUMNS is not supported unless information_schema is enabled"
2208            );
2209        }
2210
2211        // Do a table lookup to verify the table exists
2212        let table_ref = self.object_name_to_table_reference(sql_table_name)?;
2213        let _ = self.context_provider.get_table_source(table_ref)?;
2214
2215        // Treat both FULL and EXTENDED as the same
2216        let select_list = if full || extended {
2217            "*"
2218        } else {
2219            "table_catalog, table_schema, table_name, column_name, data_type, is_nullable"
2220        };
2221
2222        let query = format!(
2223            "SELECT {select_list} FROM information_schema.columns WHERE {where_clause}"
2224        );
2225
2226        let mut rewrite = DFParser::parse_sql(&query)?;
2227        assert_eq!(rewrite.len(), 1);
2228        self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
2229    }
2230
2231    /// Rewrite `SHOW FUNCTIONS` to another SQL query
2232    /// The query is based on the `information_schema.routines` and `information_schema.parameters` tables
2233    ///
2234    /// The output columns:
2235    /// - function_name: The name of function
2236    /// - return_type: The return type of the function
2237    /// - parameters: The name of parameters (ordered by the ordinal position)
2238    /// - parameter_types: The type of parameters (ordered by the ordinal position)
2239    /// - description: The description of the function (the description defined in the document)
2240    /// - syntax_example: The syntax_example of the function (the syntax_example defined in the document)
2241    fn show_functions_to_plan(
2242        &self,
2243        filter: Option<ShowStatementFilter>,
2244    ) -> Result<LogicalPlan> {
2245        let where_clause = if let Some(filter) = filter {
2246            match filter {
2247                ShowStatementFilter::Like(like) => {
2248                    format!("WHERE p.function_name like '{like}'")
2249                }
2250                _ => return plan_err!("Unsupported SHOW FUNCTIONS filter"),
2251            }
2252        } else {
2253            "".to_string()
2254        };
2255
2256        let query = format!(
2257            r#"
2258SELECT DISTINCT
2259    p.*,
2260    r.function_type function_type,
2261    r.description description,
2262    r.syntax_example syntax_example
2263FROM
2264    (
2265        SELECT
2266            i.specific_name function_name,
2267            o.data_type return_type,
2268            array_agg(i.parameter_name ORDER BY i.ordinal_position ASC) parameters,
2269            array_agg(i.data_type ORDER BY i.ordinal_position ASC) parameter_types
2270        FROM (
2271                 SELECT
2272                     specific_catalog,
2273                     specific_schema,
2274                     specific_name,
2275                     ordinal_position,
2276                     parameter_name,
2277                     data_type,
2278                     rid
2279                 FROM
2280                     information_schema.parameters
2281                 WHERE
2282                     parameter_mode = 'IN'
2283             ) i
2284                 JOIN
2285             (
2286                 SELECT
2287                     specific_catalog,
2288                     specific_schema,
2289                     specific_name,
2290                     ordinal_position,
2291                     parameter_name,
2292                     data_type,
2293                     rid
2294                 FROM
2295                     information_schema.parameters
2296                 WHERE
2297                     parameter_mode = 'OUT'
2298             ) o
2299             ON i.specific_catalog = o.specific_catalog
2300                 AND i.specific_schema = o.specific_schema
2301                 AND i.specific_name = o.specific_name
2302                 AND i.rid = o.rid
2303        GROUP BY 1, 2, i.rid
2304    ) as p
2305JOIN information_schema.routines r
2306ON p.function_name = r.routine_name
2307{where_clause}
2308            "#
2309        );
2310        let mut rewrite = DFParser::parse_sql(&query)?;
2311        assert_eq!(rewrite.len(), 1);
2312        self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
2313    }
2314
2315    fn show_create_table_to_plan(
2316        &self,
2317        sql_table_name: ObjectName,
2318    ) -> Result<LogicalPlan> {
2319        if !self.has_table("information_schema", "tables") {
2320            return plan_err!(
2321                "SHOW CREATE TABLE is not supported unless information_schema is enabled"
2322            );
2323        }
2324        // Figure out the where clause
2325        let where_clause = object_name_to_qualifier(
2326            &sql_table_name,
2327            self.options.enable_ident_normalization,
2328        )?;
2329
2330        // Do a table lookup to verify the table exists
2331        let table_ref = self.object_name_to_table_reference(sql_table_name)?;
2332        let _ = self.context_provider.get_table_source(table_ref)?;
2333
2334        let query = format!(
2335            "SELECT table_catalog, table_schema, table_name, definition FROM information_schema.views WHERE {where_clause}"
2336        );
2337
2338        let mut rewrite = DFParser::parse_sql(&query)?;
2339        assert_eq!(rewrite.len(), 1);
2340        self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1
2341    }
2342
2343    /// Return true if there is a table provider available for "schema.table"
2344    fn has_table(&self, schema: &str, table: &str) -> bool {
2345        let tables_reference = TableReference::Partial {
2346            schema: schema.into(),
2347            table: table.into(),
2348        };
2349        self.context_provider
2350            .get_table_source(tables_reference)
2351            .is_ok()
2352    }
2353
2354    fn validate_transaction_kind(
2355        &self,
2356        kind: Option<BeginTransactionKind>,
2357    ) -> Result<()> {
2358        match kind {
2359            // BEGIN
2360            None => Ok(()),
2361            // BEGIN TRANSACTION
2362            Some(BeginTransactionKind::Transaction) => Ok(()),
2363            Some(BeginTransactionKind::Work) => {
2364                not_impl_err!("Transaction kind not supported: {kind:?}")
2365            }
2366        }
2367    }
2368}