use std::any::Any;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::sync::Arc;

use crate::catalog::{CatalogProviderList, SchemaProvider, TableProviderFactory};
use crate::datasource::file_format::FileFormatFactory;
#[cfg(feature = "sql")]
use crate::datasource::provider_as_source;
use crate::execution::context::{EmptySerializerRegistry, FunctionFactory, QueryPlanner};
use crate::execution::SessionStateDefaults;
use crate::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
use arrow_schema::{DataType, FieldRef};
use datafusion_catalog::information_schema::{
    InformationSchemaProvider, INFORMATION_SCHEMA,
};
use datafusion_catalog::MemoryCatalogProviderList;
use datafusion_catalog::{TableFunction, TableFunctionImpl};
use datafusion_common::alias::AliasGenerator;
#[cfg(feature = "sql")]
use datafusion_common::config::Dialect;
use datafusion_common::config::{ConfigExtension, ConfigOptions, TableOptions};
use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
use datafusion_common::{
    config_err, exec_err, plan_datafusion_err, DFSchema, DataFusionError,
    ResolvedTableReference, TableReference,
};
use datafusion_execution::config::SessionConfig;
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_execution::TaskContext;
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::expr_rewriter::FunctionRewrite;
use datafusion_expr::planner::ExprPlanner;
#[cfg(feature = "sql")]
use datafusion_expr::planner::TypePlanner;
use datafusion_expr::registry::{FunctionRegistry, SerializerRegistry};
use datafusion_expr::simplify::SimplifyInfo;
#[cfg(feature = "sql")]
use datafusion_expr::TableSource;
use datafusion_expr::{
    AggregateUDF, Explain, Expr, ExprSchemable, LogicalPlan, ScalarUDF, WindowUDF,
};
use datafusion_optimizer::simplify_expressions::ExprSimplifier;
use datafusion_optimizer::{
    Analyzer, AnalyzerRule, Optimizer, OptimizerConfig, OptimizerRule,
};
use datafusion_physical_expr::create_physical_expr;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_optimizer::optimizer::PhysicalOptimizer;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::ExecutionPlan;
use datafusion_session::Session;
#[cfg(feature = "sql")]
use datafusion_sql::{
    parser::{DFParserBuilder, Statement},
    planner::{ContextProvider, ParserOptions, PlannerContext, SqlToRel},
};

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use itertools::Itertools;
use log::{debug, info};
use object_store::ObjectStore;
#[cfg(feature = "sql")]
use sqlparser::{
    ast::{Expr as SQLExpr, ExprWithAlias as SQLExprWithAlias},
    dialect::dialect_from_str,
};
use url::Url;
use uuid::Uuid;

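/// `SessionState` holds everything needed to plan and execute queries in a
/// session: configuration, registered functions, catalogs, optimizer rules,
/// and the runtime environment. Construct it with [`SessionStateBuilder`].
///
/// A minimal construction sketch (hedged: the `datafusion` facade paths are
/// illustrative and may differ by version):
///
/// ```rust,ignore
/// use std::sync::Arc;
/// use datafusion::execution::session_state::SessionStateBuilder;
/// use datafusion_execution::runtime_env::RuntimeEnv;
///
/// // Build a session with the default catalog, functions, and file formats.
/// let state = SessionStateBuilder::new()
///     .with_runtime_env(Arc::new(RuntimeEnv::default()))
///     .with_default_features()
///     .build();
/// assert!(!state.session_id().is_empty());
/// ```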
#[derive(Clone)]
pub struct SessionState {
    session_id: String,
    analyzer: Analyzer,
    expr_planners: Vec<Arc<dyn ExprPlanner>>,
    #[cfg(feature = "sql")]
    type_planner: Option<Arc<dyn TypePlanner>>,
    optimizer: Optimizer,
    physical_optimizers: PhysicalOptimizer,
    query_planner: Arc<dyn QueryPlanner + Send + Sync>,
    catalog_list: Arc<dyn CatalogProviderList>,
    table_functions: HashMap<String, Arc<TableFunction>>,
    scalar_functions: HashMap<String, Arc<ScalarUDF>>,
    aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
    window_functions: HashMap<String, Arc<WindowUDF>>,
    serializer_registry: Arc<dyn SerializerRegistry>,
    file_formats: HashMap<String, Arc<dyn FileFormatFactory>>,
    config: SessionConfig,
    table_options: TableOptions,
    execution_props: ExecutionProps,
    table_factories: HashMap<String, Arc<dyn TableProviderFactory>>,
    runtime_env: Arc<RuntimeEnv>,
    function_factory: Option<Arc<dyn FunctionFactory>>,
    prepared_plans: HashMap<String, Arc<PreparedPlan>>,
}

impl Debug for SessionState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut debug_struct = f.debug_struct("SessionState");
        let ret = debug_struct
            .field("session_id", &self.session_id)
            .field("config", &self.config)
            .field("runtime_env", &self.runtime_env)
            .field("catalog_list", &self.catalog_list)
            .field("serializer_registry", &self.serializer_registry)
            .field("file_formats", &self.file_formats)
            .field("execution_props", &self.execution_props)
            .field("table_options", &self.table_options)
            .field("table_factories", &self.table_factories)
            .field("function_factory", &self.function_factory)
            .field("expr_planners", &self.expr_planners);

        #[cfg(feature = "sql")]
        let ret = ret.field("type_planner", &self.type_planner);

        ret.field("query_planner", &self.query_planner)
            .field("analyzer", &self.analyzer)
            .field("optimizer", &self.optimizer)
            .field("physical_optimizers", &self.physical_optimizers)
            .field("table_functions", &self.table_functions)
            .field("scalar_functions", &self.scalar_functions)
            .field("aggregate_functions", &self.aggregate_functions)
            .field("window_functions", &self.window_functions)
            .field("prepared_plans", &self.prepared_plans)
            .finish()
    }
}

#[async_trait]
impl Session for SessionState {
    fn session_id(&self) -> &str {
        self.session_id()
    }

    fn config(&self) -> &SessionConfig {
        self.config()
    }

    async fn create_physical_plan(
        &self,
        logical_plan: &LogicalPlan,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        self.create_physical_plan(logical_plan).await
    }

    fn create_physical_expr(
        &self,
        expr: Expr,
        df_schema: &DFSchema,
    ) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
        self.create_physical_expr(expr, df_schema)
    }

    fn scalar_functions(&self) -> &HashMap<String, Arc<ScalarUDF>> {
        &self.scalar_functions
    }

    fn aggregate_functions(&self) -> &HashMap<String, Arc<AggregateUDF>> {
        &self.aggregate_functions
    }

    fn window_functions(&self) -> &HashMap<String, Arc<WindowUDF>> {
        &self.window_functions
    }

    fn runtime_env(&self) -> &Arc<RuntimeEnv> {
        self.runtime_env()
    }

    fn execution_props(&self) -> &ExecutionProps {
        self.execution_props()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn table_options(&self) -> &TableOptions {
        self.table_options()
    }

    fn table_options_mut(&mut self) -> &mut TableOptions {
        self.table_options_mut()
    }

    fn task_ctx(&self) -> Arc<TaskContext> {
        self.task_ctx()
    }
}

impl SessionState {
    pub(crate) fn resolve_table_ref(
        &self,
        table_ref: impl Into<TableReference>,
    ) -> ResolvedTableReference {
        let catalog = &self.config_options().catalog;
        table_ref
            .into()
            .resolve(&catalog.default_catalog, &catalog.default_schema)
    }

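    /// Returns the [`SchemaProvider`] that `table_ref` resolves to, or the
    /// information schema provider when that feature is enabled and requested.
    ///
    /// A hedged lookup sketch (assumes an `employee` table was registered
    /// elsewhere; illustrative only):
    ///
    /// ```rust,ignore
    /// // Resolve "employee" against the default catalog and schema.
    /// let schema = state.schema_for_ref("employee")?;
    /// assert!(schema.table_exist("employee"));
    /// ```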
    pub fn schema_for_ref(
        &self,
        table_ref: impl Into<TableReference>,
    ) -> datafusion_common::Result<Arc<dyn SchemaProvider>> {
        let resolved_ref = self.resolve_table_ref(table_ref);
        if self.config.information_schema() && *resolved_ref.schema == *INFORMATION_SCHEMA
        {
            return Ok(Arc::new(InformationSchemaProvider::new(Arc::clone(
                &self.catalog_list,
            ))));
        }

        self.catalog_list
            .catalog(&resolved_ref.catalog)
            .ok_or_else(|| {
                plan_datafusion_err!(
                    "failed to resolve catalog: {}",
                    resolved_ref.catalog
                )
            })?
            .schema(&resolved_ref.schema)
            .ok_or_else(|| {
                plan_datafusion_err!("failed to resolve schema: {}", resolved_ref.schema)
            })
    }

    pub fn add_analyzer_rule(
        &mut self,
        analyzer_rule: Arc<dyn AnalyzerRule + Send + Sync>,
    ) -> &Self {
        self.analyzer.rules.push(analyzer_rule);
        self
    }

    pub(crate) fn append_optimizer_rule(
        &mut self,
        optimizer_rule: Arc<dyn OptimizerRule + Send + Sync>,
    ) {
        self.optimizer.rules.push(optimizer_rule);
    }

    pub fn set_function_factory(&mut self, function_factory: Arc<dyn FunctionFactory>) {
        self.function_factory = Some(function_factory);
    }

    pub fn function_factory(&self) -> Option<&Arc<dyn FunctionFactory>> {
        self.function_factory.as_ref()
    }

    pub fn table_factories(&self) -> &HashMap<String, Arc<dyn TableProviderFactory>> {
        &self.table_factories
    }

    pub fn table_factories_mut(
        &mut self,
    ) -> &mut HashMap<String, Arc<dyn TableProviderFactory>> {
        &mut self.table_factories
    }

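    /// Parses a SQL string into a single DataFusion [`Statement`] using the
    /// given dialect; multiple statements are rejected.
    ///
    /// A hedged parsing sketch (assumes the `sql` feature; the dialect
    /// variants follow the list in the error message below):
    ///
    /// ```rust,ignore
    /// use datafusion_common::config::Dialect;
    ///
    /// let statement = state.sql_to_statement("SELECT 1", &Dialect::PostgreSQL)?;
    /// ```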
    #[cfg(feature = "sql")]
    pub fn sql_to_statement(
        &self,
        sql: &str,
        dialect: &Dialect,
    ) -> datafusion_common::Result<Statement> {
        let dialect = dialect_from_str(dialect).ok_or_else(|| {
            plan_datafusion_err!(
                "Unsupported SQL dialect: {dialect}. Available dialects: \
                 Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \
                 MsSQL, ClickHouse, BigQuery, Ansi, DuckDB, Databricks."
            )
        })?;

        let recursion_limit = self.config.options().sql_parser.recursion_limit;

        let mut statements = DFParserBuilder::new(sql)
            .with_dialect(dialect.as_ref())
            .with_recursion_limit(recursion_limit)
            .build()?
            .parse_statements()?;

        if statements.len() > 1 {
            return datafusion_common::not_impl_err!(
                "The context currently only supports a single SQL statement"
            );
        }

        let statement = statements.pop_front().ok_or_else(|| {
            plan_datafusion_err!("No SQL statements were provided in the query string")
        })?;
        Ok(statement)
    }

    #[cfg(feature = "sql")]
    pub fn sql_to_expr(
        &self,
        sql: &str,
        dialect: &Dialect,
    ) -> datafusion_common::Result<SQLExpr> {
        self.sql_to_expr_with_alias(sql, dialect).map(|x| x.expr)
    }

    #[cfg(feature = "sql")]
    pub fn sql_to_expr_with_alias(
        &self,
        sql: &str,
        dialect: &Dialect,
    ) -> datafusion_common::Result<SQLExprWithAlias> {
        let dialect = dialect_from_str(dialect).ok_or_else(|| {
            plan_datafusion_err!(
                "Unsupported SQL dialect: {dialect}. Available dialects: \
                 Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \
                 MsSQL, ClickHouse, BigQuery, Ansi, DuckDB, Databricks."
            )
        })?;

        let recursion_limit = self.config.options().sql_parser.recursion_limit;
        let expr = DFParserBuilder::new(sql)
            .with_dialect(dialect.as_ref())
            .with_recursion_limit(recursion_limit)
            .build()?
            .parse_into_expr()?;

        Ok(expr)
    }

    #[cfg(feature = "sql")]
    pub fn resolve_table_references(
        &self,
        statement: &Statement,
    ) -> datafusion_common::Result<Vec<TableReference>> {
        let enable_ident_normalization =
            self.config.options().sql_parser.enable_ident_normalization;
        let (table_refs, _) = datafusion_sql::resolve::resolve_table_references(
            statement,
            enable_ident_normalization,
        )?;
        Ok(table_refs)
    }

    #[cfg(feature = "sql")]
    pub async fn statement_to_plan(
        &self,
        statement: Statement,
    ) -> datafusion_common::Result<LogicalPlan> {
        let references = self.resolve_table_references(&statement)?;

        let mut provider = SessionContextProvider {
            state: self,
            tables: HashMap::with_capacity(references.len()),
        };

        for reference in references {
            let resolved = self.resolve_table_ref(reference);
            if let Entry::Vacant(v) = provider.tables.entry(resolved) {
                let resolved = v.key();
                if let Ok(schema) = self.schema_for_ref(resolved.clone()) {
                    if let Some(table) = schema.table(&resolved.table).await? {
                        v.insert(provider_as_source(table));
                    }
                }
            }
        }

        let query = SqlToRel::new_with_options(&provider, self.get_parser_options());
        query.statement_to_plan(statement)
    }

    #[cfg(feature = "sql")]
    fn get_parser_options(&self) -> ParserOptions {
        let sql_parser_options = &self.config.options().sql_parser;

        ParserOptions {
            parse_float_as_decimal: sql_parser_options.parse_float_as_decimal,
            enable_ident_normalization: sql_parser_options.enable_ident_normalization,
            enable_options_value_normalization: sql_parser_options
                .enable_options_value_normalization,
            support_varchar_with_length: sql_parser_options.support_varchar_with_length,
            map_string_types_to_utf8view: sql_parser_options.map_string_types_to_utf8view,
            collect_spans: sql_parser_options.collect_spans,
            default_null_ordering: sql_parser_options
                .default_null_ordering
                .as_str()
                .into(),
        }
    }

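    /// Parses and plans a SQL string into a [`LogicalPlan`] using the
    /// session's configured dialect, resolving referenced tables on the way.
    ///
    /// A hedged end-to-end sketch (assumes the `sql` feature and an async
    /// context; illustrative only):
    ///
    /// ```rust,ignore
    /// let state = SessionStateBuilder::new().with_default_features().build();
    /// // Planning is async because table providers may be resolved remotely.
    /// let plan = state.create_logical_plan("SELECT 1 + 1").await?;
    /// ```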
    #[cfg(feature = "sql")]
    pub async fn create_logical_plan(
        &self,
        sql: &str,
    ) -> datafusion_common::Result<LogicalPlan> {
        let dialect = self.config.options().sql_parser.dialect;
        let statement = self.sql_to_statement(sql, &dialect)?;
        let plan = self.statement_to_plan(statement).await?;
        Ok(plan)
    }

    #[cfg(feature = "sql")]
    pub fn create_logical_expr(
        &self,
        sql: &str,
        df_schema: &DFSchema,
    ) -> datafusion_common::Result<Expr> {
        let dialect = self.config.options().sql_parser.dialect;

        let sql_expr = self.sql_to_expr_with_alias(sql, &dialect)?;

        let provider = SessionContextProvider {
            state: self,
            tables: HashMap::new(),
        };

        let query = SqlToRel::new_with_options(&provider, self.get_parser_options());
        query.sql_to_expr_with_alias(sql_expr, df_schema, &mut PlannerContext::new())
    }

    pub fn analyzer(&self) -> &Analyzer {
        &self.analyzer
    }

    pub fn optimizer(&self) -> &Optimizer {
        &self.optimizer
    }

    pub fn expr_planners(&self) -> &[Arc<dyn ExprPlanner>] {
        &self.expr_planners
    }

    pub fn query_planner(&self) -> &Arc<dyn QueryPlanner + Send + Sync> {
        &self.query_planner
    }

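    /// Runs the analyzer and then the optimizer over `plan`. For
    /// `LogicalPlan::Explain` inputs, each intermediate plan is recorded in
    /// the explain output, and analyzer or optimizer failures are captured
    /// there rather than aborting the EXPLAIN itself.
    ///
    /// A hedged one-liner (illustrative only):
    ///
    /// ```rust,ignore
    /// let optimized = state.optimize(&logical_plan)?;
    /// ```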
    pub fn optimize(&self, plan: &LogicalPlan) -> datafusion_common::Result<LogicalPlan> {
        if let LogicalPlan::Explain(e) = plan {
            let mut stringified_plans = e.stringified_plans.clone();

            let analyzer_result = self.analyzer.execute_and_check(
                e.plan.as_ref().clone(),
                &self.options(),
                |analyzed_plan, analyzer| {
                    let analyzer_name = analyzer.name().to_string();
                    let plan_type = PlanType::AnalyzedLogicalPlan { analyzer_name };
                    stringified_plans.push(analyzed_plan.to_stringified(plan_type));
                },
            );
            let analyzed_plan = match analyzer_result {
                Ok(plan) => plan,
                Err(DataFusionError::Context(analyzer_name, err)) => {
                    let plan_type = PlanType::AnalyzedLogicalPlan { analyzer_name };
                    stringified_plans
                        .push(StringifiedPlan::new(plan_type, err.to_string()));

                    return Ok(LogicalPlan::Explain(Explain {
                        verbose: e.verbose,
                        plan: Arc::clone(&e.plan),
                        explain_format: e.explain_format.clone(),
                        stringified_plans,
                        schema: Arc::clone(&e.schema),
                        logical_optimization_succeeded: false,
                    }));
                }
                Err(e) => return Err(e),
            };

            stringified_plans
                .push(analyzed_plan.to_stringified(PlanType::FinalAnalyzedLogicalPlan));

            let optimized_plan = self.optimizer.optimize(
                analyzed_plan,
                self,
                |optimized_plan, optimizer| {
                    let optimizer_name = optimizer.name().to_string();
                    let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name };
                    stringified_plans.push(optimized_plan.to_stringified(plan_type));
                },
            );
            let (plan, logical_optimization_succeeded) = match optimized_plan {
                Ok(plan) => (Arc::new(plan), true),
                Err(DataFusionError::Context(optimizer_name, err)) => {
                    let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name };
                    stringified_plans
                        .push(StringifiedPlan::new(plan_type, err.to_string()));
                    (Arc::clone(&e.plan), false)
                }
                Err(e) => return Err(e),
            };

            Ok(LogicalPlan::Explain(Explain {
                verbose: e.verbose,
                explain_format: e.explain_format.clone(),
                plan,
                stringified_plans,
                schema: Arc::clone(&e.schema),
                logical_optimization_succeeded,
            }))
        } else {
            let analyzed_plan = self.analyzer.execute_and_check(
                plan.clone(),
                &self.options(),
                |_, _| {},
            )?;
            self.optimizer.optimize(analyzed_plan, self, |_, _| {})
        }
    }

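    /// Creates an [`ExecutionPlan`] from a [`LogicalPlan`], first running
    /// [`Self::optimize`] and then delegating to the configured query planner.
    ///
    /// A hedged sketch (assumes an async context; illustrative only):
    ///
    /// ```rust,ignore
    /// let logical_plan = state.create_logical_plan("SELECT 1").await?;
    /// let physical_plan = state.create_physical_plan(&logical_plan).await?;
    /// ```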
    pub async fn create_physical_plan(
        &self,
        logical_plan: &LogicalPlan,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        let logical_plan = self.optimize(logical_plan)?;
        self.query_planner
            .create_physical_plan(&logical_plan, self)
            .await
    }

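    /// Lowers a logical [`Expr`] into a [`PhysicalExpr`] against `df_schema`,
    /// applying type coercion and any registered function rewrites first.
    ///
    /// A hedged sketch (assumes `col`/`lit` from `datafusion_expr` and a
    /// schema containing column `a`; illustrative only):
    ///
    /// ```rust,ignore
    /// use datafusion_expr::{col, lit};
    ///
    /// // Coerces, rewrites, and plans `a + 1` against `df_schema`.
    /// let physical = state.create_physical_expr(col("a") + lit(1), &df_schema)?;
    /// ```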
    pub fn create_physical_expr(
        &self,
        expr: Expr,
        df_schema: &DFSchema,
    ) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
        let simplifier =
            ExprSimplifier::new(SessionSimplifyProvider::new(self, df_schema));
        let mut expr = simplifier.coerce(expr, df_schema)?;

        let config_options = self.config_options();
        for rewrite in self.analyzer.function_rewrites() {
            expr = expr
                .transform_up_with_schema(df_schema, |expr, df_schema| {
                    rewrite.rewrite(expr, df_schema, config_options)
                })?
                .data;
        }
        create_physical_expr(&expr, df_schema, self.execution_props())
    }

    pub fn session_id(&self) -> &str {
        &self.session_id
    }

    pub fn runtime_env(&self) -> &Arc<RuntimeEnv> {
        &self.runtime_env
    }

    pub fn execution_props(&self) -> &ExecutionProps {
        &self.execution_props
    }

    pub fn execution_props_mut(&mut self) -> &mut ExecutionProps {
        &mut self.execution_props
    }

    pub fn config(&self) -> &SessionConfig {
        &self.config
    }

    pub fn config_mut(&mut self) -> &mut SessionConfig {
        &mut self.config
    }

    pub fn optimizers(&self) -> &[Arc<dyn OptimizerRule + Send + Sync>] {
        &self.optimizer.rules
    }

    pub fn physical_optimizers(&self) -> &[Arc<dyn PhysicalOptimizerRule + Send + Sync>] {
        &self.physical_optimizers.rules
    }

    pub fn config_options(&self) -> &Arc<ConfigOptions> {
        self.config.options()
    }

    pub fn mark_start_execution(&mut self) {
        let config = Arc::clone(self.config.options());
        self.execution_props.mark_start_execution(config);
    }

    pub fn table_options(&self) -> &TableOptions {
        &self.table_options
    }

    pub fn default_table_options(&self) -> TableOptions {
        Session::default_table_options(self)
    }

    pub fn table_options_mut(&mut self) -> &mut TableOptions {
        &mut self.table_options
    }

    pub fn register_table_options_extension<T: ConfigExtension>(&mut self, extension: T) {
        self.table_options.extensions.insert(extension)
    }

    pub fn register_file_format(
        &mut self,
        file_format: Arc<dyn FileFormatFactory>,
        overwrite: bool,
    ) -> Result<(), DataFusionError> {
        let ext = file_format.get_ext().to_lowercase();
        match (self.file_formats.entry(ext.clone()), overwrite) {
            (Entry::Vacant(e), _) => {
                e.insert(file_format);
            }
            (Entry::Occupied(mut e), true) => {
                e.insert(file_format);
            }
            (Entry::Occupied(_), false) => {
                return config_err!(
                    "File type already registered for extension {ext}. \
                     Set overwrite to true to replace this extension."
                )
            }
        };
        Ok(())
    }

    pub fn get_file_format_factory(
        &self,
        ext: &str,
    ) -> Option<Arc<dyn FileFormatFactory>> {
        self.file_formats.get(&ext.to_lowercase()).cloned()
    }

    pub fn task_ctx(&self) -> Arc<TaskContext> {
        Arc::new(TaskContext::from(self))
    }

    pub fn catalog_list(&self) -> &Arc<dyn CatalogProviderList> {
        &self.catalog_list
    }

    pub(crate) fn register_catalog_list(
        &mut self,
        catalog_list: Arc<dyn CatalogProviderList>,
    ) {
        self.catalog_list = catalog_list;
    }

    pub fn scalar_functions(&self) -> &HashMap<String, Arc<ScalarUDF>> {
        &self.scalar_functions
    }

    pub fn aggregate_functions(&self) -> &HashMap<String, Arc<AggregateUDF>> {
        &self.aggregate_functions
    }

    pub fn window_functions(&self) -> &HashMap<String, Arc<WindowUDF>> {
        &self.window_functions
    }

    pub fn table_functions(&self) -> &HashMap<String, Arc<TableFunction>> {
        &self.table_functions
    }

    pub fn serializer_registry(&self) -> &Arc<dyn SerializerRegistry> {
        &self.serializer_registry
    }

    pub fn version(&self) -> &str {
        env!("CARGO_PKG_VERSION")
    }

    pub fn register_udtf(&mut self, name: &str, fun: Arc<dyn TableFunctionImpl>) {
        self.table_functions.insert(
            name.to_owned(),
            Arc::new(TableFunction::new(name.to_owned(), fun)),
        );
    }

    pub fn deregister_udtf(
        &mut self,
        name: &str,
    ) -> datafusion_common::Result<Option<Arc<dyn TableFunctionImpl>>> {
        let udtf = self.table_functions.remove(name);
        Ok(udtf.map(|x| Arc::clone(x.function())))
    }

    pub(crate) fn store_prepared(
        &mut self,
        name: String,
        fields: Vec<FieldRef>,
        plan: Arc<LogicalPlan>,
    ) -> datafusion_common::Result<()> {
        match self.prepared_plans.entry(name) {
            Entry::Vacant(e) => {
                e.insert(Arc::new(PreparedPlan { fields, plan }));
                Ok(())
            }
            Entry::Occupied(e) => {
                exec_err!("Prepared statement '{}' already exists", e.key())
            }
        }
    }

    pub(crate) fn get_prepared(&self, name: &str) -> Option<Arc<PreparedPlan>> {
        self.prepared_plans.get(name).map(Arc::clone)
    }

    pub(crate) fn remove_prepared(
        &mut self,
        name: &str,
    ) -> datafusion_common::Result<()> {
        match self.prepared_plans.remove(name) {
            Some(_) => Ok(()),
            None => exec_err!("Prepared statement '{}' does not exist", name),
        }
    }
}

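/// A builder for [`SessionState`]. Fields left unset fall back to their
/// defaults in [`Self::build`], and [`Self::with_default_features`] installs
/// the standard functions, expression planners, file formats, and table
/// factories.
///
/// A hedged construction sketch (paths are illustrative and may differ by
/// version):
///
/// ```rust,ignore
/// use datafusion::execution::session_state::SessionStateBuilder;
/// use datafusion_execution::config::SessionConfig;
///
/// let state = SessionStateBuilder::new()
///     .with_config(SessionConfig::new().with_batch_size(4 * 1024))
///     .with_default_features()
///     .build();
/// ```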
pub struct SessionStateBuilder {
    session_id: Option<String>,
    analyzer: Option<Analyzer>,
    expr_planners: Option<Vec<Arc<dyn ExprPlanner>>>,
    #[cfg(feature = "sql")]
    type_planner: Option<Arc<dyn TypePlanner>>,
    optimizer: Option<Optimizer>,
    physical_optimizers: Option<PhysicalOptimizer>,
    query_planner: Option<Arc<dyn QueryPlanner + Send + Sync>>,
    catalog_list: Option<Arc<dyn CatalogProviderList>>,
    table_functions: Option<HashMap<String, Arc<TableFunction>>>,
    scalar_functions: Option<Vec<Arc<ScalarUDF>>>,
    aggregate_functions: Option<Vec<Arc<AggregateUDF>>>,
    window_functions: Option<Vec<Arc<WindowUDF>>>,
    serializer_registry: Option<Arc<dyn SerializerRegistry>>,
    file_formats: Option<Vec<Arc<dyn FileFormatFactory>>>,
    config: Option<SessionConfig>,
    table_options: Option<TableOptions>,
    execution_props: Option<ExecutionProps>,
    table_factories: Option<HashMap<String, Arc<dyn TableProviderFactory>>>,
    runtime_env: Option<Arc<RuntimeEnv>>,
    function_factory: Option<Arc<dyn FunctionFactory>>,
    analyzer_rules: Option<Vec<Arc<dyn AnalyzerRule + Send + Sync>>>,
    optimizer_rules: Option<Vec<Arc<dyn OptimizerRule + Send + Sync>>>,
    physical_optimizer_rules: Option<Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>>,
}

impl SessionStateBuilder {
    pub fn new() -> Self {
        Self {
            session_id: None,
            analyzer: None,
            expr_planners: None,
            #[cfg(feature = "sql")]
            type_planner: None,
            optimizer: None,
            physical_optimizers: None,
            query_planner: None,
            catalog_list: None,
            table_functions: None,
            scalar_functions: None,
            aggregate_functions: None,
            window_functions: None,
            serializer_registry: None,
            file_formats: None,
            table_options: None,
            config: None,
            execution_props: None,
            table_factories: None,
            runtime_env: None,
            function_factory: None,
            analyzer_rules: None,
            optimizer_rules: None,
            physical_optimizer_rules: None,
        }
    }

    pub fn new_from_existing(existing: SessionState) -> Self {
        let default_catalog_exist = existing
            .catalog_list()
            .catalog(&existing.config.options().catalog.default_catalog)
            .is_some();
        let create_default_catalog_and_schema = existing
            .config
            .options()
            .catalog
            .create_default_catalog_and_schema
            && !default_catalog_exist;
        let new_config = existing
            .config
            .with_create_default_catalog_and_schema(create_default_catalog_and_schema);
        Self {
            session_id: None,
            analyzer: Some(existing.analyzer),
            expr_planners: Some(existing.expr_planners),
            #[cfg(feature = "sql")]
            type_planner: existing.type_planner,
            optimizer: Some(existing.optimizer),
            physical_optimizers: Some(existing.physical_optimizers),
            query_planner: Some(existing.query_planner),
            catalog_list: Some(existing.catalog_list),
            table_functions: Some(existing.table_functions),
            scalar_functions: Some(existing.scalar_functions.into_values().collect_vec()),
            aggregate_functions: Some(
                existing.aggregate_functions.into_values().collect_vec(),
            ),
            window_functions: Some(existing.window_functions.into_values().collect_vec()),
            serializer_registry: Some(existing.serializer_registry),
            file_formats: Some(existing.file_formats.into_values().collect_vec()),
            config: Some(new_config),
            table_options: Some(existing.table_options),
            execution_props: Some(existing.execution_props),
            table_factories: Some(existing.table_factories),
            runtime_env: Some(existing.runtime_env),
            function_factory: existing.function_factory,

            analyzer_rules: None,
            optimizer_rules: None,
            physical_optimizer_rules: None,
        }
    }

    pub fn with_default_features(mut self) -> Self {
        self.table_factories
            .get_or_insert_with(HashMap::new)
            .extend(SessionStateDefaults::default_table_factories());

        self.file_formats
            .get_or_insert_with(Vec::new)
            .extend(SessionStateDefaults::default_file_formats());

        self.expr_planners
            .get_or_insert_with(Vec::new)
            .extend(SessionStateDefaults::default_expr_planners());

        self.scalar_functions
            .get_or_insert_with(Vec::new)
            .extend(SessionStateDefaults::default_scalar_functions());

        self.aggregate_functions
            .get_or_insert_with(Vec::new)
            .extend(SessionStateDefaults::default_aggregate_functions());

        self.window_functions
            .get_or_insert_with(Vec::new)
            .extend(SessionStateDefaults::default_window_functions());

        self.table_functions
            .get_or_insert_with(HashMap::new)
            .extend(
                SessionStateDefaults::default_table_functions()
                    .into_iter()
                    .map(|f| (f.name().to_string(), f)),
            );

        self
    }

    pub fn new_with_default_features() -> Self {
        Self::new().with_default_features()
    }

    pub fn with_session_id(mut self, session_id: String) -> Self {
        self.session_id = Some(session_id);
        self
    }

    pub fn with_analyzer_rules(
        mut self,
        rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>>,
    ) -> Self {
        self.analyzer = Some(Analyzer::with_rules(rules));
        self
    }

    pub fn with_analyzer_rule(
        mut self,
        analyzer_rule: Arc<dyn AnalyzerRule + Send + Sync>,
    ) -> Self {
        let mut rules = self.analyzer_rules.unwrap_or_default();
        rules.push(analyzer_rule);
        self.analyzer_rules = Some(rules);
        self
    }

    pub fn with_optimizer_rules(
        mut self,
        rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
    ) -> Self {
        self.optimizer = Some(Optimizer::with_rules(rules));
        self
    }

    pub fn with_optimizer_rule(
        mut self,
        optimizer_rule: Arc<dyn OptimizerRule + Send + Sync>,
    ) -> Self {
        let mut rules = self.optimizer_rules.unwrap_or_default();
        rules.push(optimizer_rule);
        self.optimizer_rules = Some(rules);
        self
    }

    pub fn with_expr_planners(
        mut self,
        expr_planners: Vec<Arc<dyn ExprPlanner>>,
    ) -> Self {
        self.expr_planners = Some(expr_planners);
        self
    }

    #[cfg(feature = "sql")]
    pub fn with_type_planner(mut self, type_planner: Arc<dyn TypePlanner>) -> Self {
        self.type_planner = Some(type_planner);
        self
    }

    pub fn with_physical_optimizer_rules(
        mut self,
        physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
    ) -> Self {
        self.physical_optimizers =
            Some(PhysicalOptimizer::with_rules(physical_optimizers));
        self
    }

    pub fn with_physical_optimizer_rule(
        mut self,
        physical_optimizer_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync>,
    ) -> Self {
        let mut rules = self.physical_optimizer_rules.unwrap_or_default();
        rules.push(physical_optimizer_rule);
        self.physical_optimizer_rules = Some(rules);
        self
    }

    pub fn with_query_planner(
        mut self,
        query_planner: Arc<dyn QueryPlanner + Send + Sync>,
    ) -> Self {
        self.query_planner = Some(query_planner);
        self
    }

    pub fn with_catalog_list(
        mut self,
        catalog_list: Arc<dyn CatalogProviderList>,
    ) -> Self {
        self.catalog_list = Some(catalog_list);
        self
    }

    pub fn with_table_functions(
        mut self,
        table_functions: HashMap<String, Arc<TableFunction>>,
    ) -> Self {
        self.table_functions = Some(table_functions);
        self
    }

    pub fn with_table_function_list(
        mut self,
        table_functions: Vec<Arc<TableFunction>>,
    ) -> Self {
        let functions = table_functions
            .into_iter()
            .map(|f| (f.name().to_string(), f))
            .collect();
        self.table_functions = Some(functions);
        self
    }

    pub fn with_scalar_functions(
        mut self,
        scalar_functions: Vec<Arc<ScalarUDF>>,
    ) -> Self {
        self.scalar_functions = Some(scalar_functions);
        self
    }

    pub fn with_aggregate_functions(
        mut self,
        aggregate_functions: Vec<Arc<AggregateUDF>>,
    ) -> Self {
        self.aggregate_functions = Some(aggregate_functions);
        self
    }

    pub fn with_window_functions(
        mut self,
        window_functions: Vec<Arc<WindowUDF>>,
    ) -> Self {
        self.window_functions = Some(window_functions);
        self
    }

    pub fn with_serializer_registry(
        mut self,
        serializer_registry: Arc<dyn SerializerRegistry>,
    ) -> Self {
        self.serializer_registry = Some(serializer_registry);
        self
    }

    pub fn with_file_formats(
        mut self,
        file_formats: Vec<Arc<dyn FileFormatFactory>>,
    ) -> Self {
        self.file_formats = Some(file_formats);
        self
    }

    pub fn with_config(mut self, config: SessionConfig) -> Self {
        self.config = Some(config);
        self
    }

    pub fn with_table_options(mut self, table_options: TableOptions) -> Self {
        self.table_options = Some(table_options);
        self
    }

    pub fn with_execution_props(mut self, execution_props: ExecutionProps) -> Self {
        self.execution_props = Some(execution_props);
        self
    }

    pub fn with_table_factory(
        mut self,
        key: String,
        table_factory: Arc<dyn TableProviderFactory>,
    ) -> Self {
        let mut table_factories = self.table_factories.unwrap_or_default();
        table_factories.insert(key, table_factory);
        self.table_factories = Some(table_factories);
        self
    }

    pub fn with_table_factories(
        mut self,
        table_factories: HashMap<String, Arc<dyn TableProviderFactory>>,
    ) -> Self {
        self.table_factories = Some(table_factories);
        self
    }

    pub fn with_runtime_env(mut self, runtime_env: Arc<RuntimeEnv>) -> Self {
        self.runtime_env = Some(runtime_env);
        self
    }

    pub fn with_function_factory(
        mut self,
        function_factory: Option<Arc<dyn FunctionFactory>>,
    ) -> Self {
        self.function_factory = function_factory;
        self
    }

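    /// Registers an [`ObjectStore`] under `url`, creating a default
    /// [`RuntimeEnv`] first if none has been set on this builder.
    ///
    /// A hedged sketch using the in-memory store from the `object_store`
    /// crate (illustrative only):
    ///
    /// ```rust,ignore
    /// use std::sync::Arc;
    /// use object_store::memory::InMemory;
    /// use url::Url;
    ///
    /// let url = Url::parse("memory://")?;
    /// let state = SessionStateBuilder::new()
    ///     .with_object_store(&url, Arc::new(InMemory::new()))
    ///     .build();
    /// ```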
    pub fn with_object_store(
        mut self,
        url: &Url,
        object_store: Arc<dyn ObjectStore>,
    ) -> Self {
        if self.runtime_env.is_none() {
            self.runtime_env = Some(Arc::new(RuntimeEnv::default()));
        }
        self.runtime_env
            .as_ref()
            .unwrap()
            .register_object_store(url, object_store);
        self
    }

    pub fn build(self) -> SessionState {
        let Self {
            session_id,
            analyzer,
            expr_planners,
            #[cfg(feature = "sql")]
            type_planner,
            optimizer,
            physical_optimizers,
            query_planner,
            catalog_list,
            table_functions,
            scalar_functions,
            aggregate_functions,
            window_functions,
            serializer_registry,
            file_formats,
            table_options,
            config,
            execution_props,
            table_factories,
            runtime_env,
            function_factory,
            analyzer_rules,
            optimizer_rules,
            physical_optimizer_rules,
        } = self;

        let config = config.unwrap_or_default();
        let runtime_env = runtime_env.unwrap_or_else(|| Arc::new(RuntimeEnv::default()));

        let mut state = SessionState {
            session_id: session_id.unwrap_or_else(|| Uuid::new_v4().to_string()),
            analyzer: analyzer.unwrap_or_default(),
            expr_planners: expr_planners.unwrap_or_default(),
            #[cfg(feature = "sql")]
            type_planner,
            optimizer: optimizer.unwrap_or_default(),
            physical_optimizers: physical_optimizers.unwrap_or_default(),
            query_planner: query_planner
                .unwrap_or_else(|| Arc::new(DefaultQueryPlanner {})),
            catalog_list: catalog_list.unwrap_or_else(|| {
                Arc::new(MemoryCatalogProviderList::new()) as Arc<dyn CatalogProviderList>
            }),
            table_functions: table_functions.unwrap_or_default(),
            scalar_functions: HashMap::new(),
            aggregate_functions: HashMap::new(),
            window_functions: HashMap::new(),
            serializer_registry: serializer_registry
                .unwrap_or_else(|| Arc::new(EmptySerializerRegistry)),
            file_formats: HashMap::new(),
            table_options: table_options.unwrap_or_else(|| {
                TableOptions::default_from_session_config(config.options())
            }),
            config,
            execution_props: execution_props.unwrap_or_default(),
            table_factories: table_factories.unwrap_or_default(),
            runtime_env,
            function_factory,
            prepared_plans: HashMap::new(),
        };

        if let Some(file_formats) = file_formats {
            for file_format in file_formats {
                if let Err(e) = state.register_file_format(file_format, false) {
                    info!("Unable to register file format: {e}")
                };
            }
        }

        if let Some(scalar_functions) = scalar_functions {
            for udf in scalar_functions {
                let config_options = state.config().options();
                match udf.inner().with_updated_config(config_options) {
                    Some(new_udf) => {
                        if let Err(err) = state.register_udf(Arc::new(new_udf)) {
                            debug!(
                                "Failed to re-register updated UDF '{}': {}",
                                udf.name(),
                                err
                            );
                        }
                    }
                    None => match state.register_udf(Arc::clone(&udf)) {
                        Ok(Some(existing)) => {
                            debug!("Overwrote existing UDF '{}'", existing.name());
                        }
                        Ok(None) => {
                            debug!("Registered UDF '{}'", udf.name());
                        }
                        Err(err) => {
                            debug!("Failed to register UDF '{}': {}", udf.name(), err);
                        }
                    },
                }
            }
        }

        if let Some(aggregate_functions) = aggregate_functions {
            aggregate_functions.into_iter().for_each(|udaf| {
                let existing_udaf = state.register_udaf(udaf);
                if let Ok(Some(existing_udaf)) = existing_udaf {
                    debug!("Overwrote an existing UDAF: {}", existing_udaf.name());
                }
            });
        }

        if let Some(window_functions) = window_functions {
            window_functions.into_iter().for_each(|udwf| {
                let existing_udwf = state.register_udwf(udwf);
                if let Ok(Some(existing_udwf)) = existing_udwf {
                    debug!("Overwrote an existing UDWF: {}", existing_udwf.name());
                }
            });
        }
1468
1469 if state.config.create_default_catalog_and_schema() {
1470 let default_catalog = SessionStateDefaults::default_catalog(
1471 &state.config,
1472 &state.table_factories,
1473 &state.runtime_env,
1474 );
1475
1476 let existing_default_catalog = state.catalog_list.register_catalog(
1477 state.config.options().catalog.default_catalog.clone(),
1478 Arc::new(default_catalog),
1479 );
1480
1481 if existing_default_catalog.is_some() {
1482 debug!("Overwrote the default catalog");
1483 }
1484 }
1485
1486 if let Some(analyzer_rules) = analyzer_rules {
1487 for analyzer_rule in analyzer_rules {
1488 state.analyzer.rules.push(analyzer_rule);
1489 }
1490 }
1491
1492 if let Some(optimizer_rules) = optimizer_rules {
1493 for optimizer_rule in optimizer_rules {
1494 state.optimizer.rules.push(optimizer_rule);
1495 }
1496 }
1497
1498 if let Some(physical_optimizer_rules) = physical_optimizer_rules {
1499 for physical_optimizer_rule in physical_optimizer_rules {
1500 state
1501 .physical_optimizers
1502 .rules
1503 .push(physical_optimizer_rule);
1504 }
1505 }
1506
1507 state
1508 }
1509
1510 pub fn session_id(&self) -> &Option<String> {
1512 &self.session_id
1513 }
1514
1515 pub fn analyzer(&mut self) -> &mut Option<Analyzer> {
1517 &mut self.analyzer
1518 }
1519
1520 pub fn expr_planners(&mut self) -> &mut Option<Vec<Arc<dyn ExprPlanner>>> {
1522 &mut self.expr_planners
1523 }
1524
1525 #[cfg(feature = "sql")]
1527 pub fn type_planner(&mut self) -> &mut Option<Arc<dyn TypePlanner>> {
1528 &mut self.type_planner
1529 }
1530
1531 pub fn optimizer(&mut self) -> &mut Option<Optimizer> {
1533 &mut self.optimizer
1534 }
1535
1536 pub fn physical_optimizers(&mut self) -> &mut Option<PhysicalOptimizer> {
1538 &mut self.physical_optimizers
1539 }
1540
1541 pub fn query_planner(&mut self) -> &mut Option<Arc<dyn QueryPlanner + Send + Sync>> {
1543 &mut self.query_planner
1544 }
1545
1546 pub fn catalog_list(&mut self) -> &mut Option<Arc<dyn CatalogProviderList>> {
1548 &mut self.catalog_list
1549 }
1550
1551 pub fn table_functions(
1553 &mut self,
1554 ) -> &mut Option<HashMap<String, Arc<TableFunction>>> {
1555 &mut self.table_functions
1556 }
1557
1558 pub fn scalar_functions(&mut self) -> &mut Option<Vec<Arc<ScalarUDF>>> {
1560 &mut self.scalar_functions
1561 }
1562
1563 pub fn aggregate_functions(&mut self) -> &mut Option<Vec<Arc<AggregateUDF>>> {
1565 &mut self.aggregate_functions
1566 }
1567
1568 pub fn window_functions(&mut self) -> &mut Option<Vec<Arc<WindowUDF>>> {
1570 &mut self.window_functions
1571 }
1572
1573 pub fn serializer_registry(&mut self) -> &mut Option<Arc<dyn SerializerRegistry>> {
1575 &mut self.serializer_registry
1576 }
1577
1578 pub fn file_formats(&mut self) -> &mut Option<Vec<Arc<dyn FileFormatFactory>>> {
1580 &mut self.file_formats
1581 }
1582
1583 pub fn config(&mut self) -> &mut Option<SessionConfig> {
1585 &mut self.config
1586 }
1587
1588 pub fn table_options(&mut self) -> &mut Option<TableOptions> {
1590 &mut self.table_options
1591 }
1592
1593 pub fn execution_props(&mut self) -> &mut Option<ExecutionProps> {
1595 &mut self.execution_props
1596 }
1597
1598 pub fn table_factories(
1600 &mut self,
1601 ) -> &mut Option<HashMap<String, Arc<dyn TableProviderFactory>>> {
1602 &mut self.table_factories
1603 }
1604
1605 pub fn runtime_env(&mut self) -> &mut Option<Arc<RuntimeEnv>> {
1607 &mut self.runtime_env
1608 }
1609
1610 pub fn function_factory(&mut self) -> &mut Option<Arc<dyn FunctionFactory>> {
1612 &mut self.function_factory
1613 }
1614
1615 pub fn analyzer_rules(
1617 &mut self,
1618 ) -> &mut Option<Vec<Arc<dyn AnalyzerRule + Send + Sync>>> {
1619 &mut self.analyzer_rules
1620 }
1621
1622 pub fn optimizer_rules(
1624 &mut self,
1625 ) -> &mut Option<Vec<Arc<dyn OptimizerRule + Send + Sync>>> {
1626 &mut self.optimizer_rules
1627 }
1628
1629 pub fn physical_optimizer_rules(
1631 &mut self,
1632 ) -> &mut Option<Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>> {
1633 &mut self.physical_optimizer_rules
1634 }
1635}

impl Debug for SessionStateBuilder {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut debug_struct = f.debug_struct("SessionStateBuilder");
        let ret = debug_struct
            .field("session_id", &self.session_id)
            .field("config", &self.config)
            .field("runtime_env", &self.runtime_env)
            .field("catalog_list", &self.catalog_list)
            .field("serializer_registry", &self.serializer_registry)
            .field("file_formats", &self.file_formats)
            .field("execution_props", &self.execution_props)
            .field("table_options", &self.table_options)
            .field("table_factories", &self.table_factories)
            .field("function_factory", &self.function_factory)
            .field("expr_planners", &self.expr_planners);
        #[cfg(feature = "sql")]
        let ret = ret.field("type_planner", &self.type_planner);
        ret.field("query_planner", &self.query_planner)
            .field("analyzer_rules", &self.analyzer_rules)
            .field("analyzer", &self.analyzer)
            .field("optimizer_rules", &self.optimizer_rules)
            .field("optimizer", &self.optimizer)
            .field("physical_optimizer_rules", &self.physical_optimizer_rules)
            .field("physical_optimizers", &self.physical_optimizers)
            .field("table_functions", &self.table_functions)
            .field("scalar_functions", &self.scalar_functions)
            .field("aggregate_functions", &self.aggregate_functions)
            .field("window_functions", &self.window_functions)
            .finish()
    }
}

impl Default for SessionStateBuilder {
    fn default() -> Self {
        Self::new()
    }
}

impl From<SessionState> for SessionStateBuilder {
    fn from(state: SessionState) -> Self {
        SessionStateBuilder::new_from_existing(state)
    }
}

#[cfg(feature = "sql")]
struct SessionContextProvider<'a> {
    state: &'a SessionState,
    tables: HashMap<ResolvedTableReference, Arc<dyn TableSource>>,
}

#[cfg(feature = "sql")]
impl ContextProvider for SessionContextProvider<'_> {
    fn get_expr_planners(&self) -> &[Arc<dyn ExprPlanner>] {
        self.state.expr_planners()
    }

    fn get_type_planner(&self) -> Option<Arc<dyn TypePlanner>> {
        self.state.type_planner.as_ref().map(Arc::clone)
    }

    fn get_table_source(
        &self,
        name: TableReference,
    ) -> datafusion_common::Result<Arc<dyn TableSource>> {
        let name = self.state.resolve_table_ref(name);
        self.tables
            .get(&name)
            .cloned()
            .ok_or_else(|| plan_datafusion_err!("table '{name}' not found"))
    }

    fn get_table_function_source(
        &self,
        name: &str,
        args: Vec<Expr>,
    ) -> datafusion_common::Result<Arc<dyn TableSource>> {
        let tbl_func = self
            .state
            .table_functions
            .get(name)
            .cloned()
            .ok_or_else(|| plan_datafusion_err!("table function '{name}' not found"))?;
        let dummy_schema = DFSchema::empty();
        let simplifier =
            ExprSimplifier::new(SessionSimplifyProvider::new(self.state, &dummy_schema));
        let args = args
            .into_iter()
            .map(|arg| simplifier.simplify(arg))
            .collect::<datafusion_common::Result<Vec<_>>>()?;
        let provider = tbl_func.create_table_provider(&args)?;

        Ok(provider_as_source(provider))
    }

    fn create_cte_work_table(
        &self,
        name: &str,
        schema: arrow::datatypes::SchemaRef,
    ) -> datafusion_common::Result<Arc<dyn TableSource>> {
        let table = Arc::new(crate::datasource::cte_worktable::CteWorkTable::new(
            name, schema,
        ));
        Ok(provider_as_source(table))
    }

    fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
        self.state.scalar_functions().get(name).cloned()
    }

    fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>> {
        self.state.aggregate_functions().get(name).cloned()
    }

    fn get_window_meta(&self, name: &str) -> Option<Arc<WindowUDF>> {
        self.state.window_functions().get(name).cloned()
    }

    fn get_variable_type(&self, variable_names: &[String]) -> Option<DataType> {
        use datafusion_expr::var_provider::{is_system_variables, VarType};

        if variable_names.is_empty() {
            return None;
        }

        let provider_type = if is_system_variables(variable_names) {
            VarType::System
        } else {
            VarType::UserDefined
        };

        self.state
            .execution_props
            .var_providers
            .as_ref()
            .and_then(|provider| provider.get(&provider_type)?.get_type(variable_names))
    }

    fn options(&self) -> &ConfigOptions {
        self.state.config_options()
    }

    fn udf_names(&self) -> Vec<String> {
        self.state.scalar_functions().keys().cloned().collect()
    }

    fn udaf_names(&self) -> Vec<String> {
        self.state.aggregate_functions().keys().cloned().collect()
    }

    fn udwf_names(&self) -> Vec<String> {
        self.state.window_functions().keys().cloned().collect()
    }

    fn get_file_type(
        &self,
        ext: &str,
    ) -> datafusion_common::Result<
        Arc<dyn datafusion_common::file_options::file_type::FileType>,
    > {
        self.state
            .file_formats
            .get(&ext.to_lowercase())
            .ok_or(plan_datafusion_err!(
                "There is no registered file format with ext {ext}"
            ))
            .map(|file_type| {
                crate::datasource::file_format::format_as_file_type(Arc::clone(file_type))
            })
    }
}

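// `SessionState` also implements `FunctionRegistry`, so UDFs, UDAFs, and
// UDWFs (plus their aliases) can be registered and looked up directly on the
// state. A hedged usage sketch (assumes `my_udf` is a `ScalarUDF` built
// elsewhere; it is not defined in this file):
//
//     let mut state = SessionStateBuilder::new().with_default_features().build();
//     let replaced = state.register_udf(Arc::new(my_udf))?; // `Some` if a UDF was overwritten
//     assert!(state.udfs().contains("my_udf"));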
impl FunctionRegistry for SessionState {
    fn udfs(&self) -> HashSet<String> {
        self.scalar_functions.keys().cloned().collect()
    }

    fn udf(&self, name: &str) -> datafusion_common::Result<Arc<ScalarUDF>> {
        let result = self.scalar_functions.get(name);

        result.cloned().ok_or_else(|| {
            plan_datafusion_err!("There is no UDF named \"{name}\" in the registry. Use session context `register_udf` function to register a custom UDF")
        })
    }

    fn udaf(&self, name: &str) -> datafusion_common::Result<Arc<AggregateUDF>> {
        let result = self.aggregate_functions.get(name);

        result.cloned().ok_or_else(|| {
            plan_datafusion_err!("There is no UDAF named \"{name}\" in the registry. Use session context `register_udaf` function to register a custom UDAF")
        })
    }

    fn udwf(&self, name: &str) -> datafusion_common::Result<Arc<WindowUDF>> {
        let result = self.window_functions.get(name);

        result.cloned().ok_or_else(|| {
            plan_datafusion_err!("There is no UDWF named \"{name}\" in the registry. Use session context `register_udwf` function to register a custom UDWF")
        })
    }

    fn register_udf(
        &mut self,
        udf: Arc<ScalarUDF>,
    ) -> datafusion_common::Result<Option<Arc<ScalarUDF>>> {
        udf.aliases().iter().for_each(|alias| {
            self.scalar_functions
                .insert(alias.clone(), Arc::clone(&udf));
        });
        Ok(self.scalar_functions.insert(udf.name().into(), udf))
    }

    fn register_udaf(
        &mut self,
        udaf: Arc<AggregateUDF>,
    ) -> datafusion_common::Result<Option<Arc<AggregateUDF>>> {
        udaf.aliases().iter().for_each(|alias| {
            self.aggregate_functions
                .insert(alias.clone(), Arc::clone(&udaf));
        });
        Ok(self.aggregate_functions.insert(udaf.name().into(), udaf))
    }

    fn register_udwf(
        &mut self,
        udwf: Arc<WindowUDF>,
    ) -> datafusion_common::Result<Option<Arc<WindowUDF>>> {
        udwf.aliases().iter().for_each(|alias| {
            self.window_functions
                .insert(alias.clone(), Arc::clone(&udwf));
        });
        Ok(self.window_functions.insert(udwf.name().into(), udwf))
    }

    fn deregister_udf(
        &mut self,
        name: &str,
    ) -> datafusion_common::Result<Option<Arc<ScalarUDF>>> {
        let udf = self.scalar_functions.remove(name);
        if let Some(udf) = &udf {
            for alias in udf.aliases() {
                self.scalar_functions.remove(alias);
            }
        }
        Ok(udf)
    }

    fn deregister_udaf(
        &mut self,
        name: &str,
    ) -> datafusion_common::Result<Option<Arc<AggregateUDF>>> {
        let udaf = self.aggregate_functions.remove(name);
        if let Some(udaf) = &udaf {
            for alias in udaf.aliases() {
                self.aggregate_functions.remove(alias);
            }
        }
        Ok(udaf)
    }

    fn deregister_udwf(
        &mut self,
        name: &str,
    ) -> datafusion_common::Result<Option<Arc<WindowUDF>>> {
        let udwf = self.window_functions.remove(name);
        if let Some(udwf) = &udwf {
            for alias in udwf.aliases() {
                self.window_functions.remove(alias);
            }
        }
        Ok(udwf)
    }

    fn register_function_rewrite(
        &mut self,
        rewrite: Arc<dyn FunctionRewrite + Send + Sync>,
    ) -> datafusion_common::Result<()> {
        self.analyzer.add_function_rewrite(rewrite);
        Ok(())
    }

    fn expr_planners(&self) -> Vec<Arc<dyn ExprPlanner>> {
        self.expr_planners.clone()
    }

    fn register_expr_planner(
        &mut self,
        expr_planner: Arc<dyn ExprPlanner>,
    ) -> datafusion_common::Result<()> {
        self.expr_planners.push(expr_planner);
        Ok(())
    }

    fn udafs(&self) -> HashSet<String> {
        self.aggregate_functions.keys().cloned().collect()
    }

    fn udwfs(&self) -> HashSet<String> {
        self.window_functions.keys().cloned().collect()
    }
}

impl OptimizerConfig for SessionState {
    fn query_execution_start_time(&self) -> DateTime<Utc> {
        self.execution_props.query_execution_start_time
    }

    fn alias_generator(&self) -> &Arc<AliasGenerator> {
        &self.execution_props.alias_generator
    }

    fn options(&self) -> Arc<ConfigOptions> {
        Arc::clone(self.config.options())
    }

    fn function_registry(&self) -> Option<&dyn FunctionRegistry> {
        Some(self)
    }
}

impl From<&SessionState> for TaskContext {
    fn from(state: &SessionState) -> Self {
        let task_id = None;
        TaskContext::new(
            task_id,
            state.session_id.clone(),
            state.config.clone(),
            state.scalar_functions.clone(),
            state.aggregate_functions.clone(),
            state.window_functions.clone(),
            Arc::clone(&state.runtime_env),
        )
    }
}

#[derive(Debug)]
struct DefaultQueryPlanner {}

#[async_trait]
impl QueryPlanner for DefaultQueryPlanner {
    async fn create_physical_plan(
        &self,
        logical_plan: &LogicalPlan,
        session_state: &SessionState,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        let planner = DefaultPhysicalPlanner::default();
        planner
            .create_physical_plan(logical_plan, session_state)
            .await
    }
}

struct SessionSimplifyProvider<'a> {
    state: &'a SessionState,
    df_schema: &'a DFSchema,
}

impl<'a> SessionSimplifyProvider<'a> {
    fn new(state: &'a SessionState, df_schema: &'a DFSchema) -> Self {
        Self { state, df_schema }
    }
}

impl SimplifyInfo for SessionSimplifyProvider<'_> {
    fn is_boolean_type(&self, expr: &Expr) -> datafusion_common::Result<bool> {
        Ok(expr.get_type(self.df_schema)? == DataType::Boolean)
    }

    fn nullable(&self, expr: &Expr) -> datafusion_common::Result<bool> {
        expr.nullable(self.df_schema)
    }

    fn execution_props(&self) -> &ExecutionProps {
        self.state.execution_props()
    }

    fn get_data_type(&self, expr: &Expr) -> datafusion_common::Result<DataType> {
        expr.get_type(self.df_schema)
    }
}

#[derive(Debug)]
pub(crate) struct PreparedPlan {
    pub(crate) fields: Vec<FieldRef>,
    pub(crate) plan: Arc<LogicalPlan>,
}

#[cfg(test)]
mod tests {
    use super::{SessionContextProvider, SessionStateBuilder};
    use crate::common::assert_contains;
    use crate::config::ConfigOptions;
    use crate::datasource::empty::EmptyTable;
    use crate::datasource::provider_as_source;
    use crate::datasource::MemTable;
    use crate::execution::context::SessionState;
    use crate::logical_expr::planner::ExprPlanner;
    use crate::logical_expr::{AggregateUDF, ScalarUDF, TableSource, WindowUDF};
    use crate::physical_plan::ExecutionPlan;
    use crate::sql::planner::ContextProvider;
    use crate::sql::{ResolvedTableReference, TableReference};
    use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_catalog::MemoryCatalogProviderList;
    use datafusion_common::config::Dialect;
    use datafusion_common::DFSchema;
    use datafusion_common::Result;
    use datafusion_execution::config::SessionConfig;
    use datafusion_expr::Expr;
    use datafusion_optimizer::optimizer::OptimizerRule;
    use datafusion_optimizer::Optimizer;
    use datafusion_physical_plan::display::DisplayableExecutionPlan;
    use datafusion_sql::planner::{PlannerContext, SqlToRel};
    use std::collections::HashMap;
    use std::sync::Arc;

    #[test]
    #[cfg(feature = "sql")]
    fn test_session_state_with_default_features() {
        #[cfg(feature = "sql")]
        fn sql_to_expr(state: &SessionState) -> Result<Expr> {
            let provider = SessionContextProvider {
                state,
                tables: HashMap::new(),
            };

            let sql = "[1,2,3]";
            let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
            let df_schema = DFSchema::try_from(schema)?;
            let dialect = state.config.options().sql_parser.dialect;
            let sql_expr = state.sql_to_expr(sql, &dialect)?;

            let query = SqlToRel::new_with_options(&provider, state.get_parser_options());
            query.sql_to_expr(sql_expr, &df_schema, &mut PlannerContext::new())
        }

        let state = SessionStateBuilder::new().with_default_features().build();

        assert!(sql_to_expr(&state).is_ok());

        let state = SessionStateBuilder::new().build();

        assert!(sql_to_expr(&state).is_err())
    }

    #[test]
    fn test_from_existing() -> Result<()> {
        fn employee_batch() -> RecordBatch {
            let name: ArrayRef =
                Arc::new(StringArray::from_iter_values(["Andy", "Andrew"]));
            let age: ArrayRef = Arc::new(Int32Array::from(vec![11, 22]));
            RecordBatch::try_from_iter(vec![("name", name), ("age", age)]).unwrap()
        }
        let batch = employee_batch();
        let table = MemTable::try_new(batch.schema(), vec![vec![batch]])?;

        let session_state = SessionStateBuilder::new()
            .with_catalog_list(Arc::new(MemoryCatalogProviderList::new()))
            .build();
        let table_ref = session_state.resolve_table_ref("employee").to_string();
        session_state
            .schema_for_ref(&table_ref)?
            .register_table("employee".to_string(), Arc::new(table))?;

        let default_catalog = session_state
            .config
            .options()
            .catalog
            .default_catalog
            .clone();
        let default_schema = session_state
            .config
            .options()
            .catalog
            .default_schema
            .clone();
        let is_exist = session_state
            .catalog_list()
            .catalog(default_catalog.as_str())
            .unwrap()
            .schema(default_schema.as_str())
            .unwrap()
            .table_exist("employee");
        assert!(is_exist);
        let new_state = SessionStateBuilder::new_from_existing(session_state).build();
        assert!(new_state
            .catalog_list()
            .catalog(default_catalog.as_str())
            .unwrap()
            .schema(default_schema.as_str())
            .unwrap()
            .table_exist("employee"));

        let disable_create_default =
            SessionConfig::default().with_create_default_catalog_and_schema(false);
        let without_default_state = SessionStateBuilder::new()
            .with_config(disable_create_default)
            .build();
        assert!(without_default_state
            .catalog_list()
            .catalog(&default_catalog)
            .is_none());
        let new_state =
            SessionStateBuilder::new_from_existing(without_default_state).build();
        assert!(new_state.catalog_list().catalog(&default_catalog).is_none());
        Ok(())
    }

    #[test]
    fn test_session_state_with_optimizer_rules() {
        #[derive(Default, Debug)]
        struct DummyRule {}

        impl OptimizerRule for DummyRule {
            fn name(&self) -> &str {
                "dummy_rule"
            }
        }
        let state = SessionStateBuilder::new()
            .with_optimizer_rules(vec![Arc::new(DummyRule {})])
            .build();

        assert_eq!(state.optimizers().len(), 1);

        let state = SessionStateBuilder::new()
            .with_optimizer_rule(Arc::new(DummyRule {}))
            .build();

        assert_eq!(
            state.optimizers().len(),
            Optimizer::default().rules.len() + 1
        );
    }

    #[test]
    fn test_with_table_factories() -> Result<()> {
        use crate::test_util::TestTableFactory;

        let state = SessionStateBuilder::new().build();
        let table_factories = state.table_factories();
        assert!(table_factories.is_empty());

        let table_factory = Arc::new(TestTableFactory {});
        let state = SessionStateBuilder::new()
            .with_table_factory("employee".to_string(), table_factory)
            .build();
        let table_factories = state.table_factories();
        assert_eq!(table_factories.len(), 1);
        assert!(table_factories.contains_key("employee"));
        Ok(())
    }

    #[test]
    fn test_with_default_features_not_override() -> Result<()> {
        use crate::test_util::TestTableFactory;

        let table_factory = Arc::new(TestTableFactory {});
        let session_state = SessionStateBuilder::new()
            .with_table_factory("test".to_string(), table_factory)
            .with_default_features()
            .build();
        assert!(session_state.table_factories().get("test").is_some());

        Ok(())
    }

    #[tokio::test]
    async fn test_with_expr_planners() -> Result<()> {
        async fn plan_count_wildcard(
            with_expr_planners: bool,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            let mut context_provider = MyContextProvider::new().with_table(
                "t",
                provider_as_source(Arc::new(EmptyTable::new(Schema::empty().into()))),
            );
            if with_expr_planners {
                context_provider = context_provider.with_expr_planners();
            }

            let state = &context_provider.state;
            let statement =
                state.sql_to_statement("select count(*) from t", &Dialect::MySQL)?;
            let plan = SqlToRel::new(&context_provider).statement_to_plan(statement)?;
            state.create_physical_plan(&plan).await
        }

        let got = plan_count_wildcard(false).await;
        assert_contains!(
            got.unwrap_err().to_string(),
            "Physical plan does not support logical expression Wildcard"
        );

        let got = plan_count_wildcard(true).await?;
        let displayable = DisplayableExecutionPlan::new(got.as_ref());
        assert_eq!(
            displayable.indent(false).to_string(),
            "ProjectionExec: expr=[0 as count(*)]\n  PlaceholderRowExec\n"
        );

        Ok(())
    }

    struct MyContextProvider {
        state: SessionState,
        tables: HashMap<ResolvedTableReference, Arc<dyn TableSource>>,
        return_expr_planners: bool,
    }

    impl MyContextProvider {
        pub fn new() -> Self {
            Self {
                state: SessionStateBuilder::default()
                    .with_default_features()
                    .build(),
                tables: HashMap::new(),
                return_expr_planners: false,
            }
        }

        pub fn with_table(mut self, table: &str, source: Arc<dyn TableSource>) -> Self {
            self.tables.insert(
                ResolvedTableReference {
                    catalog: "default".to_string().into(),
                    schema: "public".to_string().into(),
                    table: table.to_string().into(),
                },
                source,
            );
            self
        }

        pub fn with_expr_planners(self) -> Self {
            Self {
                return_expr_planners: true,
                ..self
            }
        }
    }

    impl ContextProvider for MyContextProvider {
        fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
            let resolved_table_ref = ResolvedTableReference {
                catalog: "default".to_string().into(),
                schema: "public".to_string().into(),
                table: name.table().to_string().into(),
            };
            let source = self.tables.get(&resolved_table_ref).cloned().unwrap();
            Ok(source)
        }

        fn get_expr_planners(&self) -> &[Arc<dyn ExprPlanner>] {
            if self.return_expr_planners {
                self.state.expr_planners()
            } else {
                &[]
            }
        }

        fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
            self.state.scalar_functions().get(name).cloned()
        }

        fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>> {
            self.state.aggregate_functions().get(name).cloned()
        }

        fn get_window_meta(&self, name: &str) -> Option<Arc<WindowUDF>> {
            self.state.window_functions().get(name).cloned()
        }

        fn get_variable_type(&self, _variable_names: &[String]) -> Option<DataType> {
            None
        }

        fn options(&self) -> &ConfigOptions {
            self.state.config_options()
        }

        fn udf_names(&self) -> Vec<String> {
            self.state.scalar_functions().keys().cloned().collect()
        }

        fn udaf_names(&self) -> Vec<String> {
            self.state.aggregate_functions().keys().cloned().collect()
        }

        fn udwf_names(&self) -> Vec<String> {
            self.state.window_functions().keys().cloned().collect()
        }
    }
}