datafusion/execution/
session_state.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`SessionState`]: information required to run queries in a session
19
20use std::any::Any;
21use std::collections::hash_map::Entry;
22use std::collections::{HashMap, HashSet};
23use std::fmt::Debug;
24use std::sync::Arc;
25
26use crate::catalog::{CatalogProviderList, SchemaProvider, TableProviderFactory};
27use crate::datasource::file_format::FileFormatFactory;
28#[cfg(feature = "sql")]
29use crate::datasource::provider_as_source;
30use crate::execution::context::{EmptySerializerRegistry, FunctionFactory, QueryPlanner};
31use crate::execution::SessionStateDefaults;
32use crate::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
33use arrow_schema::{DataType, FieldRef};
34use datafusion_catalog::information_schema::{
35    InformationSchemaProvider, INFORMATION_SCHEMA,
36};
37use datafusion_catalog::MemoryCatalogProviderList;
38use datafusion_catalog::{TableFunction, TableFunctionImpl};
39use datafusion_common::alias::AliasGenerator;
40#[cfg(feature = "sql")]
41use datafusion_common::config::Dialect;
42use datafusion_common::config::{ConfigExtension, ConfigOptions, TableOptions};
43use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
44use datafusion_common::{
45    config_err, exec_err, plan_datafusion_err, DFSchema, DataFusionError,
46    ResolvedTableReference, TableReference,
47};
48use datafusion_execution::config::SessionConfig;
49use datafusion_execution::runtime_env::RuntimeEnv;
50use datafusion_execution::TaskContext;
51use datafusion_expr::execution_props::ExecutionProps;
52use datafusion_expr::expr_rewriter::FunctionRewrite;
53use datafusion_expr::planner::ExprPlanner;
54#[cfg(feature = "sql")]
55use datafusion_expr::planner::TypePlanner;
56use datafusion_expr::registry::{FunctionRegistry, SerializerRegistry};
57use datafusion_expr::simplify::SimplifyInfo;
58#[cfg(feature = "sql")]
59use datafusion_expr::TableSource;
60use datafusion_expr::{
61    AggregateUDF, Explain, Expr, ExprSchemable, LogicalPlan, ScalarUDF, WindowUDF,
62};
63use datafusion_optimizer::simplify_expressions::ExprSimplifier;
64use datafusion_optimizer::{
65    Analyzer, AnalyzerRule, Optimizer, OptimizerConfig, OptimizerRule,
66};
67use datafusion_physical_expr::create_physical_expr;
68use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
69use datafusion_physical_optimizer::optimizer::PhysicalOptimizer;
70use datafusion_physical_optimizer::PhysicalOptimizerRule;
71use datafusion_physical_plan::ExecutionPlan;
72use datafusion_session::Session;
73#[cfg(feature = "sql")]
74use datafusion_sql::{
75    parser::{DFParserBuilder, Statement},
76    planner::{ContextProvider, ParserOptions, PlannerContext, SqlToRel},
77};
78
79use async_trait::async_trait;
80use chrono::{DateTime, Utc};
81use itertools::Itertools;
82use log::{debug, info};
83use object_store::ObjectStore;
84#[cfg(feature = "sql")]
85use sqlparser::{
86    ast::{Expr as SQLExpr, ExprWithAlias as SQLExprWithAlias},
87    dialect::dialect_from_str,
88};
89use url::Url;
90use uuid::Uuid;
91
92/// `SessionState` contains all the necessary state to plan and execute queries,
93/// such as configuration, functions, and runtime environment. Please see the
94/// documentation on [`SessionContext`] for more information.
95///
96///
97/// # Example: `SessionState` from a [`SessionContext`]
98///
99/// ```
100/// use datafusion::prelude::*;
101/// let ctx = SessionContext::new();
102/// let state = ctx.state();
103/// ```
104///
105/// # Example: `SessionState` via [`SessionStateBuilder`]
106///
107/// You can also use [`SessionStateBuilder`] to build a `SessionState` object
108/// directly:
109///
110/// ```
111/// use datafusion::prelude::*;
112/// # use datafusion::{error::Result, assert_batches_eq};
113/// # use datafusion::execution::session_state::SessionStateBuilder;
114/// # use datafusion_execution::runtime_env::RuntimeEnv;
115/// # use std::sync::Arc;
116/// # #[tokio::main]
117/// # async fn main() -> Result<()> {
118/// let state = SessionStateBuilder::new()
119///     .with_config(SessionConfig::new())
120///     .with_runtime_env(Arc::new(RuntimeEnv::default()))
121///     .with_default_features()
122///     .build();
123/// Ok(())
124/// # }
125/// ```
126///
127/// Note that there is no `Default` or `new()` for SessionState,
128/// to avoid accidentally running queries or other operations without passing through
129/// the [`SessionConfig`] or [`RuntimeEnv`]. See [`SessionStateBuilder`] and
130/// [`SessionContext`].
131///
132/// [`SessionContext`]: crate::execution::context::SessionContext
#[derive(Clone)]
pub struct SessionState {
    /// A unique UUID that identifies the session
    session_id: String,
    /// Responsible for analyzing and rewriting a logical plan before optimization
    analyzer: Analyzer,
    /// Provides support for customizing the SQL planner, e.g. to add support for custom operators like `->>` or `?`
    expr_planners: Vec<Arc<dyn ExprPlanner>>,
    /// Provides support for customizing the SQL type planning
    #[cfg(feature = "sql")]
    type_planner: Option<Arc<dyn TypePlanner>>,
    /// Responsible for optimizing a logical plan
    optimizer: Optimizer,
    /// Responsible for optimizing a physical execution plan
    physical_optimizers: PhysicalOptimizer,
    /// Responsible for planning `LogicalPlan`s, and `ExecutionPlan`
    query_planner: Arc<dyn QueryPlanner + Send + Sync>,
    /// Collection of catalogs containing schemas and ultimately TableProviders
    catalog_list: Arc<dyn CatalogProviderList>,
    /// Table functions registered with the context, keyed by name
    table_functions: HashMap<String, Arc<TableFunction>>,
    /// Scalar functions that are registered with the context
    scalar_functions: HashMap<String, Arc<ScalarUDF>>,
    /// Aggregate functions registered in the context
    aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
    /// Window functions registered in the context
    window_functions: HashMap<String, Arc<WindowUDF>>,
    /// Deserializer registry for extensions.
    serializer_registry: Arc<dyn SerializerRegistry>,
    /// Holds registered external FileFormat implementations
    file_formats: HashMap<String, Arc<dyn FileFormatFactory>>,
    /// Session configuration
    config: SessionConfig,
    /// Table options
    table_options: TableOptions,
    /// Execution properties
    execution_props: ExecutionProps,
    /// TableProviderFactories for different file formats.
    ///
    /// Maps strings like "JSON" to an instance of [`TableProviderFactory`]
    ///
    /// This is used to create [`TableProvider`] instances for the
    /// `CREATE EXTERNAL TABLE ... STORED AS <FORMAT>` for custom file
    /// formats other than those built into DataFusion
    ///
    /// [`TableProvider`]: crate::catalog::TableProvider
    table_factories: HashMap<String, Arc<dyn TableProviderFactory>>,
    /// Runtime environment
    runtime_env: Arc<RuntimeEnv>,
    /// [FunctionFactory] to support pluggable user defined function handler.
    ///
    /// It will be invoked on `CREATE FUNCTION` statements;
    /// thus, changing the dialect to PostgreSQL is required.
    function_factory: Option<Arc<dyn FunctionFactory>>,
    /// Cache logical plans of prepared statements for later execution.
    /// Key is the prepared statement name.
    prepared_plans: HashMap<String, Arc<PreparedPlan>>,
}
191
192impl Debug for SessionState {
193    /// Prefer having short fields at the top and long vector fields near the end
194    /// Group fields by
195    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
196        let mut debug_struct = f.debug_struct("SessionState");
197        let ret = debug_struct
198            .field("session_id", &self.session_id)
199            .field("config", &self.config)
200            .field("runtime_env", &self.runtime_env)
201            .field("catalog_list", &self.catalog_list)
202            .field("serializer_registry", &self.serializer_registry)
203            .field("file_formats", &self.file_formats)
204            .field("execution_props", &self.execution_props)
205            .field("table_options", &self.table_options)
206            .field("table_factories", &self.table_factories)
207            .field("function_factory", &self.function_factory)
208            .field("expr_planners", &self.expr_planners);
209
210        #[cfg(feature = "sql")]
211        let ret = ret.field("type_planner", &self.type_planner);
212
213        ret.field("query_planners", &self.query_planner)
214            .field("analyzer", &self.analyzer)
215            .field("optimizer", &self.optimizer)
216            .field("physical_optimizers", &self.physical_optimizers)
217            .field("table_functions", &self.table_functions)
218            .field("scalar_functions", &self.scalar_functions)
219            .field("aggregate_functions", &self.aggregate_functions)
220            .field("window_functions", &self.window_functions)
221            .field("prepared_plans", &self.prepared_plans)
222            .finish()
223    }
224}
225
226#[async_trait]
227impl Session for SessionState {
228    fn session_id(&self) -> &str {
229        self.session_id()
230    }
231
232    fn config(&self) -> &SessionConfig {
233        self.config()
234    }
235
236    async fn create_physical_plan(
237        &self,
238        logical_plan: &LogicalPlan,
239    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
240        self.create_physical_plan(logical_plan).await
241    }
242
243    fn create_physical_expr(
244        &self,
245        expr: Expr,
246        df_schema: &DFSchema,
247    ) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
248        self.create_physical_expr(expr, df_schema)
249    }
250
251    fn scalar_functions(&self) -> &HashMap<String, Arc<ScalarUDF>> {
252        &self.scalar_functions
253    }
254
255    fn aggregate_functions(&self) -> &HashMap<String, Arc<AggregateUDF>> {
256        &self.aggregate_functions
257    }
258
259    fn window_functions(&self) -> &HashMap<String, Arc<WindowUDF>> {
260        &self.window_functions
261    }
262
263    fn runtime_env(&self) -> &Arc<RuntimeEnv> {
264        self.runtime_env()
265    }
266
267    fn execution_props(&self) -> &ExecutionProps {
268        self.execution_props()
269    }
270
271    fn as_any(&self) -> &dyn Any {
272        self
273    }
274
275    fn table_options(&self) -> &TableOptions {
276        self.table_options()
277    }
278
279    fn table_options_mut(&mut self) -> &mut TableOptions {
280        self.table_options_mut()
281    }
282
283    fn task_ctx(&self) -> Arc<TaskContext> {
284        self.task_ctx()
285    }
286}
287
288impl SessionState {
289    pub(crate) fn resolve_table_ref(
290        &self,
291        table_ref: impl Into<TableReference>,
292    ) -> ResolvedTableReference {
293        let catalog = &self.config_options().catalog;
294        table_ref
295            .into()
296            .resolve(&catalog.default_catalog, &catalog.default_schema)
297    }
298
299    /// Retrieve the [`SchemaProvider`] for a specific [`TableReference`], if it
300    /// exists.
301    pub fn schema_for_ref(
302        &self,
303        table_ref: impl Into<TableReference>,
304    ) -> datafusion_common::Result<Arc<dyn SchemaProvider>> {
305        let resolved_ref = self.resolve_table_ref(table_ref);
306        if self.config.information_schema() && *resolved_ref.schema == *INFORMATION_SCHEMA
307        {
308            return Ok(Arc::new(InformationSchemaProvider::new(Arc::clone(
309                &self.catalog_list,
310            ))));
311        }
312
313        self.catalog_list
314            .catalog(&resolved_ref.catalog)
315            .ok_or_else(|| {
316                plan_datafusion_err!(
317                    "failed to resolve catalog: {}",
318                    resolved_ref.catalog
319                )
320            })?
321            .schema(&resolved_ref.schema)
322            .ok_or_else(|| {
323                plan_datafusion_err!("failed to resolve schema: {}", resolved_ref.schema)
324            })
325    }
326
    /// Add `analyzer_rule` to the end of the list of
    /// [`AnalyzerRule`]s used to rewrite queries.
    ///
    /// Returns a shared reference to `self` after the rule has been appended.
    pub fn add_analyzer_rule(
        &mut self,
        analyzer_rule: Arc<dyn AnalyzerRule + Send + Sync>,
    ) -> &Self {
        self.analyzer.rules.push(analyzer_rule);
        self
    }
336
    // Appends `optimizer_rule` to the end of the list of [`OptimizerRule`]s
    // held by this session's logical optimizer.
    //
    // NOTE: `add_optimizer_rule` (defined elsewhere) takes an owned reference.
    // It should probably be renamed to `with_optimizer_rule` to follow builder
    // style, with an `add_optimizer_rule` that takes `&mut self` added instead
    // of this method.
    pub(crate) fn append_optimizer_rule(
        &mut self,
        optimizer_rule: Arc<dyn OptimizerRule + Send + Sync>,
    ) {
        self.optimizer.rules.push(optimizer_rule);
    }
346
    /// Registers a [`FunctionFactory`] to handle `CREATE FUNCTION` statements.
    ///
    /// Any previously registered factory is replaced.
    pub fn set_function_factory(&mut self, function_factory: Arc<dyn FunctionFactory>) {
        self.function_factory = Some(function_factory);
    }
351
    /// Returns the registered [`FunctionFactory`], or `None` if no factory
    /// has been set.
    pub fn function_factory(&self) -> Option<&Arc<dyn FunctionFactory>> {
        self.function_factory.as_ref()
    }
356
    /// Returns the registered [`TableProviderFactory`]s, keyed by the string
    /// they were registered under.
    pub fn table_factories(&self) -> &HashMap<String, Arc<dyn TableProviderFactory>> {
        &self.table_factories
    }
361
    /// Returns a mutable reference to the registered
    /// [`TableProviderFactory`]s, allowing factories to be added or removed.
    pub fn table_factories_mut(
        &mut self,
    ) -> &mut HashMap<String, Arc<dyn TableProviderFactory>> {
        &mut self.table_factories
    }
368
369    /// Parse an SQL string into an DataFusion specific AST
370    /// [`Statement`]. See [`SessionContext::sql`] for running queries.
371    ///
372    /// [`SessionContext::sql`]: crate::execution::context::SessionContext::sql
373    #[cfg(feature = "sql")]
374    pub fn sql_to_statement(
375        &self,
376        sql: &str,
377        dialect: &Dialect,
378    ) -> datafusion_common::Result<Statement> {
379        let dialect = dialect_from_str(dialect).ok_or_else(|| {
380            plan_datafusion_err!(
381                "Unsupported SQL dialect: {dialect}. Available dialects: \
382                     Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \
383                     MsSQL, ClickHouse, BigQuery, Ansi, DuckDB, Databricks."
384            )
385        })?;
386
387        let recursion_limit = self.config.options().sql_parser.recursion_limit;
388
389        let mut statements = DFParserBuilder::new(sql)
390            .with_dialect(dialect.as_ref())
391            .with_recursion_limit(recursion_limit)
392            .build()?
393            .parse_statements()?;
394
395        if statements.len() > 1 {
396            return datafusion_common::not_impl_err!(
397                "The context currently only supports a single SQL statement"
398            );
399        }
400
401        let statement = statements.pop_front().ok_or_else(|| {
402            plan_datafusion_err!("No SQL statements were provided in the query string")
403        })?;
404        Ok(statement)
405    }
406
407    /// parse a sql string into a sqlparser-rs AST [`SQLExpr`].
408    ///
409    /// See [`Self::create_logical_expr`] for parsing sql to [`Expr`].
410    #[cfg(feature = "sql")]
411    pub fn sql_to_expr(
412        &self,
413        sql: &str,
414        dialect: &Dialect,
415    ) -> datafusion_common::Result<SQLExpr> {
416        self.sql_to_expr_with_alias(sql, dialect).map(|x| x.expr)
417    }
418
419    /// parse a sql string into a sqlparser-rs AST [`SQLExprWithAlias`].
420    ///
421    /// See [`Self::create_logical_expr`] for parsing sql to [`Expr`].
422    #[cfg(feature = "sql")]
423    pub fn sql_to_expr_with_alias(
424        &self,
425        sql: &str,
426        dialect: &Dialect,
427    ) -> datafusion_common::Result<SQLExprWithAlias> {
428        let dialect = dialect_from_str(dialect).ok_or_else(|| {
429            plan_datafusion_err!(
430                "Unsupported SQL dialect: {dialect}. Available dialects: \
431                         Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \
432                         MsSQL, ClickHouse, BigQuery, Ansi, DuckDB, Databricks."
433            )
434        })?;
435
436        let recursion_limit = self.config.options().sql_parser.recursion_limit;
437        let expr = DFParserBuilder::new(sql)
438            .with_dialect(dialect.as_ref())
439            .with_recursion_limit(recursion_limit)
440            .build()?
441            .parse_into_expr()?;
442
443        Ok(expr)
444    }
445
446    /// Resolve all table references in the SQL statement. Does not include CTE references.
447    ///
448    /// See [`datafusion_sql::resolve::resolve_table_references`] for more information.
449    ///
450    /// [`datafusion_sql::resolve::resolve_table_references`]: datafusion_sql::resolve::resolve_table_references
451    #[cfg(feature = "sql")]
452    pub fn resolve_table_references(
453        &self,
454        statement: &Statement,
455    ) -> datafusion_common::Result<Vec<TableReference>> {
456        let enable_ident_normalization =
457            self.config.options().sql_parser.enable_ident_normalization;
458        let (table_refs, _) = datafusion_sql::resolve::resolve_table_references(
459            statement,
460            enable_ident_normalization,
461        )?;
462        Ok(table_refs)
463    }
464
465    /// Convert an AST Statement into a LogicalPlan
466    #[cfg(feature = "sql")]
467    pub async fn statement_to_plan(
468        &self,
469        statement: Statement,
470    ) -> datafusion_common::Result<LogicalPlan> {
471        let references = self.resolve_table_references(&statement)?;
472
473        let mut provider = SessionContextProvider {
474            state: self,
475            tables: HashMap::with_capacity(references.len()),
476        };
477
478        for reference in references {
479            let resolved = self.resolve_table_ref(reference);
480            if let Entry::Vacant(v) = provider.tables.entry(resolved) {
481                let resolved = v.key();
482                if let Ok(schema) = self.schema_for_ref(resolved.clone()) {
483                    if let Some(table) = schema.table(&resolved.table).await? {
484                        v.insert(provider_as_source(table));
485                    }
486                }
487            }
488        }
489
490        let query = SqlToRel::new_with_options(&provider, self.get_parser_options());
491        query.statement_to_plan(statement)
492    }
493
494    #[cfg(feature = "sql")]
495    fn get_parser_options(&self) -> ParserOptions {
496        let sql_parser_options = &self.config.options().sql_parser;
497
498        ParserOptions {
499            parse_float_as_decimal: sql_parser_options.parse_float_as_decimal,
500            enable_ident_normalization: sql_parser_options.enable_ident_normalization,
501            enable_options_value_normalization: sql_parser_options
502                .enable_options_value_normalization,
503            support_varchar_with_length: sql_parser_options.support_varchar_with_length,
504            map_string_types_to_utf8view: sql_parser_options.map_string_types_to_utf8view,
505            collect_spans: sql_parser_options.collect_spans,
506            default_null_ordering: sql_parser_options
507                .default_null_ordering
508                .as_str()
509                .into(),
510        }
511    }
512
513    /// Creates a [`LogicalPlan`] from the provided SQL string. This
514    /// interface will plan any SQL DataFusion supports, including DML
515    /// like `CREATE TABLE`, and `COPY` (which can write to local
516    /// files.
517    ///
518    /// See [`SessionContext::sql`] and
519    /// [`SessionContext::sql_with_options`] for a higher-level
520    /// interface that handles DDL and verification of allowed
521    /// statements.
522    ///
523    /// [`SessionContext::sql`]: crate::execution::context::SessionContext::sql
524    /// [`SessionContext::sql_with_options`]: crate::execution::context::SessionContext::sql_with_options
525    #[cfg(feature = "sql")]
526    pub async fn create_logical_plan(
527        &self,
528        sql: &str,
529    ) -> datafusion_common::Result<LogicalPlan> {
530        let dialect = self.config.options().sql_parser.dialect;
531        let statement = self.sql_to_statement(sql, &dialect)?;
532        let plan = self.statement_to_plan(statement).await?;
533        Ok(plan)
534    }
535
536    /// Creates a datafusion style AST [`Expr`] from a SQL string.
537    ///
538    /// See example on  [SessionContext::parse_sql_expr](crate::execution::context::SessionContext::parse_sql_expr)
539    #[cfg(feature = "sql")]
540    pub fn create_logical_expr(
541        &self,
542        sql: &str,
543        df_schema: &DFSchema,
544    ) -> datafusion_common::Result<Expr> {
545        let dialect = self.config.options().sql_parser.dialect;
546
547        let sql_expr = self.sql_to_expr_with_alias(sql, &dialect)?;
548
549        let provider = SessionContextProvider {
550            state: self,
551            tables: HashMap::new(),
552        };
553
554        let query = SqlToRel::new_with_options(&provider, self.get_parser_options());
555        query.sql_to_expr_with_alias(sql_expr, df_schema, &mut PlannerContext::new())
556    }
557
    /// Returns a reference to the [`Analyzer`] used by this session.
    pub fn analyzer(&self) -> &Analyzer {
        &self.analyzer
    }
562
    /// Returns a reference to the logical [`Optimizer`] used by this session.
    pub fn optimizer(&self) -> &Optimizer {
        &self.optimizer
    }
567
    /// Returns the [`ExprPlanner`]s registered with this session, as a slice.
    pub fn expr_planners(&self) -> &[Arc<dyn ExprPlanner>] {
        &self.expr_planners
    }
572
    /// Returns a reference to the [`QueryPlanner`] used by this session.
    pub fn query_planner(&self) -> &Arc<dyn QueryPlanner + Send + Sync> {
        &self.query_planner
    }
577
    /// Optimizes the logical plan by applying optimizer rules.
    ///
    /// The plan is first run through the session's analyzer and then through
    /// its optimizer.
    ///
    /// For [`LogicalPlan::Explain`] the wrapped plan is processed instead, and
    /// the output of every analyzer and optimizer rule is recorded in the
    /// returned `Explain`'s `stringified_plans`. In that mode, an analyzer or
    /// optimizer failure reported as [`DataFusionError::Context`] is recorded
    /// as a stringified plan entry rather than returned as an error, and
    /// `logical_optimization_succeeded` is set to `false`.
    ///
    /// # Errors
    /// Returns any other analyzer or optimizer error.
    pub fn optimize(&self, plan: &LogicalPlan) -> datafusion_common::Result<LogicalPlan> {
        if let LogicalPlan::Explain(e) = plan {
            // Start from the plans already captured on the explain node.
            let mut stringified_plans = e.stringified_plans.clone();

            // analyze & capture output of each rule
            let analyzer_result = self.analyzer.execute_and_check(
                e.plan.as_ref().clone(),
                &self.options(),
                |analyzed_plan, analyzer| {
                    let analyzer_name = analyzer.name().to_string();
                    let plan_type = PlanType::AnalyzedLogicalPlan { analyzer_name };
                    stringified_plans.push(analyzed_plan.to_stringified(plan_type));
                },
            );
            let analyzed_plan = match analyzer_result {
                Ok(plan) => plan,
                // The context string is used as the name of the failing analyzer rule.
                Err(DataFusionError::Context(analyzer_name, err)) => {
                    let plan_type = PlanType::AnalyzedLogicalPlan { analyzer_name };
                    stringified_plans
                        .push(StringifiedPlan::new(plan_type, err.to_string()));

                    // Report the failure through the explain output, keeping
                    // the original (un-analyzed) plan.
                    return Ok(LogicalPlan::Explain(Explain {
                        verbose: e.verbose,
                        plan: Arc::clone(&e.plan),
                        explain_format: e.explain_format.clone(),
                        stringified_plans,
                        schema: Arc::clone(&e.schema),
                        logical_optimization_succeeded: false,
                    }));
                }
                Err(e) => return Err(e),
            };

            // to delineate the analyzer & optimizer phases in explain output
            stringified_plans
                .push(analyzed_plan.to_stringified(PlanType::FinalAnalyzedLogicalPlan));

            // optimize the child plan, capturing the output of each optimizer
            let optimized_plan = self.optimizer.optimize(
                analyzed_plan,
                self,
                |optimized_plan, optimizer| {
                    let optimizer_name = optimizer.name().to_string();
                    let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name };
                    stringified_plans.push(optimized_plan.to_stringified(plan_type));
                },
            );
            let (plan, logical_optimization_succeeded) = match optimized_plan {
                Ok(plan) => (Arc::new(plan), true),
                // The context string is used as the name of the failing optimizer rule;
                // fall back to the original plan of the explain node.
                Err(DataFusionError::Context(optimizer_name, err)) => {
                    let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name };
                    stringified_plans
                        .push(StringifiedPlan::new(plan_type, err.to_string()));
                    (Arc::clone(&e.plan), false)
                }
                Err(e) => return Err(e),
            };

            Ok(LogicalPlan::Explain(Explain {
                verbose: e.verbose,
                explain_format: e.explain_format.clone(),
                plan,
                stringified_plans,
                schema: Arc::clone(&e.schema),
                logical_optimization_succeeded,
            }))
        } else {
            // Non-explain plans: analyze then optimize, with no per-rule observers.
            let analyzed_plan = self.analyzer.execute_and_check(
                plan.clone(),
                &self.options(),
                |_, _| {},
            )?;
            self.optimizer.optimize(analyzed_plan, self, |_, _| {})
        }
    }
654
655    /// Creates a physical [`ExecutionPlan`] plan from a [`LogicalPlan`].
656    ///
657    /// Note: this first calls [`Self::optimize`] on the provided
658    /// plan.
659    ///
660    /// This function will error for [`LogicalPlan`]s such as catalog DDL like
661    /// `CREATE TABLE`, which do not have corresponding physical plans and must
662    /// be handled by another layer, typically [`SessionContext`].
663    ///
664    /// [`SessionContext`]: crate::execution::context::SessionContext
665    pub async fn create_physical_plan(
666        &self,
667        logical_plan: &LogicalPlan,
668    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
669        let logical_plan = self.optimize(logical_plan)?;
670        self.query_planner
671            .create_physical_plan(&logical_plan, self)
672            .await
673    }
674
675    /// Create a [`PhysicalExpr`] from an [`Expr`] after applying type
676    /// coercion, and function rewrites.
677    ///
678    /// Note: The expression is not [simplified] or otherwise optimized:
679    /// `a = 1 + 2` will not be simplified to `a = 3` as this is a more involved process.
680    /// See the [expr_api] example for how to simplify expressions.
681    ///
682    /// # See Also:
683    /// * [`SessionContext::create_physical_expr`] for a higher-level API
684    /// * [`create_physical_expr`] for a lower-level API
685    ///
686    /// [simplified]: datafusion_optimizer::simplify_expressions
687    /// [expr_api]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/expr_api.rs
688    /// [`SessionContext::create_physical_expr`]: crate::execution::context::SessionContext::create_physical_expr
689    pub fn create_physical_expr(
690        &self,
691        expr: Expr,
692        df_schema: &DFSchema,
693    ) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
694        let simplifier =
695            ExprSimplifier::new(SessionSimplifyProvider::new(self, df_schema));
696        // apply type coercion here to ensure types match
697        let mut expr = simplifier.coerce(expr, df_schema)?;
698
699        // rewrite Exprs to functions if necessary
700        let config_options = self.config_options();
701        for rewrite in self.analyzer.function_rewrites() {
702            expr = expr
703                .transform_up_with_schema(df_schema, |expr, df_schema| {
704                    rewrite.rewrite(expr, df_schema, config_options)
705                })?
706                .data;
707        }
708        create_physical_expr(&expr, df_schema, self.execution_props())
709    }
710
    /// Returns the ID that identifies this session.
    pub fn session_id(&self) -> &str {
        &self.session_id
    }
715
    /// Returns a reference to the shared [`RuntimeEnv`] of this session.
    pub fn runtime_env(&self) -> &Arc<RuntimeEnv> {
        &self.runtime_env
    }
720
    /// Returns a reference to the [`ExecutionProps`] of this session.
    pub fn execution_props(&self) -> &ExecutionProps {
        &self.execution_props
    }
725
    /// Returns a mutable reference to the [`ExecutionProps`] of this session.
    pub fn execution_props_mut(&mut self) -> &mut ExecutionProps {
        &mut self.execution_props
    }
730
    /// Returns a reference to the [`SessionConfig`] of this session.
    pub fn config(&self) -> &SessionConfig {
        &self.config
    }
735
    /// Returns a mutable reference to the [`SessionConfig`] of this session.
    pub fn config_mut(&mut self) -> &mut SessionConfig {
        &mut self.config
    }
740
    /// Returns the [`OptimizerRule`]s held by this session's logical optimizer.
    pub fn optimizers(&self) -> &[Arc<dyn OptimizerRule + Send + Sync>] {
        &self.optimizer.rules
    }
745
    /// Returns the [`PhysicalOptimizerRule`]s held by this session's physical
    /// optimizer.
    pub fn physical_optimizers(&self) -> &[Arc<dyn PhysicalOptimizerRule + Send + Sync>] {
        &self.physical_optimizers.rules
    }
750
    /// Returns the [`ConfigOptions`] held by this session's [`SessionConfig`].
    pub fn config_options(&self) -> &Arc<ConfigOptions> {
        self.config.options()
    }
755
756    /// Mark the start of the execution
757    pub fn mark_start_execution(&mut self) {
758        let config = Arc::clone(self.config.options());
759        self.execution_props.mark_start_execution(config);
760    }
761
    /// Returns a reference to the [`TableOptions`] of this session.
    pub fn table_options(&self) -> &TableOptions {
        &self.table_options
    }
766
    /// Returns the [`TableOptions`] with its extensions.
    ///
    /// Delegates to [`Session::default_table_options`], which defines how the
    /// returned value is built.
    pub fn default_table_options(&self) -> TableOptions {
        Session::default_table_options(self)
    }
771
    /// Returns a mutable reference to the [`TableOptions`] of this session.
    pub fn table_options_mut(&mut self) -> &mut TableOptions {
        &mut self.table_options
    }
776
    /// Registers a [`ConfigExtension`] as a table option extension that can be
    /// referenced from SQL statements executed against this context.
    ///
    /// The extension is inserted into the `extensions` of this session's
    /// [`TableOptions`].
    pub fn register_table_options_extension<T: ConfigExtension>(&mut self, extension: T) {
        self.table_options.extensions.insert(extension)
    }
782
783    /// Adds or updates a [FileFormatFactory] which can be used with COPY TO or
784    /// CREATE EXTERNAL TABLE statements for reading and writing files of custom
785    /// formats.
786    pub fn register_file_format(
787        &mut self,
788        file_format: Arc<dyn FileFormatFactory>,
789        overwrite: bool,
790    ) -> Result<(), DataFusionError> {
791        let ext = file_format.get_ext().to_lowercase();
792        match (self.file_formats.entry(ext.clone()), overwrite){
793            (Entry::Vacant(e), _) => {e.insert(file_format);},
794            (Entry::Occupied(mut e), true)  => {e.insert(file_format);},
795            (Entry::Occupied(_), false) => return config_err!("File type already registered for extension {ext}. Set overwrite to true to replace this extension."),
796        };
797        Ok(())
798    }
799
800    /// Retrieves a [FileFormatFactory] based on file extension which has been registered
801    /// via SessionContext::register_file_format. Extensions are not case sensitive.
802    pub fn get_file_format_factory(
803        &self,
804        ext: &str,
805    ) -> Option<Arc<dyn FileFormatFactory>> {
806        self.file_formats.get(&ext.to_lowercase()).cloned()
807    }
808
809    /// Get a new TaskContext to run in this session
810    pub fn task_ctx(&self) -> Arc<TaskContext> {
811        Arc::new(TaskContext::from(self))
812    }
813
    /// Returns a reference to the shared [`CatalogProviderList`] used by this
    /// session.
    pub fn catalog_list(&self) -> &Arc<dyn CatalogProviderList> {
        &self.catalog_list
    }
818
    /// Replaces this session's [`CatalogProviderList`] with `catalog_list`.
    ///
    /// The handle previously held by this state is discarded.
    pub(crate) fn register_catalog_list(
        &mut self,
        catalog_list: Arc<dyn CatalogProviderList>,
    ) {
        self.catalog_list = catalog_list;
    }
826
    /// Returns the map of scalar functions ([`ScalarUDF`]) registered on this
    /// session.
    ///
    /// Keys are `String`s (presumably the names the functions were registered
    /// under; registration happens outside this method).
    pub fn scalar_functions(&self) -> &HashMap<String, Arc<ScalarUDF>> {
        &self.scalar_functions
    }
831
    /// Returns the map of aggregate functions ([`AggregateUDF`]) registered on
    /// this session.
    ///
    /// Keys are `String`s (presumably the names the functions were registered
    /// under; registration happens outside this method).
    pub fn aggregate_functions(&self) -> &HashMap<String, Arc<AggregateUDF>> {
        &self.aggregate_functions
    }
836
    /// Returns the map of window functions ([`WindowUDF`]) registered on this
    /// session.
    ///
    /// Keys are `String`s (presumably the names the functions were registered
    /// under; registration happens outside this method).
    pub fn window_functions(&self) -> &HashMap<String, Arc<WindowUDF>> {
        &self.window_functions
    }
841
    /// Returns the map of table functions ([`TableFunction`]) registered on
    /// this session, keyed by the name they were registered under.
    pub fn table_functions(&self) -> &HashMap<String, Arc<TableFunction>> {
        &self.table_functions
    }
846
    /// Returns a reference to the shared [SerializerRegistry] this session
    /// uses for extensions.
    pub fn serializer_registry(&self) -> &Arc<dyn SerializerRegistry> {
        &self.serializer_registry
    }
851
852    /// Return version of the cargo package that produced this query
853    pub fn version(&self) -> &str {
854        env!("CARGO_PKG_VERSION")
855    }
856
857    /// Register a user defined table function
858    pub fn register_udtf(&mut self, name: &str, fun: Arc<dyn TableFunctionImpl>) {
859        self.table_functions.insert(
860            name.to_owned(),
861            Arc::new(TableFunction::new(name.to_owned(), fun)),
862        );
863    }
864
865    /// Deregister a user defined table function
866    pub fn deregister_udtf(
867        &mut self,
868        name: &str,
869    ) -> datafusion_common::Result<Option<Arc<dyn TableFunctionImpl>>> {
870        let udtf = self.table_functions.remove(name);
871        Ok(udtf.map(|x| Arc::clone(x.function())))
872    }
873
874    /// Store the logical plan and the parameter types of a prepared statement.
875    pub(crate) fn store_prepared(
876        &mut self,
877        name: String,
878        fields: Vec<FieldRef>,
879        plan: Arc<LogicalPlan>,
880    ) -> datafusion_common::Result<()> {
881        match self.prepared_plans.entry(name) {
882            Entry::Vacant(e) => {
883                e.insert(Arc::new(PreparedPlan { fields, plan }));
884                Ok(())
885            }
886            Entry::Occupied(e) => {
887                exec_err!("Prepared statement '{}' already exists", e.key())
888            }
889        }
890    }
891
892    /// Get the prepared plan with the given name.
893    pub(crate) fn get_prepared(&self, name: &str) -> Option<Arc<PreparedPlan>> {
894        self.prepared_plans.get(name).map(Arc::clone)
895    }
896
897    /// Remove the prepared plan with the given name.
898    pub(crate) fn remove_prepared(
899        &mut self,
900        name: &str,
901    ) -> datafusion_common::Result<()> {
902        match self.prepared_plans.remove(name) {
903            Some(_) => Ok(()),
904            None => exec_err!("Prepared statement '{}' does not exist", name),
905        }
906    }
907}
908
/// A builder for [`SessionState`]. Defaults will be used for all values
/// unless explicitly provided.
///
/// Every field is optional; `build` substitutes a default for each field
/// that is still `None`.
///
/// See example on [`SessionState`]
pub struct SessionStateBuilder {
    /// Session identifier; `build` generates a random UUID string when unset.
    session_id: Option<String>,
    /// Complete analyzer; `build` uses the analyzer's `Default` when unset.
    analyzer: Option<Analyzer>,
    /// Planners used to customize how expressions are planned.
    expr_planners: Option<Vec<Arc<dyn ExprPlanner>>>,
    /// Planner used to customize type handling (only with the `sql` feature).
    #[cfg(feature = "sql")]
    type_planner: Option<Arc<dyn TypePlanner>>,
    /// Complete logical optimizer; `build` uses its `Default` when unset.
    optimizer: Option<Optimizer>,
    /// Complete physical optimizer; `build` uses its `Default` when unset.
    physical_optimizers: Option<PhysicalOptimizer>,
    /// Query planner; `build` falls back to `DefaultQueryPlanner`.
    query_planner: Option<Arc<dyn QueryPlanner + Send + Sync>>,
    /// Catalog list; `build` falls back to a new `MemoryCatalogProviderList`.
    catalog_list: Option<Arc<dyn CatalogProviderList>>,
    /// Table functions, stored as a map with `String` keys.
    table_functions: Option<HashMap<String, Arc<TableFunction>>>,
    /// Scalar functions, registered on the state one by one during `build`.
    scalar_functions: Option<Vec<Arc<ScalarUDF>>>,
    /// Aggregate functions, registered on the state one by one during `build`.
    aggregate_functions: Option<Vec<Arc<AggregateUDF>>>,
    /// Window functions, registered on the state one by one during `build`.
    window_functions: Option<Vec<Arc<WindowUDF>>>,
    /// Serializer registry; `build` falls back to `EmptySerializerRegistry`.
    serializer_registry: Option<Arc<dyn SerializerRegistry>>,
    /// File format factories, registered (without overwrite) during `build`.
    file_formats: Option<Vec<Arc<dyn FileFormatFactory>>>,
    /// Session configuration; `build` uses its `Default` when unset.
    config: Option<SessionConfig>,
    /// Table options; `build` derives them from the session config when unset.
    table_options: Option<TableOptions>,
    /// Execution properties; `build` uses their `Default` when unset.
    execution_props: Option<ExecutionProps>,
    /// Table provider factories, stored as a map with `String` keys.
    table_factories: Option<HashMap<String, Arc<dyn TableProviderFactory>>>,
    /// Runtime environment; `build` falls back to `RuntimeEnv::default()`.
    runtime_env: Option<Arc<RuntimeEnv>>,
    /// Optional factory for handling `CREATE FUNCTION` statements.
    function_factory: Option<Arc<dyn FunctionFactory>>,
    // Fields to support the single-rule convenience functions. `build`
    // appends these to the rules of the corresponding analyzer / optimizer.
    /// Extra analyzer rules appended to the analyzer's rules by `build`.
    analyzer_rules: Option<Vec<Arc<dyn AnalyzerRule + Send + Sync>>>,
    /// Extra optimizer rules appended to the optimizer's rules by `build`.
    optimizer_rules: Option<Vec<Arc<dyn OptimizerRule + Send + Sync>>>,
    /// Extra physical optimizer rules appended to the physical optimizer's
    /// rules by `build`.
    physical_optimizer_rules: Option<Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>>,
}
940
941impl SessionStateBuilder {
942    /// Returns a new empty [`SessionStateBuilder`].
943    ///
944    /// See [`Self::with_default_features`] to install the default set of functions,
945    /// catalogs, etc.
946    ///
947    /// To create a `SessionStateBuilder` with default features such as functions,
948    /// please see [`Self::new_with_default_features`].
949    pub fn new() -> Self {
950        Self {
951            session_id: None,
952            analyzer: None,
953            expr_planners: None,
954            #[cfg(feature = "sql")]
955            type_planner: None,
956            optimizer: None,
957            physical_optimizers: None,
958            query_planner: None,
959            catalog_list: None,
960            table_functions: None,
961            scalar_functions: None,
962            aggregate_functions: None,
963            window_functions: None,
964            serializer_registry: None,
965            file_formats: None,
966            table_options: None,
967            config: None,
968            execution_props: None,
969            table_factories: None,
970            runtime_env: None,
971            function_factory: None,
972            // fields to support convenience functions
973            analyzer_rules: None,
974            optimizer_rules: None,
975            physical_optimizer_rules: None,
976        }
977    }
978
979    /// Returns a new [SessionStateBuilder] based on an existing [SessionState].
980    ///
981    /// The session id for the new builder will be unset; all other fields will
982    /// be cloned from `existing`. If the default
983    /// catalog exists in existing session state, the new session state will not
984    /// create default catalog and schema.
985    pub fn new_from_existing(existing: SessionState) -> Self {
986        let default_catalog_exist = existing
987            .catalog_list()
988            .catalog(&existing.config.options().catalog.default_catalog)
989            .is_some();
990        // The new `with_create_default_catalog_and_schema` should be false if the default catalog exists
991        let create_default_catalog_and_schema = existing
992            .config
993            .options()
994            .catalog
995            .create_default_catalog_and_schema
996            && !default_catalog_exist;
997        let new_config = existing
998            .config
999            .with_create_default_catalog_and_schema(create_default_catalog_and_schema);
1000        Self {
1001            session_id: None,
1002            analyzer: Some(existing.analyzer),
1003            expr_planners: Some(existing.expr_planners),
1004            #[cfg(feature = "sql")]
1005            type_planner: existing.type_planner,
1006            optimizer: Some(existing.optimizer),
1007            physical_optimizers: Some(existing.physical_optimizers),
1008            query_planner: Some(existing.query_planner),
1009            catalog_list: Some(existing.catalog_list),
1010            table_functions: Some(existing.table_functions),
1011            scalar_functions: Some(existing.scalar_functions.into_values().collect_vec()),
1012            aggregate_functions: Some(
1013                existing.aggregate_functions.into_values().collect_vec(),
1014            ),
1015            window_functions: Some(existing.window_functions.into_values().collect_vec()),
1016            serializer_registry: Some(existing.serializer_registry),
1017            file_formats: Some(existing.file_formats.into_values().collect_vec()),
1018            config: Some(new_config),
1019            table_options: Some(existing.table_options),
1020            execution_props: Some(existing.execution_props),
1021            table_factories: Some(existing.table_factories),
1022            runtime_env: Some(existing.runtime_env),
1023            function_factory: existing.function_factory,
1024
1025            // fields to support convenience functions
1026            analyzer_rules: None,
1027            optimizer_rules: None,
1028            physical_optimizer_rules: None,
1029        }
1030    }
1031
1032    /// Adds defaults for table_factories, file formats, expr_planners and builtin
1033    /// scalar, aggregate and windows functions.
1034    ///
1035    /// Note overwrites any previously registered items with the same name.
1036    pub fn with_default_features(mut self) -> Self {
1037        self.table_factories
1038            .get_or_insert_with(HashMap::new)
1039            .extend(SessionStateDefaults::default_table_factories());
1040
1041        self.file_formats
1042            .get_or_insert_with(Vec::new)
1043            .extend(SessionStateDefaults::default_file_formats());
1044
1045        self.expr_planners
1046            .get_or_insert_with(Vec::new)
1047            .extend(SessionStateDefaults::default_expr_planners());
1048
1049        self.scalar_functions
1050            .get_or_insert_with(Vec::new)
1051            .extend(SessionStateDefaults::default_scalar_functions());
1052
1053        self.aggregate_functions
1054            .get_or_insert_with(Vec::new)
1055            .extend(SessionStateDefaults::default_aggregate_functions());
1056
1057        self.window_functions
1058            .get_or_insert_with(Vec::new)
1059            .extend(SessionStateDefaults::default_window_functions());
1060
1061        self.table_functions
1062            .get_or_insert_with(HashMap::new)
1063            .extend(
1064                SessionStateDefaults::default_table_functions()
1065                    .into_iter()
1066                    .map(|f| (f.name().to_string(), f)),
1067            );
1068
1069        self
1070    }
1071
1072    /// Returns a new [`SessionStateBuilder`] with default features.
1073    ///
1074    /// This is equivalent to calling [`Self::new()`] followed by [`Self::with_default_features()`].
1075    ///
1076    /// ```
1077    /// use datafusion::execution::session_state::SessionStateBuilder;
1078    ///
1079    /// // Create a new SessionState with default features
1080    /// let session_state = SessionStateBuilder::new_with_default_features()
1081    ///     .with_session_id("my_session".to_string())
1082    ///     .build();
1083    /// ```
1084    pub fn new_with_default_features() -> Self {
1085        Self::new().with_default_features()
1086    }
1087
1088    /// Set the session id.
1089    pub fn with_session_id(mut self, session_id: String) -> Self {
1090        self.session_id = Some(session_id);
1091        self
1092    }
1093
1094    /// Set the [`AnalyzerRule`]s optimizer plan rules.
1095    pub fn with_analyzer_rules(
1096        mut self,
1097        rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>>,
1098    ) -> Self {
1099        self.analyzer = Some(Analyzer::with_rules(rules));
1100        self
1101    }
1102
1103    /// Add `analyzer_rule` to the end of the list of
1104    /// [`AnalyzerRule`]s used to rewrite queries.
1105    pub fn with_analyzer_rule(
1106        mut self,
1107        analyzer_rule: Arc<dyn AnalyzerRule + Send + Sync>,
1108    ) -> Self {
1109        let mut rules = self.analyzer_rules.unwrap_or_default();
1110        rules.push(analyzer_rule);
1111        self.analyzer_rules = Some(rules);
1112        self
1113    }
1114
1115    /// Set the [`OptimizerRule`]s used to optimize plans.
1116    pub fn with_optimizer_rules(
1117        mut self,
1118        rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
1119    ) -> Self {
1120        self.optimizer = Some(Optimizer::with_rules(rules));
1121        self
1122    }
1123
1124    /// Add `optimizer_rule` to the end of the list of
1125    /// [`OptimizerRule`]s used to rewrite queries.
1126    pub fn with_optimizer_rule(
1127        mut self,
1128        optimizer_rule: Arc<dyn OptimizerRule + Send + Sync>,
1129    ) -> Self {
1130        let mut rules = self.optimizer_rules.unwrap_or_default();
1131        rules.push(optimizer_rule);
1132        self.optimizer_rules = Some(rules);
1133        self
1134    }
1135
1136    /// Set the [`ExprPlanner`]s used to customize the behavior of the SQL planner.
1137    pub fn with_expr_planners(
1138        mut self,
1139        expr_planners: Vec<Arc<dyn ExprPlanner>>,
1140    ) -> Self {
1141        self.expr_planners = Some(expr_planners);
1142        self
1143    }
1144
1145    /// Set the [`TypePlanner`] used to customize the behavior of the SQL planner.
1146    #[cfg(feature = "sql")]
1147    pub fn with_type_planner(mut self, type_planner: Arc<dyn TypePlanner>) -> Self {
1148        self.type_planner = Some(type_planner);
1149        self
1150    }
1151
1152    /// Set the [`PhysicalOptimizerRule`]s used to optimize plans.
1153    pub fn with_physical_optimizer_rules(
1154        mut self,
1155        physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
1156    ) -> Self {
1157        self.physical_optimizers =
1158            Some(PhysicalOptimizer::with_rules(physical_optimizers));
1159        self
1160    }
1161
1162    /// Add `physical_optimizer_rule` to the end of the list of
1163    /// [`PhysicalOptimizerRule`]s used to rewrite queries.
1164    pub fn with_physical_optimizer_rule(
1165        mut self,
1166        physical_optimizer_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync>,
1167    ) -> Self {
1168        let mut rules = self.physical_optimizer_rules.unwrap_or_default();
1169        rules.push(physical_optimizer_rule);
1170        self.physical_optimizer_rules = Some(rules);
1171        self
1172    }
1173
1174    /// Set the [`QueryPlanner`]
1175    pub fn with_query_planner(
1176        mut self,
1177        query_planner: Arc<dyn QueryPlanner + Send + Sync>,
1178    ) -> Self {
1179        self.query_planner = Some(query_planner);
1180        self
1181    }
1182
1183    /// Set the [`CatalogProviderList`]
1184    pub fn with_catalog_list(
1185        mut self,
1186        catalog_list: Arc<dyn CatalogProviderList>,
1187    ) -> Self {
1188        self.catalog_list = Some(catalog_list);
1189        self
1190    }
1191
1192    /// Set the map of [`TableFunction`]s
1193    pub fn with_table_functions(
1194        mut self,
1195        table_functions: HashMap<String, Arc<TableFunction>>,
1196    ) -> Self {
1197        self.table_functions = Some(table_functions);
1198        self
1199    }
1200
1201    /// Set the list of [`TableFunction`]s
1202    pub fn with_table_function_list(
1203        mut self,
1204        table_functions: Vec<Arc<TableFunction>>,
1205    ) -> Self {
1206        let functions = table_functions
1207            .into_iter()
1208            .map(|f| (f.name().to_string(), f))
1209            .collect();
1210        self.table_functions = Some(functions);
1211        self
1212    }
1213
1214    /// Set the map of [`ScalarUDF`]s
1215    pub fn with_scalar_functions(
1216        mut self,
1217        scalar_functions: Vec<Arc<ScalarUDF>>,
1218    ) -> Self {
1219        self.scalar_functions = Some(scalar_functions);
1220        self
1221    }
1222
1223    /// Set the map of [`AggregateUDF`]s
1224    pub fn with_aggregate_functions(
1225        mut self,
1226        aggregate_functions: Vec<Arc<AggregateUDF>>,
1227    ) -> Self {
1228        self.aggregate_functions = Some(aggregate_functions);
1229        self
1230    }
1231
1232    /// Set the map of [`WindowUDF`]s
1233    pub fn with_window_functions(
1234        mut self,
1235        window_functions: Vec<Arc<WindowUDF>>,
1236    ) -> Self {
1237        self.window_functions = Some(window_functions);
1238        self
1239    }
1240
1241    /// Set the [`SerializerRegistry`]
1242    pub fn with_serializer_registry(
1243        mut self,
1244        serializer_registry: Arc<dyn SerializerRegistry>,
1245    ) -> Self {
1246        self.serializer_registry = Some(serializer_registry);
1247        self
1248    }
1249
1250    /// Set the map of [`FileFormatFactory`]s
1251    pub fn with_file_formats(
1252        mut self,
1253        file_formats: Vec<Arc<dyn FileFormatFactory>>,
1254    ) -> Self {
1255        self.file_formats = Some(file_formats);
1256        self
1257    }
1258
1259    /// Set the [`SessionConfig`]
1260    pub fn with_config(mut self, config: SessionConfig) -> Self {
1261        self.config = Some(config);
1262        self
1263    }
1264
1265    /// Set the [`TableOptions`]
1266    pub fn with_table_options(mut self, table_options: TableOptions) -> Self {
1267        self.table_options = Some(table_options);
1268        self
1269    }
1270
1271    /// Set the [`ExecutionProps`]
1272    pub fn with_execution_props(mut self, execution_props: ExecutionProps) -> Self {
1273        self.execution_props = Some(execution_props);
1274        self
1275    }
1276
1277    /// Add a [`TableProviderFactory`] to the map of factories
1278    pub fn with_table_factory(
1279        mut self,
1280        key: String,
1281        table_factory: Arc<dyn TableProviderFactory>,
1282    ) -> Self {
1283        let mut table_factories = self.table_factories.unwrap_or_default();
1284        table_factories.insert(key, table_factory);
1285        self.table_factories = Some(table_factories);
1286        self
1287    }
1288
1289    /// Set the map of [`TableProviderFactory`]s
1290    pub fn with_table_factories(
1291        mut self,
1292        table_factories: HashMap<String, Arc<dyn TableProviderFactory>>,
1293    ) -> Self {
1294        self.table_factories = Some(table_factories);
1295        self
1296    }
1297
1298    /// Set the [`RuntimeEnv`]
1299    pub fn with_runtime_env(mut self, runtime_env: Arc<RuntimeEnv>) -> Self {
1300        self.runtime_env = Some(runtime_env);
1301        self
1302    }
1303
1304    /// Set a [`FunctionFactory`] to handle `CREATE FUNCTION` statements
1305    pub fn with_function_factory(
1306        mut self,
1307        function_factory: Option<Arc<dyn FunctionFactory>>,
1308    ) -> Self {
1309        self.function_factory = function_factory;
1310        self
1311    }
1312
1313    /// Register an `ObjectStore` to the [`RuntimeEnv`]. See [`RuntimeEnv::register_object_store`]
1314    /// for more details.
1315    ///
1316    /// Note that this creates a default [`RuntimeEnv`] if  there isn't one passed in already.
1317    ///
1318    /// ```
1319    /// # use datafusion::prelude::*;
1320    /// # use datafusion::execution::session_state::SessionStateBuilder;
1321    /// # use datafusion_execution::runtime_env::RuntimeEnv;
1322    /// # use url::Url;
1323    /// # use std::sync::Arc;
1324    /// # let http_store = object_store::local::LocalFileSystem::new();
1325    /// let url = Url::try_from("file://").unwrap();
1326    /// let object_store = object_store::local::LocalFileSystem::new();
1327    /// let state = SessionStateBuilder::new()
1328    ///     .with_config(SessionConfig::new())
1329    ///     .with_object_store(&url, Arc::new(object_store))
1330    ///     .with_default_features()
1331    ///     .build();
1332    /// ```
1333    pub fn with_object_store(
1334        mut self,
1335        url: &Url,
1336        object_store: Arc<dyn ObjectStore>,
1337    ) -> Self {
1338        if self.runtime_env.is_none() {
1339            self.runtime_env = Some(Arc::new(RuntimeEnv::default()));
1340        }
1341        self.runtime_env
1342            .as_ref()
1343            .unwrap()
1344            .register_object_store(url, object_store);
1345        self
1346    }
1347
1348    /// Builds a [`SessionState`] with the current configuration.
1349    ///
1350    /// Note that there is an explicit option for enabling catalog and schema defaults
1351    /// in [SessionConfig::create_default_catalog_and_schema] which if enabled
1352    /// will be built here.
1353    pub fn build(self) -> SessionState {
1354        let Self {
1355            session_id,
1356            analyzer,
1357            expr_planners,
1358            #[cfg(feature = "sql")]
1359            type_planner,
1360            optimizer,
1361            physical_optimizers,
1362            query_planner,
1363            catalog_list,
1364            table_functions,
1365            scalar_functions,
1366            aggregate_functions,
1367            window_functions,
1368            serializer_registry,
1369            file_formats,
1370            table_options,
1371            config,
1372            execution_props,
1373            table_factories,
1374            runtime_env,
1375            function_factory,
1376            analyzer_rules,
1377            optimizer_rules,
1378            physical_optimizer_rules,
1379        } = self;
1380
1381        let config = config.unwrap_or_default();
1382        let runtime_env = runtime_env.unwrap_or_else(|| Arc::new(RuntimeEnv::default()));
1383
1384        let mut state = SessionState {
1385            session_id: session_id.unwrap_or_else(|| Uuid::new_v4().to_string()),
1386            analyzer: analyzer.unwrap_or_default(),
1387            expr_planners: expr_planners.unwrap_or_default(),
1388            #[cfg(feature = "sql")]
1389            type_planner,
1390            optimizer: optimizer.unwrap_or_default(),
1391            physical_optimizers: physical_optimizers.unwrap_or_default(),
1392            query_planner: query_planner
1393                .unwrap_or_else(|| Arc::new(DefaultQueryPlanner {})),
1394            catalog_list: catalog_list.unwrap_or_else(|| {
1395                Arc::new(MemoryCatalogProviderList::new()) as Arc<dyn CatalogProviderList>
1396            }),
1397            table_functions: table_functions.unwrap_or_default(),
1398            scalar_functions: HashMap::new(),
1399            aggregate_functions: HashMap::new(),
1400            window_functions: HashMap::new(),
1401            serializer_registry: serializer_registry
1402                .unwrap_or_else(|| Arc::new(EmptySerializerRegistry)),
1403            file_formats: HashMap::new(),
1404            table_options: table_options.unwrap_or_else(|| {
1405                TableOptions::default_from_session_config(config.options())
1406            }),
1407            config,
1408            execution_props: execution_props.unwrap_or_default(),
1409            table_factories: table_factories.unwrap_or_default(),
1410            runtime_env,
1411            function_factory,
1412            prepared_plans: HashMap::new(),
1413        };
1414
1415        if let Some(file_formats) = file_formats {
1416            for file_format in file_formats {
1417                if let Err(e) = state.register_file_format(file_format, false) {
1418                    info!("Unable to register file format: {e}")
1419                };
1420            }
1421        }
1422
1423        if let Some(scalar_functions) = scalar_functions {
1424            for udf in scalar_functions {
1425                let config_options = state.config().options();
1426                match udf.inner().with_updated_config(config_options) {
1427                    Some(new_udf) => {
1428                        if let Err(err) = state.register_udf(Arc::new(new_udf)) {
1429                            debug!(
1430                                "Failed to re-register updated UDF '{}': {}",
1431                                udf.name(),
1432                                err
1433                            );
1434                        }
1435                    }
1436                    None => match state.register_udf(Arc::clone(&udf)) {
1437                        Ok(Some(existing)) => {
1438                            debug!("Overwrote existing UDF '{}'", existing.name());
1439                        }
1440                        Ok(None) => {
1441                            debug!("Registered UDF '{}'", udf.name());
1442                        }
1443                        Err(err) => {
1444                            debug!("Failed to register UDF '{}': {}", udf.name(), err);
1445                        }
1446                    },
1447                }
1448            }
1449        }
1450
1451        if let Some(aggregate_functions) = aggregate_functions {
1452            aggregate_functions.into_iter().for_each(|udaf| {
1453                let existing_udf = state.register_udaf(udaf);
1454                if let Ok(Some(existing_udf)) = existing_udf {
1455                    debug!("Overwrote an existing UDF: {}", existing_udf.name());
1456                }
1457            });
1458        }
1459
1460        if let Some(window_functions) = window_functions {
1461            window_functions.into_iter().for_each(|udwf| {
1462                let existing_udf = state.register_udwf(udwf);
1463                if let Ok(Some(existing_udf)) = existing_udf {
1464                    debug!("Overwrote an existing UDF: {}", existing_udf.name());
1465                }
1466            });
1467        }
1468
1469        if state.config.create_default_catalog_and_schema() {
1470            let default_catalog = SessionStateDefaults::default_catalog(
1471                &state.config,
1472                &state.table_factories,
1473                &state.runtime_env,
1474            );
1475
1476            let existing_default_catalog = state.catalog_list.register_catalog(
1477                state.config.options().catalog.default_catalog.clone(),
1478                Arc::new(default_catalog),
1479            );
1480
1481            if existing_default_catalog.is_some() {
1482                debug!("Overwrote the default catalog");
1483            }
1484        }
1485
1486        if let Some(analyzer_rules) = analyzer_rules {
1487            for analyzer_rule in analyzer_rules {
1488                state.analyzer.rules.push(analyzer_rule);
1489            }
1490        }
1491
1492        if let Some(optimizer_rules) = optimizer_rules {
1493            for optimizer_rule in optimizer_rules {
1494                state.optimizer.rules.push(optimizer_rule);
1495            }
1496        }
1497
1498        if let Some(physical_optimizer_rules) = physical_optimizer_rules {
1499            for physical_optimizer_rule in physical_optimizer_rules {
1500                state
1501                    .physical_optimizers
1502                    .rules
1503                    .push(physical_optimizer_rule);
1504            }
1505        }
1506
1507        state
1508    }
1509
    /// Returns a reference to the session id currently set on the builder,
    /// which is `None` when no id has been set.
    pub fn session_id(&self) -> &Option<String> {
        &self.session_id
    }
1514
    /// Returns a mutable reference to the builder's optional analyzer, so it
    /// can be inspected, replaced or cleared in place.
    pub fn analyzer(&mut self) -> &mut Option<Analyzer> {
        &mut self.analyzer
    }
1519
    /// Returns a mutable reference to the builder's optional list of
    /// expression planners, so it can be inspected or changed in place.
    pub fn expr_planners(&mut self) -> &mut Option<Vec<Arc<dyn ExprPlanner>>> {
        &mut self.expr_planners
    }
1524
    /// Returns a mutable reference to the builder's optional type planner, so
    /// it can be inspected, replaced or cleared in place.
    ///
    /// Only available with the `sql` feature.
    #[cfg(feature = "sql")]
    pub fn type_planner(&mut self) -> &mut Option<Arc<dyn TypePlanner>> {
        &mut self.type_planner
    }
1530
    /// Returns a mutable reference to the builder's optional logical
    /// optimizer, so it can be inspected, replaced or cleared in place.
    pub fn optimizer(&mut self) -> &mut Option<Optimizer> {
        &mut self.optimizer
    }
1535
    /// Returns a mutable reference to the builder's optional physical
    /// optimizer, so it can be inspected, replaced or cleared in place.
    pub fn physical_optimizers(&mut self) -> &mut Option<PhysicalOptimizer> {
        &mut self.physical_optimizers
    }
1540
    /// Returns a mutable reference to the builder's optional query planner,
    /// so it can be inspected, replaced or cleared in place.
    pub fn query_planner(&mut self) -> &mut Option<Arc<dyn QueryPlanner + Send + Sync>> {
        &mut self.query_planner
    }
1545
    /// Returns a mutable reference to the builder's optional catalog list, so
    /// it can be inspected, replaced or cleared in place.
    pub fn catalog_list(&mut self) -> &mut Option<Arc<dyn CatalogProviderList>> {
        &mut self.catalog_list
    }
1550
    /// Returns a mutable reference to the builder's optional map of table
    /// functions, so it can be inspected or changed in place.
    pub fn table_functions(
        &mut self,
    ) -> &mut Option<HashMap<String, Arc<TableFunction>>> {
        &mut self.table_functions
    }
1557
    /// Returns a mutable reference to the builder's optional list of scalar
    /// functions, so it can be inspected or changed in place.
    pub fn scalar_functions(&mut self) -> &mut Option<Vec<Arc<ScalarUDF>>> {
        &mut self.scalar_functions
    }
1562
    /// Returns a mutable reference to the builder's optional list of
    /// aggregate functions, so it can be inspected or changed in place.
    pub fn aggregate_functions(&mut self) -> &mut Option<Vec<Arc<AggregateUDF>>> {
        &mut self.aggregate_functions
    }
1567
    /// Returns a mutable reference to the builder's optional list of window
    /// functions, so it can be inspected or changed in place.
    pub fn window_functions(&mut self) -> &mut Option<Vec<Arc<WindowUDF>>> {
        &mut self.window_functions
    }
1572
    /// Returns a mutable reference to the builder's optional serializer
    /// registry, so it can be inspected, replaced or cleared in place.
    pub fn serializer_registry(&mut self) -> &mut Option<Arc<dyn SerializerRegistry>> {
        &mut self.serializer_registry
    }
1577
    /// Returns a mutable reference to the builder's optional list of file
    /// format factories, so it can be inspected or changed in place.
    pub fn file_formats(&mut self) -> &mut Option<Vec<Arc<dyn FileFormatFactory>>> {
        &mut self.file_formats
    }
1582
    /// Returns a mutable reference to the builder's optional
    /// [`SessionConfig`], so it can be inspected, replaced or cleared in
    /// place.
    pub fn config(&mut self) -> &mut Option<SessionConfig> {
        &mut self.config
    }
1587
    /// Returns a mutable reference to the builder's optional
    /// [`TableOptions`], so they can be inspected, replaced or cleared in
    /// place.
    pub fn table_options(&mut self) -> &mut Option<TableOptions> {
        &mut self.table_options
    }
1592
    /// Returns a mutable reference to the builder's optional
    /// [`ExecutionProps`], so they can be inspected, replaced or cleared in
    /// place.
    pub fn execution_props(&mut self) -> &mut Option<ExecutionProps> {
        &mut self.execution_props
    }
1597
    /// Returns a mutable reference to the builder's optional map of table
    /// provider factories, so it can be inspected or changed in place.
    pub fn table_factories(
        &mut self,
    ) -> &mut Option<HashMap<String, Arc<dyn TableProviderFactory>>> {
        &mut self.table_factories
    }
1604
    /// Returns a mutable reference to the builder's optional runtime
    /// environment, so it can be inspected, replaced or cleared in place.
    pub fn runtime_env(&mut self) -> &mut Option<Arc<RuntimeEnv>> {
        &mut self.runtime_env
    }
1609
    /// Returns a mutable reference to the builder's optional function
    /// factory, so it can be inspected, replaced or cleared in place.
    pub fn function_factory(&mut self) -> &mut Option<Arc<dyn FunctionFactory>> {
        &mut self.function_factory
    }
1614
1615    /// Returns the current analyzer_rules value
1616    pub fn analyzer_rules(
1617        &mut self,
1618    ) -> &mut Option<Vec<Arc<dyn AnalyzerRule + Send + Sync>>> {
1619        &mut self.analyzer_rules
1620    }
1621
1622    /// Returns the current optimizer_rules value
1623    pub fn optimizer_rules(
1624        &mut self,
1625    ) -> &mut Option<Vec<Arc<dyn OptimizerRule + Send + Sync>>> {
1626        &mut self.optimizer_rules
1627    }
1628
1629    /// Returns the current physical_optimizer_rules value
1630    pub fn physical_optimizer_rules(
1631        &mut self,
1632    ) -> &mut Option<Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>> {
1633        &mut self.physical_optimizer_rules
1634    }
1635}
1636
1637impl Debug for SessionStateBuilder {
1638    /// Prefer having short fields at the top and long vector fields near the end
1639    /// Group fields by
1640    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1641        let mut debug_struct = f.debug_struct("SessionStateBuilder");
1642        let ret = debug_struct
1643            .field("session_id", &self.session_id)
1644            .field("config", &self.config)
1645            .field("runtime_env", &self.runtime_env)
1646            .field("catalog_list", &self.catalog_list)
1647            .field("serializer_registry", &self.serializer_registry)
1648            .field("file_formats", &self.file_formats)
1649            .field("execution_props", &self.execution_props)
1650            .field("table_options", &self.table_options)
1651            .field("table_factories", &self.table_factories)
1652            .field("function_factory", &self.function_factory)
1653            .field("expr_planners", &self.expr_planners);
1654        #[cfg(feature = "sql")]
1655        let ret = ret.field("type_planner", &self.type_planner);
1656        ret.field("query_planners", &self.query_planner)
1657            .field("analyzer_rules", &self.analyzer_rules)
1658            .field("analyzer", &self.analyzer)
1659            .field("optimizer_rules", &self.optimizer_rules)
1660            .field("optimizer", &self.optimizer)
1661            .field("physical_optimizer_rules", &self.physical_optimizer_rules)
1662            .field("physical_optimizers", &self.physical_optimizers)
1663            .field("table_functions", &self.table_functions)
1664            .field("scalar_functions", &self.scalar_functions)
1665            .field("aggregate_functions", &self.aggregate_functions)
1666            .field("window_functions", &self.window_functions)
1667            .finish()
1668    }
1669}
1670
1671impl Default for SessionStateBuilder {
1672    fn default() -> Self {
1673        Self::new()
1674    }
1675}
1676
1677impl From<SessionState> for SessionStateBuilder {
1678    fn from(state: SessionState) -> Self {
1679        SessionStateBuilder::new_from_existing(state)
1680    }
1681}
1682
/// Adapter that implements the [`ContextProvider`] trait for a [`SessionState`]
///
/// This is used so the SQL planner can access the state of the session without
/// having a direct dependency on the [`SessionState`] struct (and core crate)
#[cfg(feature = "sql")]
struct SessionContextProvider<'a> {
    /// Session state that function, option and file-format lookups are
    /// delegated to
    state: &'a SessionState,
    /// Table sources keyed by resolved table reference; this map is what
    /// `get_table_source` consults
    tables: HashMap<ResolvedTableReference, Arc<dyn TableSource>>,
}
1692
#[cfg(feature = "sql")]
impl ContextProvider for SessionContextProvider<'_> {
    /// Returns the expression planners of the underlying session state.
    fn get_expr_planners(&self) -> &[Arc<dyn ExprPlanner>] {
        self.state.expr_planners()
    }

    /// Returns a shared handle to the session's type planner, or `None` when
    /// the state has no type planner set.
    fn get_type_planner(&self) -> Option<Arc<dyn TypePlanner>> {
        self.state.type_planner.as_ref().map(Arc::clone)
    }

    /// Resolves `name` against the session state and looks it up in the
    /// table sources held by this provider.
    ///
    /// # Errors
    /// Returns a plan error if the resolved reference is not present in
    /// `tables`.
    fn get_table_source(
        &self,
        name: TableReference,
    ) -> datafusion_common::Result<Arc<dyn TableSource>> {
        let name = self.state.resolve_table_ref(name);
        self.tables
            .get(&name)
            .cloned()
            .ok_or_else(|| plan_datafusion_err!("table '{name}' not found"))
    }

    /// Builds the table source produced by the table function `name` when
    /// called with `args`.
    ///
    /// Each argument is simplified (against an empty schema) before being
    /// handed to the table function.
    ///
    /// # Errors
    /// Returns a plan error if no table function called `name` is registered,
    /// and propagates errors from simplification or provider creation.
    fn get_table_function_source(
        &self,
        name: &str,
        args: Vec<Expr>,
    ) -> datafusion_common::Result<Arc<dyn TableSource>> {
        let tbl_func = self
            .state
            .table_functions
            .get(name)
            .cloned()
            .ok_or_else(|| plan_datafusion_err!("table function '{name}' not found"))?;
        // The arguments are not evaluated against any input, so an empty
        // schema is used for simplification.
        let dummy_schema = DFSchema::empty();
        let simplifier =
            ExprSimplifier::new(SessionSimplifyProvider::new(self.state, &dummy_schema));
        let args = args
            .into_iter()
            .map(|arg| simplifier.simplify(arg))
            .collect::<datafusion_common::Result<Vec<_>>>()?;
        let provider = tbl_func.create_table_provider(&args)?;

        Ok(provider_as_source(provider))
    }

    /// Create a new CTE work table for a recursive CTE logical plan
    /// This table will be used in conjunction with a Worktable physical plan
    /// to read and write each iteration of a recursive CTE
    fn create_cte_work_table(
        &self,
        name: &str,
        schema: arrow::datatypes::SchemaRef,
    ) -> datafusion_common::Result<Arc<dyn TableSource>> {
        let table = Arc::new(crate::datasource::cte_worktable::CteWorkTable::new(
            name, schema,
        ));
        Ok(provider_as_source(table))
    }

    /// Looks up the scalar UDF registered under `name`.
    fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
        self.state.scalar_functions().get(name).cloned()
    }

    /// Looks up the aggregate UDF registered under `name`.
    fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>> {
        self.state.aggregate_functions().get(name).cloned()
    }

    /// Looks up the window UDF registered under `name`.
    fn get_window_meta(&self, name: &str) -> Option<Arc<WindowUDF>> {
        self.state.window_functions().get(name).cloned()
    }

    /// Returns the data type of the variable identified by `variable_names`.
    ///
    /// Yields `None` when `variable_names` is empty, when no variable
    /// providers are configured, when no provider exists for the variable's
    /// kind (system vs. user defined), or when the provider does not know the
    /// variable.
    fn get_variable_type(&self, variable_names: &[String]) -> Option<DataType> {
        use datafusion_expr::var_provider::{is_system_variables, VarType};

        if variable_names.is_empty() {
            return None;
        }

        let provider_type = if is_system_variables(variable_names) {
            VarType::System
        } else {
            VarType::UserDefined
        };

        self.state
            .execution_props
            .var_providers
            .as_ref()
            .and_then(|provider| provider.get(&provider_type)?.get_type(variable_names))
    }

    /// Returns the configuration options of the session state.
    fn options(&self) -> &ConfigOptions {
        self.state.config_options()
    }

    /// Names under which scalar UDFs are registered.
    fn udf_names(&self) -> Vec<String> {
        self.state.scalar_functions().keys().cloned().collect()
    }

    /// Names under which aggregate UDFs are registered.
    fn udaf_names(&self) -> Vec<String> {
        self.state.aggregate_functions().keys().cloned().collect()
    }

    /// Names under which window UDFs are registered.
    fn udwf_names(&self) -> Vec<String> {
        self.state.window_functions().keys().cloned().collect()
    }

    /// Returns the file type for the file format registered under the
    /// lower-cased extension `ext`.
    ///
    /// # Errors
    /// Returns a plan error if no file format is registered for `ext`.
    fn get_file_type(
        &self,
        ext: &str,
    ) -> datafusion_common::Result<
        Arc<dyn datafusion_common::file_options::file_type::FileType>,
    > {
        // `ok_or_else` keeps the error (and its formatted message) from being
        // built when the lookup succeeds.
        let file_format = self
            .state
            .file_formats
            .get(&ext.to_lowercase())
            .ok_or_else(|| {
                plan_datafusion_err!("There is no registered file format with ext {ext}")
            })?;
        Ok(crate::datasource::file_format::format_as_file_type(
            Arc::clone(file_format),
        ))
    }
}
1820
1821impl FunctionRegistry for SessionState {
1822    fn udfs(&self) -> HashSet<String> {
1823        self.scalar_functions.keys().cloned().collect()
1824    }
1825
1826    fn udf(&self, name: &str) -> datafusion_common::Result<Arc<ScalarUDF>> {
1827        let result = self.scalar_functions.get(name);
1828
1829        result.cloned().ok_or_else(|| {
1830            plan_datafusion_err!("There is no UDF named \"{name}\" in the registry. Use session context `register_udf` function to register a custom UDF")
1831        })
1832    }
1833
1834    fn udaf(&self, name: &str) -> datafusion_common::Result<Arc<AggregateUDF>> {
1835        let result = self.aggregate_functions.get(name);
1836
1837        result.cloned().ok_or_else(|| {
1838            plan_datafusion_err!("There is no UDAF named \"{name}\" in the registry. Use session context `register_udaf` function to register a custom UDAF")
1839        })
1840    }
1841
1842    fn udwf(&self, name: &str) -> datafusion_common::Result<Arc<WindowUDF>> {
1843        let result = self.window_functions.get(name);
1844
1845        result.cloned().ok_or_else(|| {
1846            plan_datafusion_err!("There is no UDWF named \"{name}\" in the registry. Use session context `register_udwf` function to register a custom UDWF")
1847        })
1848    }
1849
1850    fn register_udf(
1851        &mut self,
1852        udf: Arc<ScalarUDF>,
1853    ) -> datafusion_common::Result<Option<Arc<ScalarUDF>>> {
1854        udf.aliases().iter().for_each(|alias| {
1855            self.scalar_functions
1856                .insert(alias.clone(), Arc::clone(&udf));
1857        });
1858        Ok(self.scalar_functions.insert(udf.name().into(), udf))
1859    }
1860
1861    fn register_udaf(
1862        &mut self,
1863        udaf: Arc<AggregateUDF>,
1864    ) -> datafusion_common::Result<Option<Arc<AggregateUDF>>> {
1865        udaf.aliases().iter().for_each(|alias| {
1866            self.aggregate_functions
1867                .insert(alias.clone(), Arc::clone(&udaf));
1868        });
1869        Ok(self.aggregate_functions.insert(udaf.name().into(), udaf))
1870    }
1871
1872    fn register_udwf(
1873        &mut self,
1874        udwf: Arc<WindowUDF>,
1875    ) -> datafusion_common::Result<Option<Arc<WindowUDF>>> {
1876        udwf.aliases().iter().for_each(|alias| {
1877            self.window_functions
1878                .insert(alias.clone(), Arc::clone(&udwf));
1879        });
1880        Ok(self.window_functions.insert(udwf.name().into(), udwf))
1881    }
1882
1883    fn deregister_udf(
1884        &mut self,
1885        name: &str,
1886    ) -> datafusion_common::Result<Option<Arc<ScalarUDF>>> {
1887        let udf = self.scalar_functions.remove(name);
1888        if let Some(udf) = &udf {
1889            for alias in udf.aliases() {
1890                self.scalar_functions.remove(alias);
1891            }
1892        }
1893        Ok(udf)
1894    }
1895
1896    fn deregister_udaf(
1897        &mut self,
1898        name: &str,
1899    ) -> datafusion_common::Result<Option<Arc<AggregateUDF>>> {
1900        let udaf = self.aggregate_functions.remove(name);
1901        if let Some(udaf) = &udaf {
1902            for alias in udaf.aliases() {
1903                self.aggregate_functions.remove(alias);
1904            }
1905        }
1906        Ok(udaf)
1907    }
1908
1909    fn deregister_udwf(
1910        &mut self,
1911        name: &str,
1912    ) -> datafusion_common::Result<Option<Arc<WindowUDF>>> {
1913        let udwf = self.window_functions.remove(name);
1914        if let Some(udwf) = &udwf {
1915            for alias in udwf.aliases() {
1916                self.window_functions.remove(alias);
1917            }
1918        }
1919        Ok(udwf)
1920    }
1921
1922    fn register_function_rewrite(
1923        &mut self,
1924        rewrite: Arc<dyn FunctionRewrite + Send + Sync>,
1925    ) -> datafusion_common::Result<()> {
1926        self.analyzer.add_function_rewrite(rewrite);
1927        Ok(())
1928    }
1929
1930    fn expr_planners(&self) -> Vec<Arc<dyn ExprPlanner>> {
1931        self.expr_planners.clone()
1932    }
1933
1934    fn register_expr_planner(
1935        &mut self,
1936        expr_planner: Arc<dyn ExprPlanner>,
1937    ) -> datafusion_common::Result<()> {
1938        self.expr_planners.push(expr_planner);
1939        Ok(())
1940    }
1941
1942    fn udafs(&self) -> HashSet<String> {
1943        self.aggregate_functions.keys().cloned().collect()
1944    }
1945
1946    fn udwfs(&self) -> HashSet<String> {
1947        self.window_functions.keys().cloned().collect()
1948    }
1949}
1950
1951impl OptimizerConfig for SessionState {
1952    fn query_execution_start_time(&self) -> DateTime<Utc> {
1953        self.execution_props.query_execution_start_time
1954    }
1955
1956    fn alias_generator(&self) -> &Arc<AliasGenerator> {
1957        &self.execution_props.alias_generator
1958    }
1959
1960    fn options(&self) -> Arc<ConfigOptions> {
1961        Arc::clone(self.config.options())
1962    }
1963
1964    fn function_registry(&self) -> Option<&dyn FunctionRegistry> {
1965        Some(self)
1966    }
1967}
1968
1969/// Create a new task context instance from SessionState
1970impl From<&SessionState> for TaskContext {
1971    fn from(state: &SessionState) -> Self {
1972        let task_id = None;
1973        TaskContext::new(
1974            task_id,
1975            state.session_id.clone(),
1976            state.config.clone(),
1977            state.scalar_functions.clone(),
1978            state.aggregate_functions.clone(),
1979            state.window_functions.clone(),
1980            Arc::clone(&state.runtime_env),
1981        )
1982    }
1983}
1984
/// The query planner used if no user defined planner is provided
///
/// Its [`QueryPlanner`] implementation creates physical plans with a
/// [`DefaultPhysicalPlanner`].
#[derive(Debug)]
struct DefaultQueryPlanner {}
1988
1989#[async_trait]
1990impl QueryPlanner for DefaultQueryPlanner {
1991    /// Given a `LogicalPlan`, create an [`ExecutionPlan`] suitable for execution
1992    async fn create_physical_plan(
1993        &self,
1994        logical_plan: &LogicalPlan,
1995        session_state: &SessionState,
1996    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
1997        let planner = DefaultPhysicalPlanner::default();
1998        planner
1999            .create_physical_plan(logical_plan, session_state)
2000            .await
2001    }
2002}
2003
/// [`SimplifyInfo`] implementation backed by a [`SessionState`] and a
/// [`DFSchema`]
struct SessionSimplifyProvider<'a> {
    /// Session state that supplies the execution properties
    state: &'a SessionState,
    /// Schema used to determine expression types and nullability
    df_schema: &'a DFSchema,
}
2008
2009impl<'a> SessionSimplifyProvider<'a> {
2010    fn new(state: &'a SessionState, df_schema: &'a DFSchema) -> Self {
2011        Self { state, df_schema }
2012    }
2013}
2014
2015impl SimplifyInfo for SessionSimplifyProvider<'_> {
2016    fn is_boolean_type(&self, expr: &Expr) -> datafusion_common::Result<bool> {
2017        Ok(expr.get_type(self.df_schema)? == DataType::Boolean)
2018    }
2019
2020    fn nullable(&self, expr: &Expr) -> datafusion_common::Result<bool> {
2021        expr.nullable(self.df_schema)
2022    }
2023
2024    fn execution_props(&self) -> &ExecutionProps {
2025        self.state.execution_props()
2026    }
2027
2028    fn get_data_type(&self, expr: &Expr) -> datafusion_common::Result<DataType> {
2029        expr.get_type(self.df_schema)
2030    }
2031}
2032
/// A prepared logical plan together with the fields describing its parameters
#[derive(Debug)]
pub(crate) struct PreparedPlan {
    /// Fields describing the parameters of the prepared plan
    pub(crate) fields: Vec<FieldRef>,
    /// The prepared logical plan
    pub(crate) plan: Arc<LogicalPlan>,
}
2040
2041#[cfg(test)]
2042mod tests {
2043    use super::{SessionContextProvider, SessionStateBuilder};
2044    use crate::common::assert_contains;
2045    use crate::config::ConfigOptions;
2046    use crate::datasource::empty::EmptyTable;
2047    use crate::datasource::provider_as_source;
2048    use crate::datasource::MemTable;
2049    use crate::execution::context::SessionState;
2050    use crate::logical_expr::planner::ExprPlanner;
2051    use crate::logical_expr::{AggregateUDF, ScalarUDF, TableSource, WindowUDF};
2052    use crate::physical_plan::ExecutionPlan;
2053    use crate::sql::planner::ContextProvider;
2054    use crate::sql::{ResolvedTableReference, TableReference};
2055    use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
2056    use arrow::datatypes::{DataType, Field, Schema};
2057    use datafusion_catalog::MemoryCatalogProviderList;
2058    use datafusion_common::config::Dialect;
2059    use datafusion_common::DFSchema;
2060    use datafusion_common::Result;
2061    use datafusion_execution::config::SessionConfig;
2062    use datafusion_expr::Expr;
2063    use datafusion_optimizer::optimizer::OptimizerRule;
2064    use datafusion_optimizer::Optimizer;
2065    use datafusion_physical_plan::display::DisplayableExecutionPlan;
2066    use datafusion_sql::planner::{PlannerContext, SqlToRel};
2067    use std::collections::HashMap;
2068    use std::sync::Arc;
2069
2070    #[test]
2071    #[cfg(feature = "sql")]
2072    fn test_session_state_with_default_features() {
2073        // test array planners with and without builtin planners
2074        #[cfg(feature = "sql")]
2075        fn sql_to_expr(state: &SessionState) -> Result<Expr> {
2076            let provider = SessionContextProvider {
2077                state,
2078                tables: HashMap::new(),
2079            };
2080
2081            let sql = "[1,2,3]";
2082            let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
2083            let df_schema = DFSchema::try_from(schema)?;
2084            let dialect = state.config.options().sql_parser.dialect;
2085            let sql_expr = state.sql_to_expr(sql, &dialect)?;
2086
2087            let query = SqlToRel::new_with_options(&provider, state.get_parser_options());
2088            query.sql_to_expr(sql_expr, &df_schema, &mut PlannerContext::new())
2089        }
2090
2091        let state = SessionStateBuilder::new().with_default_features().build();
2092
2093        assert!(sql_to_expr(&state).is_ok());
2094
2095        // if no builtin planners exist, you should register your own, otherwise returns error
2096        let state = SessionStateBuilder::new().build();
2097
2098        assert!(sql_to_expr(&state).is_err())
2099    }
2100
2101    #[test]
2102    fn test_from_existing() -> Result<()> {
2103        fn employee_batch() -> RecordBatch {
2104            let name: ArrayRef =
2105                Arc::new(StringArray::from_iter_values(["Andy", "Andrew"]));
2106            let age: ArrayRef = Arc::new(Int32Array::from(vec![11, 22]));
2107            RecordBatch::try_from_iter(vec![("name", name), ("age", age)]).unwrap()
2108        }
2109        let batch = employee_batch();
2110        let table = MemTable::try_new(batch.schema(), vec![vec![batch]])?;
2111
2112        let session_state = SessionStateBuilder::new()
2113            .with_catalog_list(Arc::new(MemoryCatalogProviderList::new()))
2114            .build();
2115        let table_ref = session_state.resolve_table_ref("employee").to_string();
2116        session_state
2117            .schema_for_ref(&table_ref)?
2118            .register_table("employee".to_string(), Arc::new(table))?;
2119
2120        let default_catalog = session_state
2121            .config
2122            .options()
2123            .catalog
2124            .default_catalog
2125            .clone();
2126        let default_schema = session_state
2127            .config
2128            .options()
2129            .catalog
2130            .default_schema
2131            .clone();
2132        let is_exist = session_state
2133            .catalog_list()
2134            .catalog(default_catalog.as_str())
2135            .unwrap()
2136            .schema(default_schema.as_str())
2137            .unwrap()
2138            .table_exist("employee");
2139        assert!(is_exist);
2140        let new_state = SessionStateBuilder::new_from_existing(session_state).build();
2141        assert!(new_state
2142            .catalog_list()
2143            .catalog(default_catalog.as_str())
2144            .unwrap()
2145            .schema(default_schema.as_str())
2146            .unwrap()
2147            .table_exist("employee"));
2148
2149        // if `with_create_default_catalog_and_schema` is disabled, the new one shouldn't create default catalog and schema
2150        let disable_create_default =
2151            SessionConfig::default().with_create_default_catalog_and_schema(false);
2152        let without_default_state = SessionStateBuilder::new()
2153            .with_config(disable_create_default)
2154            .build();
2155        assert!(without_default_state
2156            .catalog_list()
2157            .catalog(&default_catalog)
2158            .is_none());
2159        let new_state =
2160            SessionStateBuilder::new_from_existing(without_default_state).build();
2161        assert!(new_state.catalog_list().catalog(&default_catalog).is_none());
2162        Ok(())
2163    }
2164
2165    #[test]
2166    fn test_session_state_with_optimizer_rules() {
2167        #[derive(Default, Debug)]
2168        struct DummyRule {}
2169
2170        impl OptimizerRule for DummyRule {
2171            fn name(&self) -> &str {
2172                "dummy_rule"
2173            }
2174        }
2175        // test building sessions with fresh set of rules
2176        let state = SessionStateBuilder::new()
2177            .with_optimizer_rules(vec![Arc::new(DummyRule {})])
2178            .build();
2179
2180        assert_eq!(state.optimizers().len(), 1);
2181
2182        // test adding rules to default recommendations
2183        let state = SessionStateBuilder::new()
2184            .with_optimizer_rule(Arc::new(DummyRule {}))
2185            .build();
2186
2187        assert_eq!(
2188            state.optimizers().len(),
2189            Optimizer::default().rules.len() + 1
2190        );
2191    }
2192
2193    #[test]
2194    fn test_with_table_factories() -> Result<()> {
2195        use crate::test_util::TestTableFactory;
2196
2197        let state = SessionStateBuilder::new().build();
2198        let table_factories = state.table_factories();
2199        assert!(table_factories.is_empty());
2200
2201        let table_factory = Arc::new(TestTableFactory {});
2202        let state = SessionStateBuilder::new()
2203            .with_table_factory("employee".to_string(), table_factory)
2204            .build();
2205        let table_factories = state.table_factories();
2206        assert_eq!(table_factories.len(), 1);
2207        assert!(table_factories.contains_key("employee"));
2208        Ok(())
2209    }
2210
2211    #[test]
2212    fn test_with_default_features_not_override() -> Result<()> {
2213        use crate::test_util::TestTableFactory;
2214
2215        // Test whether the table_factory has been overridden.
2216        let table_factory = Arc::new(TestTableFactory {});
2217        let session_state = SessionStateBuilder::new()
2218            .with_table_factory("test".to_string(), table_factory)
2219            .with_default_features()
2220            .build();
2221        assert!(session_state.table_factories().get("test").is_some());
2222
2223        Ok(())
2224    }
2225
2226    /// This test demonstrates why it's more convenient and somewhat necessary to provide
2227    /// an `expr_planners` method for `SessionState`.
2228    #[tokio::test]
2229    async fn test_with_expr_planners() -> Result<()> {
2230        // A helper method for planning count wildcard with or without expr planners.
2231        async fn plan_count_wildcard(
2232            with_expr_planners: bool,
2233        ) -> Result<Arc<dyn ExecutionPlan>> {
2234            let mut context_provider = MyContextProvider::new().with_table(
2235                "t",
2236                provider_as_source(Arc::new(EmptyTable::new(Schema::empty().into()))),
2237            );
2238            if with_expr_planners {
2239                context_provider = context_provider.with_expr_planners();
2240            }
2241
2242            let state = &context_provider.state;
2243            let statement =
2244                state.sql_to_statement("select count(*) from t", &Dialect::MySQL)?;
2245            let plan = SqlToRel::new(&context_provider).statement_to_plan(statement)?;
2246            state.create_physical_plan(&plan).await
2247        }
2248
2249        // Planning count wildcard without expr planners should fail.
2250        let got = plan_count_wildcard(false).await;
2251        assert_contains!(
2252            got.unwrap_err().to_string(),
2253            "Physical plan does not support logical expression Wildcard"
2254        );
2255
2256        // Planning count wildcard with expr planners should succeed.
2257        let got = plan_count_wildcard(true).await?;
2258        let displayable = DisplayableExecutionPlan::new(got.as_ref());
2259        assert_eq!(
2260            displayable.indent(false).to_string(),
2261            "ProjectionExec: expr=[0 as count(*)]\n  PlaceholderRowExec\n"
2262        );
2263
2264        Ok(())
2265    }
2266
    /// A `ContextProvider` based on `SessionState`.
    ///
    /// Functions and configuration options are looked up in the wrapped
    /// `SessionState`; tables come from the `tables` map.
    struct MyContextProvider {
        /// The session state.
        state: SessionState,
        /// Registered tables, keyed by resolved table reference.
        tables: HashMap<ResolvedTableReference, Arc<dyn TableSource>>,
        /// Controls whether `ContextProvider::get_expr_planners` returns the
        /// session state's expression planners or an empty slice.
        return_expr_planners: bool,
    }
2278
2279    impl MyContextProvider {
2280        /// Creates a new `SessionContextProvider`.
2281        pub fn new() -> Self {
2282            Self {
2283                state: SessionStateBuilder::default()
2284                    .with_default_features()
2285                    .build(),
2286                tables: HashMap::new(),
2287                return_expr_planners: false,
2288            }
2289        }
2290
2291        /// Registers a table.
2292        ///
2293        /// The catalog and schema are provided by default.
2294        pub fn with_table(mut self, table: &str, source: Arc<dyn TableSource>) -> Self {
2295            self.tables.insert(
2296                ResolvedTableReference {
2297                    catalog: "default".to_string().into(),
2298                    schema: "public".to_string().into(),
2299                    table: table.to_string().into(),
2300                },
2301                source,
2302            );
2303            self
2304        }
2305
2306        /// Sets the `return_expr_planners` flag to true.
2307        pub fn with_expr_planners(self) -> Self {
2308            Self {
2309                return_expr_planners: true,
2310                ..self
2311            }
2312        }
2313    }
2314
2315    impl ContextProvider for MyContextProvider {
2316        fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
2317            let resolved_table_ref = ResolvedTableReference {
2318                catalog: "default".to_string().into(),
2319                schema: "public".to_string().into(),
2320                table: name.table().to_string().into(),
2321            };
2322            let source = self.tables.get(&resolved_table_ref).cloned().unwrap();
2323            Ok(source)
2324        }
2325
2326        /// We use a `return_expr_planners` flag to demonstrate why it's necessary to
2327        /// return the expression planners in the `SessionState`.
2328        ///
2329        /// Note, the default implementation returns an empty slice.
2330        fn get_expr_planners(&self) -> &[Arc<dyn ExprPlanner>] {
2331            if self.return_expr_planners {
2332                self.state.expr_planners()
2333            } else {
2334                &[]
2335            }
2336        }
2337
2338        fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
2339            self.state.scalar_functions().get(name).cloned()
2340        }
2341
2342        fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>> {
2343            self.state.aggregate_functions().get(name).cloned()
2344        }
2345
2346        fn get_window_meta(&self, name: &str) -> Option<Arc<WindowUDF>> {
2347            self.state.window_functions().get(name).cloned()
2348        }
2349
2350        fn get_variable_type(&self, _variable_names: &[String]) -> Option<DataType> {
2351            None
2352        }
2353
2354        fn options(&self) -> &ConfigOptions {
2355            self.state.config_options()
2356        }
2357
2358        fn udf_names(&self) -> Vec<String> {
2359            self.state.scalar_functions().keys().cloned().collect()
2360        }
2361
2362        fn udaf_names(&self) -> Vec<String> {
2363            self.state.aggregate_functions().keys().cloned().collect()
2364        }
2365
2366        fn udwf_names(&self) -> Vec<String> {
2367            self.state.window_functions().keys().cloned().collect()
2368        }
2369    }
2370}