1use std::collections::{BTreeMap, HashMap, HashSet};
19use std::path::Path;
20use std::str::FromStr;
21use std::sync::Arc;
22
23use crate::parser::{
24 CopyToSource, CopyToStatement, CreateExternalTable, DFParser, ExplainStatement,
25 LexOrdering, Statement as DFStatement,
26};
27use crate::planner::{
28 object_name_to_qualifier, ContextProvider, PlannerContext, SqlToRel,
29};
30use crate::utils::normalize_ident;
31
32use arrow::datatypes::{Field, FieldRef, Fields};
33use datafusion_common::error::_plan_err;
34use datafusion_common::parsers::CompressionTypeVariant;
35use datafusion_common::{
36 exec_err, internal_err, not_impl_err, plan_datafusion_err, plan_err, schema_err,
37 unqualified_field_not_found, Column, Constraint, Constraints, DFSchema, DFSchemaRef,
38 DataFusionError, Result, ScalarValue, SchemaError, SchemaReference, TableReference,
39 ToDFSchema,
40};
41use datafusion_expr::dml::{CopyTo, InsertOp};
42use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check;
43use datafusion_expr::logical_plan::builder::project;
44use datafusion_expr::logical_plan::DdlStatement;
45use datafusion_expr::utils::expr_to_columns;
46use datafusion_expr::{
47 cast, col, Analyze, CreateCatalog, CreateCatalogSchema,
48 CreateExternalTable as PlanCreateExternalTable, CreateFunction, CreateFunctionBody,
49 CreateIndex as PlanCreateIndex, CreateMemoryTable, CreateView, Deallocate,
50 DescribeTable, DmlStatement, DropCatalogSchema, DropFunction, DropTable, DropView,
51 EmptyRelation, Execute, Explain, ExplainFormat, Expr, ExprSchemable, Filter,
52 LogicalPlan, LogicalPlanBuilder, OperateFunctionArg, PlanType, Prepare, SetVariable,
53 SortExpr, Statement as PlanStatement, ToStringifiedPlan, TransactionAccessMode,
54 TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart,
55 Volatility, WriteOp,
56};
57use sqlparser::ast::{
58 self, BeginTransactionKind, IndexColumn, IndexType, NullsDistinctOption, OrderByExpr,
59 OrderByOptions, Set, ShowStatementIn, ShowStatementOptions, SqliteOnConflict,
60 TableObject, UpdateTableFromKind, ValueWithSpan,
61};
62use sqlparser::ast::{
63 Assignment, AssignmentTarget, ColumnDef, CreateIndex, CreateTable,
64 CreateTableOptions, Delete, DescribeAlias, Expr as SQLExpr, FromTable, Ident, Insert,
65 ObjectName, ObjectType, Query, SchemaName, SetExpr, ShowCreateObject,
66 ShowStatementFilter, Statement, TableConstraint, TableFactor, TableWithJoins,
67 TransactionMode, UnaryOperator, Value,
68};
69use sqlparser::parser::ParserError::ParserError;
70
71fn ident_to_string(ident: &Ident) -> String {
72 normalize_ident(ident.to_owned())
73}
74
75fn object_name_to_string(object_name: &ObjectName) -> String {
76 object_name
77 .0
78 .iter()
79 .map(|object_name_part| {
80 object_name_part
81 .as_ident()
82 .map_or_else(String::new, ident_to_string)
85 })
86 .collect::<Vec<String>>()
87 .join(".")
88}
89
90fn get_schema_name(schema_name: &SchemaName) -> String {
91 match schema_name {
92 SchemaName::Simple(schema_name) => object_name_to_string(schema_name),
93 SchemaName::UnnamedAuthorization(auth) => ident_to_string(auth),
94 SchemaName::NamedAuthorization(schema_name, auth) => format!(
95 "{}.{}",
96 object_name_to_string(schema_name),
97 ident_to_string(auth)
98 ),
99 }
100}
101
102fn calc_inline_constraints_from_columns(columns: &[ColumnDef]) -> Vec<TableConstraint> {
105 let mut constraints = vec![];
106 for column in columns {
107 for ast::ColumnOptionDef { name, option } in &column.options {
108 match option {
109 ast::ColumnOption::Unique {
110 is_primary: false,
111 characteristics,
112 } => constraints.push(TableConstraint::Unique {
113 name: name.clone(),
114 columns: vec![IndexColumn {
115 column: OrderByExpr {
116 expr: SQLExpr::Identifier(column.name.clone()),
117 options: OrderByOptions {
118 asc: None,
119 nulls_first: None,
120 },
121 with_fill: None,
122 },
123 operator_class: None,
124 }],
125 characteristics: *characteristics,
126 index_name: None,
127 index_type_display: ast::KeyOrIndexDisplay::None,
128 index_type: None,
129 index_options: vec![],
130 nulls_distinct: NullsDistinctOption::None,
131 }),
132 ast::ColumnOption::Unique {
133 is_primary: true,
134 characteristics,
135 } => constraints.push(TableConstraint::PrimaryKey {
136 name: name.clone(),
137 columns: vec![IndexColumn {
138 column: OrderByExpr {
139 expr: SQLExpr::Identifier(column.name.clone()),
140 options: OrderByOptions {
141 asc: None,
142 nulls_first: None,
143 },
144 with_fill: None,
145 },
146 operator_class: None,
147 }],
148 characteristics: *characteristics,
149 index_name: None,
150 index_type: None,
151 index_options: vec![],
152 }),
153 ast::ColumnOption::ForeignKey {
154 foreign_table,
155 referred_columns,
156 on_delete,
157 on_update,
158 characteristics,
159 } => constraints.push(TableConstraint::ForeignKey {
160 name: name.clone(),
161 columns: vec![],
162 foreign_table: foreign_table.clone(),
163 referred_columns: referred_columns.to_vec(),
164 on_delete: *on_delete,
165 on_update: *on_update,
166 characteristics: *characteristics,
167 index_name: None,
168 }),
169 ast::ColumnOption::Check(expr) => {
170 constraints.push(TableConstraint::Check {
171 name: name.clone(),
172 expr: Box::new(expr.clone()),
173 enforced: None,
174 })
175 }
176 ast::ColumnOption::Default(_)
178 | ast::ColumnOption::Null
179 | ast::ColumnOption::NotNull
180 | ast::ColumnOption::DialectSpecific(_)
181 | ast::ColumnOption::CharacterSet(_)
182 | ast::ColumnOption::Generated { .. }
183 | ast::ColumnOption::Comment(_)
184 | ast::ColumnOption::Options(_)
185 | ast::ColumnOption::OnUpdate(_)
186 | ast::ColumnOption::Materialized(_)
187 | ast::ColumnOption::Ephemeral(_)
188 | ast::ColumnOption::Identity(_)
189 | ast::ColumnOption::OnConflict(_)
190 | ast::ColumnOption::Policy(_)
191 | ast::ColumnOption::Tags(_)
192 | ast::ColumnOption::Alias(_)
193 | ast::ColumnOption::Srid(_)
194 | ast::ColumnOption::Collation(_) => {}
195 }
196 }
197 }
198 constraints
199}
200
201impl<S: ContextProvider> SqlToRel<'_, S> {
202 pub fn statement_to_plan(&self, statement: DFStatement) -> Result<LogicalPlan> {
204 match statement {
205 DFStatement::CreateExternalTable(s) => self.external_table_to_plan(s),
206 DFStatement::Statement(s) => self.sql_statement_to_plan(*s),
207 DFStatement::CopyTo(s) => self.copy_to_plan(s),
208 DFStatement::Explain(ExplainStatement {
209 verbose,
210 analyze,
211 format,
212 statement,
213 }) => self.explain_to_plan(verbose, analyze, format, *statement),
214 }
215 }
216
217 pub fn sql_statement_to_plan(&self, statement: Statement) -> Result<LogicalPlan> {
219 self.sql_statement_to_plan_with_context_impl(
220 statement,
221 &mut PlannerContext::new(),
222 )
223 }
224
225 pub fn sql_statement_to_plan_with_context(
227 &self,
228 statement: Statement,
229 planner_context: &mut PlannerContext,
230 ) -> Result<LogicalPlan> {
231 self.sql_statement_to_plan_with_context_impl(statement, planner_context)
232 }
233
234 fn sql_statement_to_plan_with_context_impl(
235 &self,
236 statement: Statement,
237 planner_context: &mut PlannerContext,
238 ) -> Result<LogicalPlan> {
239 match statement {
240 Statement::ExplainTable {
241 describe_alias: DescribeAlias::Describe | DescribeAlias::Desc, table_name,
243 ..
244 } => self.describe_table_to_plan(table_name),
245 Statement::Explain {
246 describe_alias: DescribeAlias::Describe | DescribeAlias::Desc, statement,
248 ..
249 } => match *statement {
250 Statement::Query(query) => self.describe_query_to_plan(*query),
251 _ => {
252 not_impl_err!("Describing statements other than SELECT not supported")
253 }
254 },
255 Statement::Explain {
256 verbose,
257 statement,
258 analyze,
259 format,
260 describe_alias: _,
261 ..
262 } => {
263 let format = format.map(|format| format.to_string());
264 let statement = DFStatement::Statement(statement);
265 self.explain_to_plan(verbose, analyze, format, statement)
266 }
267 Statement::Query(query) => self.query_to_plan(*query, planner_context),
268 Statement::ShowVariable { variable } => self.show_variable_to_plan(&variable),
269 Statement::Set(statement) => self.set_statement_to_plan(statement),
270 Statement::CreateTable(CreateTable {
271 temporary,
272 external,
273 global,
274 transient,
275 volatile,
276 hive_distribution,
277 hive_formats,
278 file_format,
279 location,
280 query,
281 name,
282 columns,
283 constraints,
284 if_not_exists,
285 or_replace,
286 without_rowid,
287 like,
288 clone,
289 comment,
290 on_commit,
291 on_cluster,
292 primary_key,
293 order_by,
294 partition_by,
295 cluster_by,
296 clustered_by,
297 strict,
298 copy_grants,
299 enable_schema_evolution,
300 change_tracking,
301 data_retention_time_in_days,
302 max_data_extension_time_in_days,
303 default_ddl_collation,
304 with_aggregation_policy,
305 with_row_access_policy,
306 with_tags,
307 iceberg,
308 external_volume,
309 base_location,
310 catalog,
311 catalog_sync,
312 storage_serialization_policy,
313 inherits,
314 table_options: CreateTableOptions::None,
315 dynamic,
316 version,
317 target_lag,
318 warehouse,
319 refresh_mode,
320 initialize,
321 require_user,
322 }) => {
323 if temporary {
324 return not_impl_err!("Temporary tables not supported")?;
325 }
326 if external {
327 return not_impl_err!("External tables not supported")?;
328 }
329 if global.is_some() {
330 return not_impl_err!("Global tables not supported")?;
331 }
332 if transient {
333 return not_impl_err!("Transient tables not supported")?;
334 }
335 if volatile {
336 return not_impl_err!("Volatile tables not supported")?;
337 }
338 if hive_distribution != ast::HiveDistributionStyle::NONE {
339 return not_impl_err!(
340 "Hive distribution not supported: {hive_distribution:?}"
341 )?;
342 }
343 if !matches!(
344 hive_formats,
345 Some(ast::HiveFormat {
346 row_format: None,
347 serde_properties: None,
348 storage: None,
349 location: None,
350 })
351 ) {
352 return not_impl_err!(
353 "Hive formats not supported: {hive_formats:?}"
354 )?;
355 }
356 if file_format.is_some() {
357 return not_impl_err!("File format not supported")?;
358 }
359 if location.is_some() {
360 return not_impl_err!("Location not supported")?;
361 }
362 if without_rowid {
363 return not_impl_err!("Without rowid not supported")?;
364 }
365 if like.is_some() {
366 return not_impl_err!("Like not supported")?;
367 }
368 if clone.is_some() {
369 return not_impl_err!("Clone not supported")?;
370 }
371 if comment.is_some() {
372 return not_impl_err!("Comment not supported")?;
373 }
374 if on_commit.is_some() {
375 return not_impl_err!("On commit not supported")?;
376 }
377 if on_cluster.is_some() {
378 return not_impl_err!("On cluster not supported")?;
379 }
380 if primary_key.is_some() {
381 return not_impl_err!("Primary key not supported")?;
382 }
383 if order_by.is_some() {
384 return not_impl_err!("Order by not supported")?;
385 }
386 if partition_by.is_some() {
387 return not_impl_err!("Partition by not supported")?;
388 }
389 if cluster_by.is_some() {
390 return not_impl_err!("Cluster by not supported")?;
391 }
392 if clustered_by.is_some() {
393 return not_impl_err!("Clustered by not supported")?;
394 }
395 if strict {
396 return not_impl_err!("Strict not supported")?;
397 }
398 if copy_grants {
399 return not_impl_err!("Copy grants not supported")?;
400 }
401 if enable_schema_evolution.is_some() {
402 return not_impl_err!("Enable schema evolution not supported")?;
403 }
404 if change_tracking.is_some() {
405 return not_impl_err!("Change tracking not supported")?;
406 }
407 if data_retention_time_in_days.is_some() {
408 return not_impl_err!("Data retention time in days not supported")?;
409 }
410 if max_data_extension_time_in_days.is_some() {
411 return not_impl_err!(
412 "Max data extension time in days not supported"
413 )?;
414 }
415 if default_ddl_collation.is_some() {
416 return not_impl_err!("Default DDL collation not supported")?;
417 }
418 if with_aggregation_policy.is_some() {
419 return not_impl_err!("With aggregation policy not supported")?;
420 }
421 if with_row_access_policy.is_some() {
422 return not_impl_err!("With row access policy not supported")?;
423 }
424 if with_tags.is_some() {
425 return not_impl_err!("With tags not supported")?;
426 }
427 if iceberg {
428 return not_impl_err!("Iceberg not supported")?;
429 }
430 if external_volume.is_some() {
431 return not_impl_err!("External volume not supported")?;
432 }
433 if base_location.is_some() {
434 return not_impl_err!("Base location not supported")?;
435 }
436 if catalog.is_some() {
437 return not_impl_err!("Catalog not supported")?;
438 }
439 if catalog_sync.is_some() {
440 return not_impl_err!("Catalog sync not supported")?;
441 }
442 if storage_serialization_policy.is_some() {
443 return not_impl_err!("Storage serialization policy not supported")?;
444 }
445 if inherits.is_some() {
446 return not_impl_err!("Table inheritance not supported")?;
447 }
448 if dynamic {
449 return not_impl_err!("Dynamic tables not supported")?;
450 }
451 if version.is_some() {
452 return not_impl_err!("Version not supported")?;
453 }
454 if target_lag.is_some() {
455 return not_impl_err!("Target lag not supported")?;
456 }
457 if warehouse.is_some() {
458 return not_impl_err!("Warehouse not supported")?;
459 }
460 if refresh_mode.is_some() {
461 return not_impl_err!("Refresh mode not supported")?;
462 }
463 if initialize.is_some() {
464 return not_impl_err!("Initialize not supported")?;
465 }
466 if require_user {
467 return not_impl_err!("Require user not supported")?;
468 }
469 let mut all_constraints = constraints;
471 let inline_constraints = calc_inline_constraints_from_columns(&columns);
472 all_constraints.extend(inline_constraints);
473 let column_defaults =
475 self.build_column_defaults(&columns, planner_context)?;
476
477 let has_columns = !columns.is_empty();
478 let schema = self.build_schema(columns)?.to_dfschema_ref()?;
479 if has_columns {
480 planner_context.set_table_schema(Some(Arc::clone(&schema)));
481 }
482
483 match query {
484 Some(query) => {
485 let plan = self.query_to_plan(*query, planner_context)?;
486 let input_schema = plan.schema();
487
488 let plan = if has_columns {
489 if schema.fields().len() != input_schema.fields().len() {
490 return plan_err!(
491 "Mismatch: {} columns specified, but result has {} columns",
492 schema.fields().len(),
493 input_schema.fields().len()
494 );
495 }
496 let input_fields = input_schema.fields();
497 let project_exprs = schema
498 .fields()
499 .iter()
500 .zip(input_fields)
501 .map(|(field, input_field)| {
502 cast(
503 col(input_field.name()),
504 field.data_type().clone(),
505 )
506 .alias(field.name())
507 })
508 .collect::<Vec<_>>();
509
510 LogicalPlanBuilder::from(plan.clone())
511 .project(project_exprs)?
512 .build()?
513 } else {
514 plan
515 };
516
517 let constraints = self.new_constraint_from_table_constraints(
518 &all_constraints,
519 plan.schema(),
520 )?;
521
522 Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
523 CreateMemoryTable {
524 name: self.object_name_to_table_reference(name)?,
525 constraints,
526 input: Arc::new(plan),
527 if_not_exists,
528 or_replace,
529 column_defaults,
530 temporary,
531 },
532 )))
533 }
534
535 None => {
536 let plan = EmptyRelation {
537 produce_one_row: false,
538 schema,
539 };
540 let plan = LogicalPlan::EmptyRelation(plan);
541 let constraints = self.new_constraint_from_table_constraints(
542 &all_constraints,
543 plan.schema(),
544 )?;
545 Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
546 CreateMemoryTable {
547 name: self.object_name_to_table_reference(name)?,
548 constraints,
549 input: Arc::new(plan),
550 if_not_exists,
551 or_replace,
552 column_defaults,
553 temporary,
554 },
555 )))
556 }
557 }
558 }
559 Statement::CreateView {
560 or_replace,
561 materialized,
562 name,
563 columns,
564 query,
565 options: CreateTableOptions::None,
566 cluster_by,
567 comment,
568 with_no_schema_binding,
569 if_not_exists,
570 temporary,
571 to,
572 params,
573 or_alter,
574 secure,
575 name_before_not_exists,
576 } => {
577 if materialized {
578 return not_impl_err!("Materialized views not supported")?;
579 }
580 if !cluster_by.is_empty() {
581 return not_impl_err!("Cluster by not supported")?;
582 }
583 if comment.is_some() {
584 return not_impl_err!("Comment not supported")?;
585 }
586 if with_no_schema_binding {
587 return not_impl_err!("With no schema binding not supported")?;
588 }
589 if if_not_exists {
590 return not_impl_err!("If not exists not supported")?;
591 }
592 if to.is_some() {
593 return not_impl_err!("To not supported")?;
594 }
595
596 let stmt = Statement::CreateView {
599 or_replace,
600 materialized,
601 name,
602 columns,
603 query,
604 options: CreateTableOptions::None,
605 cluster_by,
606 comment,
607 with_no_schema_binding,
608 if_not_exists,
609 temporary,
610 to,
611 params,
612 or_alter,
613 secure,
614 name_before_not_exists,
615 };
616 let sql = stmt.to_string();
617 let Statement::CreateView {
618 name,
619 columns,
620 query,
621 or_replace,
622 temporary,
623 ..
624 } = stmt
625 else {
626 return internal_err!("Unreachable code in create view");
627 };
628
629 let columns = columns
630 .into_iter()
631 .map(|view_column_def| {
632 if let Some(options) = view_column_def.options {
633 plan_err!(
634 "Options not supported for view columns: {options:?}"
635 )
636 } else {
637 Ok(view_column_def.name)
638 }
639 })
640 .collect::<Result<Vec<_>>>()?;
641
642 let mut plan = self.query_to_plan(*query, &mut PlannerContext::new())?;
643 plan = self.apply_expr_alias(plan, columns)?;
644
645 Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
646 name: self.object_name_to_table_reference(name)?,
647 input: Arc::new(plan),
648 or_replace,
649 definition: Some(sql),
650 temporary,
651 })))
652 }
653 Statement::ShowCreate { obj_type, obj_name } => match obj_type {
654 ShowCreateObject::Table => self.show_create_table_to_plan(obj_name),
655 _ => {
656 not_impl_err!("Only `SHOW CREATE TABLE ...` statement is supported")
657 }
658 },
659 Statement::CreateSchema {
660 schema_name,
661 if_not_exists,
662 ..
663 } => Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
664 CreateCatalogSchema {
665 schema_name: get_schema_name(&schema_name),
666 if_not_exists,
667 schema: Arc::new(DFSchema::empty()),
668 },
669 ))),
670 Statement::CreateDatabase {
671 db_name,
672 if_not_exists,
673 ..
674 } => Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalog(
675 CreateCatalog {
676 catalog_name: object_name_to_string(&db_name),
677 if_not_exists,
678 schema: Arc::new(DFSchema::empty()),
679 },
680 ))),
681 Statement::Drop {
682 object_type,
683 if_exists,
684 mut names,
685 cascade,
686 restrict: _,
687 purge: _,
688 temporary: _,
689 table: _,
690 } => {
691 let name = match names.len() {
694 0 => Err(ParserError("Missing table name.".to_string()).into()),
695 1 => self.object_name_to_table_reference(names.pop().unwrap()),
696 _ => {
697 Err(ParserError("Multiple objects not supported".to_string())
698 .into())
699 }
700 }?;
701
702 match object_type {
703 ObjectType::Table => {
704 Ok(LogicalPlan::Ddl(DdlStatement::DropTable(DropTable {
705 name,
706 if_exists,
707 schema: DFSchemaRef::new(DFSchema::empty()),
708 })))
709 }
710 ObjectType::View => {
711 Ok(LogicalPlan::Ddl(DdlStatement::DropView(DropView {
712 name,
713 if_exists,
714 schema: DFSchemaRef::new(DFSchema::empty()),
715 })))
716 }
717 ObjectType::Schema => {
718 let name = match name {
719 TableReference::Bare { table } => Ok(SchemaReference::Bare { schema: table }),
720 TableReference::Partial { schema, table } => Ok(SchemaReference::Full { schema: table, catalog: schema }),
721 TableReference::Full { catalog: _, schema: _, table: _ } => {
722 Err(ParserError("Invalid schema specifier (has 3 parts)".to_string()))
723 }
724 }?;
725 Ok(LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(DropCatalogSchema {
726 name,
727 if_exists,
728 cascade,
729 schema: DFSchemaRef::new(DFSchema::empty()),
730 })))
731 }
732 _ => not_impl_err!(
733 "Only `DROP TABLE/VIEW/SCHEMA ...` statement is supported currently"
734 ),
735 }
736 }
737 Statement::Prepare {
738 name,
739 data_types,
740 statement,
741 } => {
742 let mut fields: Vec<FieldRef> = data_types
744 .into_iter()
745 .map(|t| self.convert_data_type_to_field(&t))
746 .collect::<Result<_>>()?;
747
748 let mut planner_context =
750 PlannerContext::new().with_prepare_param_data_types(fields.clone());
751
752 let plan = self.sql_statement_to_plan_with_context_impl(
754 *statement,
755 &mut planner_context,
756 )?;
757
758 if fields.is_empty() {
759 let map_types = plan.get_parameter_fields()?;
760 let param_types: Vec<_> = (1..=map_types.len())
761 .filter_map(|i| {
762 let key = format!("${i}");
763 map_types.get(&key).and_then(|opt| opt.clone())
764 })
765 .collect();
766 fields.extend(param_types.iter().cloned());
767 planner_context.with_prepare_param_data_types(param_types);
768 }
769
770 Ok(LogicalPlan::Statement(PlanStatement::Prepare(Prepare {
771 name: ident_to_string(&name),
772 fields,
773 input: Arc::new(plan),
774 })))
775 }
776 Statement::Execute {
777 name,
778 parameters,
779 using,
780 has_parentheses: _,
783 immediate,
784 into,
785 output,
786 default,
787 } => {
788 if !using.is_empty() {
790 return not_impl_err!(
791 "Execute statement with USING is not supported"
792 );
793 }
794 if immediate {
795 return not_impl_err!(
796 "Execute statement with IMMEDIATE is not supported"
797 );
798 }
799 if !into.is_empty() {
800 return not_impl_err!("Execute statement with INTO is not supported");
801 }
802 if output {
803 return not_impl_err!(
804 "Execute statement with OUTPUT is not supported"
805 );
806 }
807 if default {
808 return not_impl_err!(
809 "Execute statement with DEFAULT is not supported"
810 );
811 }
812 let empty_schema = DFSchema::empty();
813 let parameters = parameters
814 .into_iter()
815 .map(|expr| self.sql_to_expr(expr, &empty_schema, planner_context))
816 .collect::<Result<Vec<Expr>>>()?;
817
818 Ok(LogicalPlan::Statement(PlanStatement::Execute(Execute {
819 name: object_name_to_string(&name.unwrap()),
820 parameters,
821 })))
822 }
823 Statement::Deallocate {
824 name,
825 prepare: _,
827 } => Ok(LogicalPlan::Statement(PlanStatement::Deallocate(
828 Deallocate {
829 name: ident_to_string(&name),
830 },
831 ))),
832
833 Statement::ShowTables {
834 extended,
835 full,
836 terse,
837 history,
838 external,
839 show_options,
840 } => {
841 if extended {
844 return not_impl_err!("SHOW TABLES EXTENDED not supported")?;
845 }
846 if full {
847 return not_impl_err!("SHOW FULL TABLES not supported")?;
848 }
849 if terse {
850 return not_impl_err!("SHOW TERSE TABLES not supported")?;
851 }
852 if history {
853 return not_impl_err!("SHOW TABLES HISTORY not supported")?;
854 }
855 if external {
856 return not_impl_err!("SHOW EXTERNAL TABLES not supported")?;
857 }
858 let ShowStatementOptions {
859 show_in,
860 starts_with,
861 limit,
862 limit_from,
863 filter_position,
864 } = show_options;
865 if show_in.is_some() {
866 return not_impl_err!("SHOW TABLES IN not supported")?;
867 }
868 if starts_with.is_some() {
869 return not_impl_err!("SHOW TABLES LIKE not supported")?;
870 }
871 if limit.is_some() {
872 return not_impl_err!("SHOW TABLES LIMIT not supported")?;
873 }
874 if limit_from.is_some() {
875 return not_impl_err!("SHOW TABLES LIMIT FROM not supported")?;
876 }
877 if filter_position.is_some() {
878 return not_impl_err!("SHOW TABLES FILTER not supported")?;
879 }
880 self.show_tables_to_plan()
881 }
882
883 Statement::ShowColumns {
884 extended,
885 full,
886 show_options,
887 } => {
888 let ShowStatementOptions {
889 show_in,
890 starts_with,
891 limit,
892 limit_from,
893 filter_position,
894 } = show_options;
895 if starts_with.is_some() {
896 return not_impl_err!("SHOW COLUMNS LIKE not supported")?;
897 }
898 if limit.is_some() {
899 return not_impl_err!("SHOW COLUMNS LIMIT not supported")?;
900 }
901 if limit_from.is_some() {
902 return not_impl_err!("SHOW COLUMNS LIMIT FROM not supported")?;
903 }
904 if filter_position.is_some() {
905 return not_impl_err!(
906 "SHOW COLUMNS with WHERE or LIKE is not supported"
907 )?;
908 }
909 let Some(ShowStatementIn {
910 clause: _,
913 parent_type,
914 parent_name,
915 }) = show_in
916 else {
917 return plan_err!("SHOW COLUMNS requires a table name");
918 };
919
920 if let Some(parent_type) = parent_type {
921 return not_impl_err!("SHOW COLUMNS IN {parent_type} not supported");
922 }
923 let Some(table_name) = parent_name else {
924 return plan_err!("SHOW COLUMNS requires a table name");
925 };
926
927 self.show_columns_to_plan(extended, full, table_name)
928 }
929
930 Statement::ShowFunctions { filter, .. } => {
931 self.show_functions_to_plan(filter)
932 }
933
934 Statement::Insert(Insert {
935 or,
936 into,
937 columns,
938 overwrite,
939 source,
940 partitioned,
941 after_columns,
942 table,
943 on,
944 returning,
945 ignore,
946 table_alias,
947 mut replace_into,
948 priority,
949 insert_alias,
950 assignments,
951 has_table_keyword,
952 settings,
953 format_clause,
954 }) => {
955 let table_name = match table {
956 TableObject::TableName(table_name) => table_name,
957 TableObject::TableFunction(_) => {
958 return not_impl_err!("INSERT INTO Table functions not supported")
959 }
960 };
961 if let Some(or) = or {
962 match or {
963 SqliteOnConflict::Replace => replace_into = true,
964 _ => plan_err!("Inserts with {or} clause is not supported")?,
965 }
966 }
967 if partitioned.is_some() {
968 plan_err!("Partitioned inserts not yet supported")?;
969 }
970 if !after_columns.is_empty() {
971 plan_err!("After-columns clause not supported")?;
972 }
973 if on.is_some() {
974 plan_err!("Insert-on clause not supported")?;
975 }
976 if returning.is_some() {
977 plan_err!("Insert-returning clause not supported")?;
978 }
979 if ignore {
980 plan_err!("Insert-ignore clause not supported")?;
981 }
982 let Some(source) = source else {
983 plan_err!("Inserts without a source not supported")?
984 };
985 if let Some(table_alias) = table_alias {
986 plan_err!(
987 "Inserts with a table alias not supported: {table_alias:?}"
988 )?
989 };
990 if let Some(priority) = priority {
991 plan_err!(
992 "Inserts with a `PRIORITY` clause not supported: {priority:?}"
993 )?
994 };
995 if insert_alias.is_some() {
996 plan_err!("Inserts with an alias not supported")?;
997 }
998 if !assignments.is_empty() {
999 plan_err!("Inserts with assignments not supported")?;
1000 }
1001 if settings.is_some() {
1002 plan_err!("Inserts with settings not supported")?;
1003 }
1004 if format_clause.is_some() {
1005 plan_err!("Inserts with format clause not supported")?;
1006 }
1007 let _ = into;
1009 let _ = has_table_keyword;
1010 self.insert_to_plan(table_name, columns, source, overwrite, replace_into)
1011 }
1012 Statement::Update {
1013 table,
1014 assignments,
1015 from,
1016 selection,
1017 returning,
1018 or,
1019 limit,
1020 } => {
1021 let from_clauses =
1022 from.map(|update_table_from_kind| match update_table_from_kind {
1023 UpdateTableFromKind::BeforeSet(from_clauses) => from_clauses,
1024 UpdateTableFromKind::AfterSet(from_clauses) => from_clauses,
1025 });
1026 if from_clauses.as_ref().is_some_and(|f| f.len() > 1) {
1028 plan_err!("Multiple tables in UPDATE SET FROM not yet supported")?;
1029 }
1030 let update_from = from_clauses.and_then(|mut f| f.pop());
1031 if returning.is_some() {
1032 plan_err!("Update-returning clause not yet supported")?;
1033 }
1034 if or.is_some() {
1035 plan_err!("ON conflict not supported")?;
1036 }
1037 if limit.is_some() {
1038 return not_impl_err!("Update-limit clause not supported")?;
1039 }
1040 self.update_to_plan(table, assignments, update_from, selection)
1041 }
1042
1043 Statement::Delete(Delete {
1044 tables,
1045 using,
1046 selection,
1047 returning,
1048 from,
1049 order_by,
1050 limit,
1051 }) => {
1052 if !tables.is_empty() {
1053 plan_err!("DELETE <TABLE> not supported")?;
1054 }
1055
1056 if using.is_some() {
1057 plan_err!("Using clause not supported")?;
1058 }
1059
1060 if returning.is_some() {
1061 plan_err!("Delete-returning clause not yet supported")?;
1062 }
1063
1064 if !order_by.is_empty() {
1065 plan_err!("Delete-order-by clause not yet supported")?;
1066 }
1067
1068 if limit.is_some() {
1069 plan_err!("Delete-limit clause not yet supported")?;
1070 }
1071
1072 let table_name = self.get_delete_target(from)?;
1073 self.delete_to_plan(table_name, selection)
1074 }
1075
1076 Statement::StartTransaction {
1077 modes,
1078 begin: false,
1079 modifier,
1080 transaction,
1081 statements,
1082 has_end_keyword,
1083 exception,
1084 } => {
1085 if let Some(modifier) = modifier {
1086 return not_impl_err!(
1087 "Transaction modifier not supported: {modifier}"
1088 );
1089 }
1090 if !statements.is_empty() {
1091 return not_impl_err!(
1092 "Transaction with multiple statements not supported"
1093 );
1094 }
1095 if exception.is_some() {
1096 return not_impl_err!(
1097 "Transaction with exception statements not supported"
1098 );
1099 }
1100 if has_end_keyword {
1101 return not_impl_err!("Transaction with END keyword not supported");
1102 }
1103 self.validate_transaction_kind(transaction)?;
1104 let isolation_level: ast::TransactionIsolationLevel = modes
1105 .iter()
1106 .filter_map(|m: &TransactionMode| match m {
1107 TransactionMode::AccessMode(_) => None,
1108 TransactionMode::IsolationLevel(level) => Some(level),
1109 })
1110 .next_back()
1111 .copied()
1112 .unwrap_or(ast::TransactionIsolationLevel::Serializable);
1113 let access_mode: ast::TransactionAccessMode = modes
1114 .iter()
1115 .filter_map(|m: &TransactionMode| match m {
1116 TransactionMode::AccessMode(mode) => Some(mode),
1117 TransactionMode::IsolationLevel(_) => None,
1118 })
1119 .next_back()
1120 .copied()
1121 .unwrap_or(ast::TransactionAccessMode::ReadWrite);
1122 let isolation_level = match isolation_level {
1123 ast::TransactionIsolationLevel::ReadUncommitted => {
1124 TransactionIsolationLevel::ReadUncommitted
1125 }
1126 ast::TransactionIsolationLevel::ReadCommitted => {
1127 TransactionIsolationLevel::ReadCommitted
1128 }
1129 ast::TransactionIsolationLevel::RepeatableRead => {
1130 TransactionIsolationLevel::RepeatableRead
1131 }
1132 ast::TransactionIsolationLevel::Serializable => {
1133 TransactionIsolationLevel::Serializable
1134 }
1135 ast::TransactionIsolationLevel::Snapshot => {
1136 TransactionIsolationLevel::Snapshot
1137 }
1138 };
1139 let access_mode = match access_mode {
1140 ast::TransactionAccessMode::ReadOnly => {
1141 TransactionAccessMode::ReadOnly
1142 }
1143 ast::TransactionAccessMode::ReadWrite => {
1144 TransactionAccessMode::ReadWrite
1145 }
1146 };
1147 let statement = PlanStatement::TransactionStart(TransactionStart {
1148 access_mode,
1149 isolation_level,
1150 });
1151 Ok(LogicalPlan::Statement(statement))
1152 }
1153 Statement::Commit {
1154 chain,
1155 end,
1156 modifier,
1157 } => {
1158 if end {
1159 return not_impl_err!("COMMIT AND END not supported");
1160 };
1161 if let Some(modifier) = modifier {
1162 return not_impl_err!("COMMIT {modifier} not supported");
1163 };
1164 let statement = PlanStatement::TransactionEnd(TransactionEnd {
1165 conclusion: TransactionConclusion::Commit,
1166 chain,
1167 });
1168 Ok(LogicalPlan::Statement(statement))
1169 }
1170 Statement::Rollback { chain, savepoint } => {
1171 if savepoint.is_some() {
1172 plan_err!("Savepoints not supported")?;
1173 }
1174 let statement = PlanStatement::TransactionEnd(TransactionEnd {
1175 conclusion: TransactionConclusion::Rollback,
1176 chain,
1177 });
1178 Ok(LogicalPlan::Statement(statement))
1179 }
1180 Statement::CreateFunction(ast::CreateFunction {
1181 or_replace,
1182 temporary,
1183 name,
1184 args,
1185 return_type,
1186 function_body,
1187 behavior,
1188 language,
1189 ..
1190 }) => {
1191 let return_type = match return_type {
1192 Some(t) => Some(self.convert_data_type_to_field(&t)?),
1193 None => None,
1194 };
1195 let mut planner_context = PlannerContext::new();
1196 let empty_schema = &DFSchema::empty();
1197
1198 let args = match args {
1199 Some(function_args) => {
1200 let function_args = function_args
1201 .into_iter()
1202 .map(|arg| {
1203 let data_type =
1204 self.convert_data_type_to_field(&arg.data_type)?;
1205
1206 let default_expr = match arg.default_expr {
1207 Some(expr) => Some(self.sql_to_expr(
1208 expr,
1209 empty_schema,
1210 &mut planner_context,
1211 )?),
1212 None => None,
1213 };
1214 Ok(OperateFunctionArg {
1215 name: arg.name,
1216 default_expr,
1217 data_type: data_type.data_type().clone(),
1218 })
1219 })
1220 .collect::<Result<Vec<OperateFunctionArg>>>();
1221 Some(function_args?)
1222 }
1223 None => None,
1224 };
1225 let name = match &name.0[..] {
1227 [] => exec_err!("Function should have name")?,
1228 [n] => n.as_ident().unwrap().value.clone(),
1229 [..] => not_impl_err!("Qualified functions are not supported")?,
1230 };
1231 let arg_types = args.as_ref().map(|arg| {
1235 arg.iter()
1236 .map(|t| Arc::new(Field::new("", t.data_type.clone(), true)))
1237 .collect::<Vec<_>>()
1238 });
1239 let mut planner_context = PlannerContext::new()
1240 .with_prepare_param_data_types(arg_types.unwrap_or_default());
1241
1242 let function_body = match function_body {
1243 Some(r) => Some(self.sql_to_expr(
1244 match r {
1245 ast::CreateFunctionBody::AsBeforeOptions(expr) => expr,
1246 ast::CreateFunctionBody::AsAfterOptions(expr) => expr,
1247 ast::CreateFunctionBody::Return(expr) => expr,
1248 ast::CreateFunctionBody::AsBeginEnd(_) => {
1249 return not_impl_err!(
1250 "BEGIN/END enclosed function body syntax is not supported"
1251 )?;
1252 }
1253 ast::CreateFunctionBody::AsReturnExpr(_)
1254 | ast::CreateFunctionBody::AsReturnSelect(_) => {
1255 return not_impl_err!(
1256 "AS RETURN function syntax is not supported"
1257 )?
1258 }
1259 },
1260 &DFSchema::empty(),
1261 &mut planner_context,
1262 )?),
1263 None => None,
1264 };
1265
1266 let params = CreateFunctionBody {
1267 language,
1268 behavior: behavior.map(|b| match b {
1269 ast::FunctionBehavior::Immutable => Volatility::Immutable,
1270 ast::FunctionBehavior::Stable => Volatility::Stable,
1271 ast::FunctionBehavior::Volatile => Volatility::Volatile,
1272 }),
1273 function_body,
1274 };
1275
1276 let statement = DdlStatement::CreateFunction(CreateFunction {
1277 or_replace,
1278 temporary,
1279 name,
1280 return_type: return_type.map(|f| f.data_type().clone()),
1281 args,
1282 params,
1283 schema: DFSchemaRef::new(DFSchema::empty()),
1284 });
1285
1286 Ok(LogicalPlan::Ddl(statement))
1287 }
1288 Statement::DropFunction {
1289 if_exists,
1290 func_desc,
1291 ..
1292 } => {
1293 if let Some(desc) = func_desc.first() {
1296 let name = match &desc.name.0[..] {
1298 [] => exec_err!("Function should have name")?,
1299 [n] => n.as_ident().unwrap().value.clone(),
1300 [..] => not_impl_err!("Qualified functions are not supported")?,
1301 };
1302 let statement = DdlStatement::DropFunction(DropFunction {
1303 if_exists,
1304 name,
1305 schema: DFSchemaRef::new(DFSchema::empty()),
1306 });
1307 Ok(LogicalPlan::Ddl(statement))
1308 } else {
1309 exec_err!("Function name not provided")
1310 }
1311 }
1312 Statement::CreateIndex(CreateIndex {
1313 name,
1314 table_name,
1315 using,
1316 columns,
1317 unique,
1318 if_not_exists,
1319 ..
1320 }) => {
1321 let name: Option<String> = name.as_ref().map(object_name_to_string);
1322 let table = self.object_name_to_table_reference(table_name)?;
1323 let table_schema = self
1324 .context_provider
1325 .get_table_source(table.clone())?
1326 .schema()
1327 .to_dfschema_ref()?;
1328 let using: Option<String> =
1329 using.as_ref().map(|index_type| match index_type {
1330 IndexType::Custom(ident) => ident_to_string(ident),
1331 _ => index_type.to_string().to_ascii_lowercase(),
1332 });
1333 let order_by_exprs: Vec<OrderByExpr> =
1334 columns.into_iter().map(|col| col.column).collect();
1335 let columns = self.order_by_to_sort_expr(
1336 order_by_exprs,
1337 &table_schema,
1338 planner_context,
1339 false,
1340 None,
1341 )?;
1342 Ok(LogicalPlan::Ddl(DdlStatement::CreateIndex(
1343 PlanCreateIndex {
1344 name,
1345 table,
1346 using,
1347 columns,
1348 unique,
1349 if_not_exists,
1350 schema: DFSchemaRef::new(DFSchema::empty()),
1351 },
1352 )))
1353 }
1354 stmt => {
1355 not_impl_err!("Unsupported SQL statement: {stmt}")
1356 }
1357 }
1358 }
1359
1360 fn get_delete_target(&self, from: FromTable) -> Result<ObjectName> {
1361 let mut from = match from {
1362 FromTable::WithFromKeyword(v) => v,
1363 FromTable::WithoutKeyword(v) => v,
1364 };
1365
1366 if from.len() != 1 {
1367 return not_impl_err!(
1368 "DELETE FROM only supports single table, got {}: {from:?}",
1369 from.len()
1370 );
1371 }
1372 let table_factor = from.pop().unwrap();
1373 if !table_factor.joins.is_empty() {
1374 return not_impl_err!("DELETE FROM only supports single table, got: joins");
1375 }
1376 let TableFactor::Table { name, .. } = table_factor.relation else {
1377 return not_impl_err!(
1378 "DELETE FROM only supports single table, got: {table_factor:?}"
1379 );
1380 };
1381
1382 Ok(name)
1383 }
1384
1385 fn show_tables_to_plan(&self) -> Result<LogicalPlan> {
1387 if self.has_table("information_schema", "tables") {
1388 let query = "SELECT * FROM information_schema.tables;";
1389 let mut rewrite = DFParser::parse_sql(query)?;
1390 assert_eq!(rewrite.len(), 1);
1391 self.statement_to_plan(rewrite.pop_front().unwrap()) } else {
1393 plan_err!("SHOW TABLES is not supported unless information_schema is enabled")
1394 }
1395 }
1396
1397 fn describe_table_to_plan(&self, table_name: ObjectName) -> Result<LogicalPlan> {
1398 let table_ref = self.object_name_to_table_reference(table_name)?;
1399
1400 let table_source = self.context_provider.get_table_source(table_ref)?;
1401
1402 let schema = table_source.schema();
1403
1404 let output_schema = DFSchema::try_from(LogicalPlan::describe_schema()).unwrap();
1405
1406 Ok(LogicalPlan::DescribeTable(DescribeTable {
1407 schema,
1408 output_schema: Arc::new(output_schema),
1409 }))
1410 }
1411
1412 fn describe_query_to_plan(&self, query: Query) -> Result<LogicalPlan> {
1413 let plan = self.query_to_plan(query, &mut PlannerContext::new())?;
1414
1415 let schema = Arc::new(plan.schema().as_arrow().clone());
1416
1417 let output_schema = DFSchema::try_from(LogicalPlan::describe_schema()).unwrap();
1418
1419 Ok(LogicalPlan::DescribeTable(DescribeTable {
1420 schema,
1421 output_schema: Arc::new(output_schema),
1422 }))
1423 }
1424
1425 fn copy_to_plan(&self, statement: CopyToStatement) -> Result<LogicalPlan> {
1426 let copy_source = statement.source;
1428 let (input, input_schema, table_ref) = match copy_source {
1429 CopyToSource::Relation(object_name) => {
1430 let table_name = object_name_to_string(&object_name);
1431 let table_ref = self.object_name_to_table_reference(object_name)?;
1432 let table_source =
1433 self.context_provider.get_table_source(table_ref.clone())?;
1434 let plan =
1435 LogicalPlanBuilder::scan(table_name, table_source, None)?.build()?;
1436 let input_schema = Arc::clone(plan.schema());
1437 (plan, input_schema, Some(table_ref))
1438 }
1439 CopyToSource::Query(query) => {
1440 let plan = self.query_to_plan(*query, &mut PlannerContext::new())?;
1441 let input_schema = Arc::clone(plan.schema());
1442 (plan, input_schema, None)
1443 }
1444 };
1445
1446 let options_map = self.parse_options_map(statement.options, true)?;
1447
1448 let maybe_file_type = if let Some(stored_as) = &statement.stored_as {
1449 self.context_provider.get_file_type(stored_as).ok()
1450 } else {
1451 None
1452 };
1453
1454 let file_type = match maybe_file_type {
1455 Some(ft) => ft,
1456 None => {
1457 let e = || {
1458 DataFusionError::Configuration(
1459 "Format not explicitly set and unable to get file extension! Use STORED AS to define file format."
1460 .to_string(),
1461 )
1462 };
1463 let extension: &str = &Path::new(&statement.target)
1465 .extension()
1466 .ok_or_else(e)?
1467 .to_str()
1468 .ok_or_else(e)?
1469 .to_lowercase();
1470
1471 self.context_provider.get_file_type(extension)?
1472 }
1473 };
1474
1475 let partition_by = statement
1476 .partitioned_by
1477 .iter()
1478 .map(|col| input_schema.field_with_name(table_ref.as_ref(), col))
1479 .collect::<Result<Vec<_>>>()?
1480 .into_iter()
1481 .map(|f| f.name().to_owned())
1482 .collect();
1483
1484 Ok(LogicalPlan::Copy(CopyTo::new(
1485 Arc::new(input),
1486 statement.target,
1487 partition_by,
1488 file_type,
1489 options_map,
1490 )))
1491 }
1492
1493 fn build_order_by(
1494 &self,
1495 order_exprs: Vec<LexOrdering>,
1496 schema: &DFSchemaRef,
1497 planner_context: &mut PlannerContext,
1498 ) -> Result<Vec<Vec<SortExpr>>> {
1499 if !order_exprs.is_empty() && schema.fields().is_empty() {
1500 let results = order_exprs
1501 .iter()
1502 .map(|lex_order| {
1503 let result = lex_order
1504 .iter()
1505 .map(|order_by_expr| {
1506 let ordered_expr = &order_by_expr.expr;
1507 let ordered_expr = ordered_expr.to_owned();
1508 let ordered_expr = self.sql_expr_to_logical_expr(
1509 ordered_expr,
1510 schema,
1511 planner_context,
1512 )?;
1513 let asc = order_by_expr.options.asc.unwrap_or(true);
1514 let nulls_first =
1515 order_by_expr.options.nulls_first.unwrap_or_else(|| {
1516 self.options.default_null_ordering.nulls_first(asc)
1517 });
1518
1519 Ok(SortExpr::new(ordered_expr, asc, nulls_first))
1520 })
1521 .collect::<Result<Vec<SortExpr>>>()?;
1522 Ok(result)
1523 })
1524 .collect::<Result<Vec<Vec<SortExpr>>>>()?;
1525
1526 return Ok(results);
1527 }
1528
1529 let mut all_results = vec![];
1530 for expr in order_exprs {
1531 let expr_vec =
1533 self.order_by_to_sort_expr(expr, schema, planner_context, true, None)?;
1534 for sort in expr_vec.iter() {
1536 for column in sort.expr.column_refs().iter() {
1537 if !schema.has_column(column) {
1538 return plan_err!("Column {column} is not in schema");
1540 }
1541 }
1542 }
1543 all_results.push(expr_vec)
1545 }
1546 Ok(all_results)
1547 }
1548
1549 fn external_table_to_plan(
1551 &self,
1552 statement: CreateExternalTable,
1553 ) -> Result<LogicalPlan> {
1554 let definition = Some(statement.to_string());
1555 let CreateExternalTable {
1556 name,
1557 columns,
1558 file_type,
1559 location,
1560 table_partition_cols,
1561 if_not_exists,
1562 temporary,
1563 order_exprs,
1564 unbounded,
1565 options,
1566 constraints,
1567 or_replace,
1568 } = statement;
1569
1570 let mut all_constraints = constraints;
1572 let inline_constraints = calc_inline_constraints_from_columns(&columns);
1573 all_constraints.extend(inline_constraints);
1574
1575 let options_map = self.parse_options_map(options, false)?;
1576
1577 let compression = options_map
1578 .get("format.compression")
1579 .map(|c| CompressionTypeVariant::from_str(c))
1580 .transpose()?;
1581 if (file_type == "PARQUET" || file_type == "AVRO" || file_type == "ARROW")
1582 && compression
1583 .map(|c| c != CompressionTypeVariant::UNCOMPRESSED)
1584 .unwrap_or(false)
1585 {
1586 plan_err!(
1587 "File compression type cannot be set for PARQUET, AVRO, or ARROW files."
1588 )?;
1589 }
1590
1591 let mut planner_context = PlannerContext::new();
1592
1593 let column_defaults = self
1594 .build_column_defaults(&columns, &mut planner_context)?
1595 .into_iter()
1596 .collect();
1597
1598 let schema = self.build_schema(columns)?;
1599 let df_schema = schema.to_dfschema_ref()?;
1600 df_schema.check_names()?;
1601
1602 let ordered_exprs =
1603 self.build_order_by(order_exprs, &df_schema, &mut planner_context)?;
1604
1605 let name = self.object_name_to_table_reference(name)?;
1606 let constraints =
1607 self.new_constraint_from_table_constraints(&all_constraints, &df_schema)?;
1608 Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
1609 PlanCreateExternalTable {
1610 schema: df_schema,
1611 name,
1612 location,
1613 file_type,
1614 table_partition_cols,
1615 if_not_exists,
1616 or_replace,
1617 temporary,
1618 definition,
1619 order_exprs: ordered_exprs,
1620 unbounded,
1621 options: options_map,
1622 constraints,
1623 column_defaults,
1624 },
1625 )))
1626 }
1627
1628 fn get_constraint_column_indices(
1631 &self,
1632 df_schema: &DFSchemaRef,
1633 columns: &[IndexColumn],
1634 constraint_name: &str,
1635 ) -> Result<Vec<usize>> {
1636 let field_names = df_schema.field_names();
1637 columns
1638 .iter()
1639 .map(|index_column| {
1640 let expr = &index_column.column.expr;
1641 let ident = if let SQLExpr::Identifier(ident) = expr {
1642 ident
1643 } else {
1644 return Err(plan_datafusion_err!(
1645 "Column name for {constraint_name} must be an identifier: {expr}"
1646 ));
1647 };
1648 let column = self.ident_normalizer.normalize(ident.clone());
1649 field_names
1650 .iter()
1651 .position(|item| *item == column)
1652 .ok_or_else(|| {
1653 plan_datafusion_err!(
1654 "Column for {constraint_name} not found in schema: {column}"
1655 )
1656 })
1657 })
1658 .collect::<Result<Vec<_>>>()
1659 }
1660
1661 pub fn new_constraint_from_table_constraints(
1663 &self,
1664 constraints: &[TableConstraint],
1665 df_schema: &DFSchemaRef,
1666 ) -> Result<Constraints> {
1667 let constraints = constraints
1668 .iter()
1669 .map(|c: &TableConstraint| match c {
1670 TableConstraint::Unique { name, columns, .. } => {
1671 let constraint_name = match name {
1672 Some(name) => &format!("unique constraint with name '{name}'"),
1673 None => "unique constraint",
1674 };
1675 let indices = self.get_constraint_column_indices(
1677 df_schema,
1678 columns,
1679 constraint_name,
1680 )?;
1681 Ok(Constraint::Unique(indices))
1682 }
1683 TableConstraint::PrimaryKey { columns, .. } => {
1684 let indices = self.get_constraint_column_indices(
1686 df_schema,
1687 columns,
1688 "primary key",
1689 )?;
1690 Ok(Constraint::PrimaryKey(indices))
1691 }
1692 TableConstraint::ForeignKey { .. } => {
1693 _plan_err!("Foreign key constraints are not currently supported")
1694 }
1695 TableConstraint::Check { .. } => {
1696 _plan_err!("Check constraints are not currently supported")
1697 }
1698 TableConstraint::Index { .. } => {
1699 _plan_err!("Indexes are not currently supported")
1700 }
1701 TableConstraint::FulltextOrSpatial { .. } => {
1702 _plan_err!("Indexes are not currently supported")
1703 }
1704 })
1705 .collect::<Result<Vec<_>>>()?;
1706 Ok(Constraints::new_unverified(constraints))
1707 }
1708
1709 fn parse_options_map(
1710 &self,
1711 options: Vec<(String, Value)>,
1712 allow_duplicates: bool,
1713 ) -> Result<HashMap<String, String>> {
1714 let mut options_map = HashMap::new();
1715 for (key, value) in options {
1716 if !allow_duplicates && options_map.contains_key(&key) {
1717 return plan_err!("Option {key} is specified multiple times");
1718 }
1719
1720 let Some(value_string) = crate::utils::value_to_string(&value) else {
1721 return plan_err!("Unsupported Value {}", value);
1722 };
1723
1724 if !(&key.contains('.')) {
1725 let renamed_key = format!("format.{key}");
1729 options_map.insert(renamed_key.to_lowercase(), value_string);
1730 } else {
1731 options_map.insert(key.to_lowercase(), value_string);
1732 }
1733 }
1734
1735 Ok(options_map)
1736 }
1737
1738 fn explain_to_plan(
1743 &self,
1744 verbose: bool,
1745 analyze: bool,
1746 format: Option<String>,
1747 statement: DFStatement,
1748 ) -> Result<LogicalPlan> {
1749 let plan = self.statement_to_plan(statement)?;
1750 if matches!(plan, LogicalPlan::Explain(_)) {
1751 return plan_err!("Nested EXPLAINs are not supported");
1752 }
1753
1754 let plan = Arc::new(plan);
1755 let schema = LogicalPlan::explain_schema();
1756 let schema = schema.to_dfschema_ref()?;
1757
1758 if verbose && format.is_some() {
1759 return plan_err!("EXPLAIN VERBOSE with FORMAT is not supported");
1760 }
1761
1762 if analyze {
1763 if format.is_some() {
1764 return plan_err!("EXPLAIN ANALYZE with FORMAT is not supported");
1765 }
1766 Ok(LogicalPlan::Analyze(Analyze {
1767 verbose,
1768 input: plan,
1769 schema,
1770 }))
1771 } else {
1772 let stringified_plans =
1773 vec![plan.to_stringified(PlanType::InitialLogicalPlan)];
1774
1775 let options = self.context_provider.options();
1778 let format = if verbose {
1779 ExplainFormat::Indent
1780 } else if let Some(format) = format {
1781 ExplainFormat::from_str(&format)?
1782 } else {
1783 options.explain.format.clone()
1784 };
1785
1786 Ok(LogicalPlan::Explain(Explain {
1787 verbose,
1788 explain_format: format,
1789 plan,
1790 stringified_plans,
1791 schema,
1792 logical_optimization_succeeded: false,
1793 }))
1794 }
1795 }
1796
1797 fn show_variable_to_plan(&self, variable: &[Ident]) -> Result<LogicalPlan> {
1798 if !self.has_table("information_schema", "df_settings") {
1799 return plan_err!(
1800 "SHOW [VARIABLE] is not supported unless information_schema is enabled"
1801 );
1802 }
1803
1804 let verbose = variable
1805 .last()
1806 .map(|s| ident_to_string(s) == "verbose")
1807 .unwrap_or(false);
1808 let mut variable_vec = variable.to_vec();
1809 let mut columns: String = "name, value".to_owned();
1810
1811 if verbose {
1812 columns = format!("{columns}, description");
1813 variable_vec = variable_vec.split_at(variable_vec.len() - 1).0.to_vec();
1814 }
1815
1816 let variable = object_name_to_string(&ObjectName::from(variable_vec));
1817 let base_query = format!("SELECT {columns} FROM information_schema.df_settings");
1818 let query = if variable == "all" {
1819 format!("{base_query} ORDER BY name")
1821 } else if variable == "timezone" || variable == "time.zone" {
1822 format!("{base_query} WHERE name = 'datafusion.execution.time_zone'")
1824 } else {
1825 let is_valid_variable = self
1829 .context_provider
1830 .options()
1831 .entries()
1832 .iter()
1833 .any(|opt| opt.key == variable);
1834
1835 if !is_valid_variable {
1836 return plan_err!(
1837 "'{variable}' is not a variable which can be viewed with 'SHOW'"
1838 );
1839 }
1840
1841 format!("{base_query} WHERE name = '{variable}'")
1842 };
1843
1844 let mut rewrite = DFParser::parse_sql(&query)?;
1845 assert_eq!(rewrite.len(), 1);
1846
1847 self.statement_to_plan(rewrite.pop_front().unwrap())
1848 }
1849
1850 fn set_statement_to_plan(&self, statement: Set) -> Result<LogicalPlan> {
1851 match statement {
1852 Set::SingleAssignment {
1853 scope,
1854 hivevar,
1855 variable,
1856 values,
1857 } => {
1858 if scope.is_some() {
1859 return not_impl_err!("SET with scope modifiers is not supported");
1860 }
1861
1862 if hivevar {
1863 return not_impl_err!("SET HIVEVAR is not supported");
1864 }
1865
1866 let variable = object_name_to_string(&variable);
1867 let mut variable_lower = variable.to_lowercase();
1868
1869 if variable_lower == "timezone" || variable_lower == "time.zone" {
1870 variable_lower = "datafusion.execution.time_zone".to_string();
1871 }
1872
1873 if values.len() != 1 {
1874 return plan_err!("SET only supports single value assignment");
1875 }
1876
1877 let value_string = match &values[0] {
1878 SQLExpr::Identifier(i) => ident_to_string(i),
1879 SQLExpr::Value(v) => match crate::utils::value_to_string(&v.value) {
1880 None => {
1881 return plan_err!("Unsupported value {:?}", v.value);
1882 }
1883 Some(s) => s,
1884 },
1885 SQLExpr::UnaryOp { op, expr } => match op {
1886 UnaryOperator::Plus => format!("+{expr}"),
1887 UnaryOperator::Minus => format!("-{expr}"),
1888 _ => return plan_err!("Unsupported unary op {:?}", op),
1889 },
1890 _ => return plan_err!("Unsupported expr {:?}", values[0]),
1891 };
1892
1893 Ok(LogicalPlan::Statement(PlanStatement::SetVariable(
1894 SetVariable {
1895 variable: variable_lower,
1896 value: value_string,
1897 },
1898 )))
1899 }
1900 other => not_impl_err!("SET variant not implemented yet: {other:?}"),
1901 }
1902 }
1903
1904 fn delete_to_plan(
1905 &self,
1906 table_name: ObjectName,
1907 predicate_expr: Option<SQLExpr>,
1908 ) -> Result<LogicalPlan> {
1909 let table_ref = self.object_name_to_table_reference(table_name.clone())?;
1911 let table_source = self.context_provider.get_table_source(table_ref.clone())?;
1912 let schema = DFSchema::try_from_qualified_schema(
1913 table_ref.clone(),
1914 &table_source.schema(),
1915 )?;
1916 let scan =
1917 LogicalPlanBuilder::scan(table_ref.clone(), Arc::clone(&table_source), None)?
1918 .build()?;
1919 let mut planner_context = PlannerContext::new();
1920
1921 let source = match predicate_expr {
1922 None => scan,
1923 Some(predicate_expr) => {
1924 let filter_expr =
1925 self.sql_to_expr(predicate_expr, &schema, &mut planner_context)?;
1926 let schema = Arc::new(schema);
1927 let mut using_columns = HashSet::new();
1928 expr_to_columns(&filter_expr, &mut using_columns)?;
1929 let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
1930 filter_expr,
1931 &[&[&schema]],
1932 &[using_columns],
1933 )?;
1934 LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(scan))?)
1935 }
1936 };
1937
1938 let plan = LogicalPlan::Dml(DmlStatement::new(
1939 table_ref,
1940 table_source,
1941 WriteOp::Delete,
1942 Arc::new(source),
1943 ));
1944 Ok(plan)
1945 }
1946
1947 fn update_to_plan(
1948 &self,
1949 table: TableWithJoins,
1950 assignments: Vec<Assignment>,
1951 from: Option<TableWithJoins>,
1952 predicate_expr: Option<SQLExpr>,
1953 ) -> Result<LogicalPlan> {
1954 let (table_name, table_alias) = match &table.relation {
1955 TableFactor::Table { name, alias, .. } => (name.clone(), alias.clone()),
1956 _ => plan_err!("Cannot update non-table relation!")?,
1957 };
1958
1959 let table_name = self.object_name_to_table_reference(table_name)?;
1961 let table_source = self.context_provider.get_table_source(table_name.clone())?;
1962 let table_schema = Arc::new(DFSchema::try_from_qualified_schema(
1963 table_name.clone(),
1964 &table_source.schema(),
1965 )?);
1966
1967 let mut planner_context = PlannerContext::new();
1969 let mut assign_map = assignments
1970 .iter()
1971 .map(|assign| {
1972 let cols = match &assign.target {
1973 AssignmentTarget::ColumnName(cols) => cols,
1974 _ => plan_err!("Tuples are not supported")?,
1975 };
1976 let col_name: &Ident = cols
1977 .0
1978 .iter()
1979 .last()
1980 .ok_or_else(|| plan_datafusion_err!("Empty column id"))?
1981 .as_ident()
1982 .unwrap();
1983 table_schema.field_with_unqualified_name(&col_name.value)?;
1985 Ok((col_name.value.clone(), assign.value.clone()))
1986 })
1987 .collect::<Result<HashMap<String, SQLExpr>>>()?;
1988
1989 let mut input_tables = vec![table];
1991 input_tables.extend(from);
1992 let scan = self.plan_from_tables(input_tables, &mut planner_context)?;
1993
1994 let source = match predicate_expr {
1996 None => scan,
1997 Some(predicate_expr) => {
1998 let filter_expr = self.sql_to_expr(
1999 predicate_expr,
2000 scan.schema(),
2001 &mut planner_context,
2002 )?;
2003 let mut using_columns = HashSet::new();
2004 expr_to_columns(&filter_expr, &mut using_columns)?;
2005 let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
2006 filter_expr,
2007 &[&[scan.schema()]],
2008 &[using_columns],
2009 )?;
2010 LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(scan))?)
2011 }
2012 };
2013
2014 let exprs = table_schema
2016 .iter()
2017 .map(|(qualifier, field)| {
2018 let expr = match assign_map.remove(field.name()) {
2019 Some(new_value) => {
2020 let mut expr = self.sql_to_expr(
2021 new_value,
2022 source.schema(),
2023 &mut planner_context,
2024 )?;
2025 if let Expr::Placeholder(placeholder) = &mut expr {
2027 placeholder.field = placeholder
2028 .field
2029 .take()
2030 .or_else(|| Some(Arc::clone(field)));
2031 }
2032 expr.cast_to(field.data_type(), source.schema())?
2034 }
2035 None => {
2036 if let Some(alias) = &table_alias {
2038 Expr::Column(Column::new(
2039 Some(self.ident_normalizer.normalize(alias.name.clone())),
2040 field.name(),
2041 ))
2042 } else {
2043 Expr::Column(Column::from((qualifier, field)))
2044 }
2045 }
2046 };
2047 Ok(expr.alias(field.name()))
2048 })
2049 .collect::<Result<Vec<_>>>()?;
2050
2051 let source = project(source, exprs)?;
2052
2053 let plan = LogicalPlan::Dml(DmlStatement::new(
2054 table_name,
2055 table_source,
2056 WriteOp::Update,
2057 Arc::new(source),
2058 ));
2059 Ok(plan)
2060 }
2061
2062 fn insert_to_plan(
2063 &self,
2064 table_name: ObjectName,
2065 columns: Vec<Ident>,
2066 source: Box<Query>,
2067 overwrite: bool,
2068 replace_into: bool,
2069 ) -> Result<LogicalPlan> {
2070 let table_name = self.object_name_to_table_reference(table_name)?;
2072 let table_source = self.context_provider.get_table_source(table_name.clone())?;
2073 let table_schema = DFSchema::try_from(table_source.schema())?;
2074
2075 let (fields, value_indices) = if columns.is_empty() {
2083 (
2085 table_schema.fields().clone(),
2086 (0..table_schema.fields().len())
2087 .map(Some)
2088 .collect::<Vec<_>>(),
2089 )
2090 } else {
2091 let mut value_indices = vec![None; table_schema.fields().len()];
2092 let fields = columns
2093 .into_iter()
2094 .enumerate()
2095 .map(|(i, c)| {
2096 let c = self.ident_normalizer.normalize(c);
2097 let column_index = table_schema
2098 .index_of_column_by_name(None, &c)
2099 .ok_or_else(|| unqualified_field_not_found(&c, &table_schema))?;
2100
2101 if value_indices[column_index].is_some() {
2102 return schema_err!(SchemaError::DuplicateUnqualifiedField {
2103 name: c,
2104 });
2105 } else {
2106 value_indices[column_index] = Some(i);
2107 }
2108 Ok(table_schema.field(column_index).clone())
2109 })
2110 .collect::<Result<Vec<_>>>()?;
2111 (Fields::from(fields), value_indices)
2112 };
2113
2114 let mut prepare_param_data_types = BTreeMap::new();
2116 if let SetExpr::Values(ast::Values { rows, .. }) = (*source.body).clone() {
2117 for row in rows.iter() {
2118 for (idx, val) in row.iter().enumerate() {
2119 if let SQLExpr::Value(ValueWithSpan {
2120 value: Value::Placeholder(name),
2121 span: _,
2122 }) = val
2123 {
2124 let name =
2125 name.replace('$', "").parse::<usize>().map_err(|_| {
2126 plan_datafusion_err!("Can't parse placeholder: {name}")
2127 })? - 1;
2128 let field = fields.get(idx).ok_or_else(|| {
2129 plan_datafusion_err!(
2130 "Placeholder ${} refers to a non existent column",
2131 idx + 1
2132 )
2133 })?;
2134 let _ = prepare_param_data_types.insert(name, Arc::clone(field));
2135 }
2136 }
2137 }
2138 }
2139 let prepare_param_data_types = prepare_param_data_types.into_values().collect();
2140
2141 let mut planner_context =
2143 PlannerContext::new().with_prepare_param_data_types(prepare_param_data_types);
2144 planner_context.set_table_schema(Some(DFSchemaRef::new(
2145 DFSchema::from_unqualified_fields(fields.clone(), Default::default())?,
2146 )));
2147 let source = self.query_to_plan(*source, &mut planner_context)?;
2148 if fields.len() != source.schema().fields().len() {
2149 plan_err!("Column count doesn't match insert query!")?;
2150 }
2151
2152 let exprs = value_indices
2153 .into_iter()
2154 .enumerate()
2155 .map(|(i, value_index)| {
2156 let target_field = table_schema.field(i);
2157 let expr = match value_index {
2158 Some(v) => {
2159 Expr::Column(Column::from(source.schema().qualified_field(v)))
2160 .cast_to(target_field.data_type(), source.schema())?
2161 }
2162 None => table_source
2164 .get_column_default(target_field.name())
2165 .cloned()
2166 .unwrap_or_else(|| {
2167 Expr::Literal(ScalarValue::Null, None)
2169 })
2170 .cast_to(target_field.data_type(), &DFSchema::empty())?,
2171 };
2172 Ok(expr.alias(target_field.name()))
2173 })
2174 .collect::<Result<Vec<Expr>>>()?;
2175 let source = project(source, exprs)?;
2176
2177 let insert_op = match (overwrite, replace_into) {
2178 (false, false) => InsertOp::Append,
2179 (true, false) => InsertOp::Overwrite,
2180 (false, true) => InsertOp::Replace,
2181 (true, true) => plan_err!("Conflicting insert operations: `overwrite` and `replace_into` cannot both be true")?,
2182 };
2183
2184 let plan = LogicalPlan::Dml(DmlStatement::new(
2185 table_name,
2186 Arc::clone(&table_source),
2187 WriteOp::Insert(insert_op),
2188 Arc::new(source),
2189 ));
2190 Ok(plan)
2191 }
2192
2193 fn show_columns_to_plan(
2194 &self,
2195 extended: bool,
2196 full: bool,
2197 sql_table_name: ObjectName,
2198 ) -> Result<LogicalPlan> {
2199 let where_clause = object_name_to_qualifier(
2201 &sql_table_name,
2202 self.options.enable_ident_normalization,
2203 )?;
2204
2205 if !self.has_table("information_schema", "columns") {
2206 return plan_err!(
2207 "SHOW COLUMNS is not supported unless information_schema is enabled"
2208 );
2209 }
2210
2211 let table_ref = self.object_name_to_table_reference(sql_table_name)?;
2213 let _ = self.context_provider.get_table_source(table_ref)?;
2214
2215 let select_list = if full || extended {
2217 "*"
2218 } else {
2219 "table_catalog, table_schema, table_name, column_name, data_type, is_nullable"
2220 };
2221
2222 let query = format!(
2223 "SELECT {select_list} FROM information_schema.columns WHERE {where_clause}"
2224 );
2225
2226 let mut rewrite = DFParser::parse_sql(&query)?;
2227 assert_eq!(rewrite.len(), 1);
2228 self.statement_to_plan(rewrite.pop_front().unwrap()) }
2230
2231 fn show_functions_to_plan(
2242 &self,
2243 filter: Option<ShowStatementFilter>,
2244 ) -> Result<LogicalPlan> {
2245 let where_clause = if let Some(filter) = filter {
2246 match filter {
2247 ShowStatementFilter::Like(like) => {
2248 format!("WHERE p.function_name like '{like}'")
2249 }
2250 _ => return plan_err!("Unsupported SHOW FUNCTIONS filter"),
2251 }
2252 } else {
2253 "".to_string()
2254 };
2255
2256 let query = format!(
2257 r#"
2258SELECT DISTINCT
2259 p.*,
2260 r.function_type function_type,
2261 r.description description,
2262 r.syntax_example syntax_example
2263FROM
2264 (
2265 SELECT
2266 i.specific_name function_name,
2267 o.data_type return_type,
2268 array_agg(i.parameter_name ORDER BY i.ordinal_position ASC) parameters,
2269 array_agg(i.data_type ORDER BY i.ordinal_position ASC) parameter_types
2270 FROM (
2271 SELECT
2272 specific_catalog,
2273 specific_schema,
2274 specific_name,
2275 ordinal_position,
2276 parameter_name,
2277 data_type,
2278 rid
2279 FROM
2280 information_schema.parameters
2281 WHERE
2282 parameter_mode = 'IN'
2283 ) i
2284 JOIN
2285 (
2286 SELECT
2287 specific_catalog,
2288 specific_schema,
2289 specific_name,
2290 ordinal_position,
2291 parameter_name,
2292 data_type,
2293 rid
2294 FROM
2295 information_schema.parameters
2296 WHERE
2297 parameter_mode = 'OUT'
2298 ) o
2299 ON i.specific_catalog = o.specific_catalog
2300 AND i.specific_schema = o.specific_schema
2301 AND i.specific_name = o.specific_name
2302 AND i.rid = o.rid
2303 GROUP BY 1, 2, i.rid
2304 ) as p
2305JOIN information_schema.routines r
2306ON p.function_name = r.routine_name
2307{where_clause}
2308 "#
2309 );
2310 let mut rewrite = DFParser::parse_sql(&query)?;
2311 assert_eq!(rewrite.len(), 1);
2312 self.statement_to_plan(rewrite.pop_front().unwrap()) }
2314
2315 fn show_create_table_to_plan(
2316 &self,
2317 sql_table_name: ObjectName,
2318 ) -> Result<LogicalPlan> {
2319 if !self.has_table("information_schema", "tables") {
2320 return plan_err!(
2321 "SHOW CREATE TABLE is not supported unless information_schema is enabled"
2322 );
2323 }
2324 let where_clause = object_name_to_qualifier(
2326 &sql_table_name,
2327 self.options.enable_ident_normalization,
2328 )?;
2329
2330 let table_ref = self.object_name_to_table_reference(sql_table_name)?;
2332 let _ = self.context_provider.get_table_source(table_ref)?;
2333
2334 let query = format!(
2335 "SELECT table_catalog, table_schema, table_name, definition FROM information_schema.views WHERE {where_clause}"
2336 );
2337
2338 let mut rewrite = DFParser::parse_sql(&query)?;
2339 assert_eq!(rewrite.len(), 1);
2340 self.statement_to_plan(rewrite.pop_front().unwrap()) }
2342
2343 fn has_table(&self, schema: &str, table: &str) -> bool {
2345 let tables_reference = TableReference::Partial {
2346 schema: schema.into(),
2347 table: table.into(),
2348 };
2349 self.context_provider
2350 .get_table_source(tables_reference)
2351 .is_ok()
2352 }
2353
2354 fn validate_transaction_kind(
2355 &self,
2356 kind: Option<BeginTransactionKind>,
2357 ) -> Result<()> {
2358 match kind {
2359 None => Ok(()),
2361 Some(BeginTransactionKind::Transaction) => Ok(()),
2363 Some(BeginTransactionKind::Work) => {
2364 not_impl_err!("Transaction kind not supported: {kind:?}")
2365 }
2366 }
2367 }
2368}