datafusion/execution/
session_state_defaults.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::catalog::listing_schema::ListingSchemaProvider;
19use crate::catalog::{CatalogProvider, TableProviderFactory};
20use crate::datasource::file_format::arrow::ArrowFormatFactory;
21#[cfg(feature = "avro")]
22use crate::datasource::file_format::avro::AvroFormatFactory;
23use crate::datasource::file_format::csv::CsvFormatFactory;
24use crate::datasource::file_format::json::JsonFormatFactory;
25#[cfg(feature = "parquet")]
26use crate::datasource::file_format::parquet::ParquetFormatFactory;
27use crate::datasource::file_format::FileFormatFactory;
28use crate::datasource::provider::DefaultTableFactory;
29use crate::execution::context::SessionState;
30#[cfg(feature = "nested_expressions")]
31use crate::functions_nested;
32use crate::{functions, functions_aggregate, functions_table, functions_window};
33use datafusion_catalog::TableFunction;
34use datafusion_catalog::{MemoryCatalogProvider, MemorySchemaProvider};
35use datafusion_execution::config::SessionConfig;
36use datafusion_execution::object_store::ObjectStoreUrl;
37use datafusion_execution::runtime_env::RuntimeEnv;
38use datafusion_expr::planner::ExprPlanner;
39use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};
40use std::collections::HashMap;
41use std::sync::Arc;
42use url::Url;
43
/// Collection of the built-in defaults used while building a [`SessionState`]:
/// table provider factories, file formats, expression planners, built-in
/// functions and the default catalog.
///
/// The type holds no data; everything is exposed through associated functions
/// on its `impl` block.
pub struct SessionStateDefaults {}
47
48impl SessionStateDefaults {
49    /// returns a map of the default [`TableProviderFactory`]s
50    pub fn default_table_factories() -> HashMap<String, Arc<dyn TableProviderFactory>> {
51        let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> =
52            HashMap::new();
53        #[cfg(feature = "parquet")]
54        table_factories.insert("PARQUET".into(), Arc::new(DefaultTableFactory::new()));
55        table_factories.insert("CSV".into(), Arc::new(DefaultTableFactory::new()));
56        table_factories.insert("JSON".into(), Arc::new(DefaultTableFactory::new()));
57        table_factories.insert("NDJSON".into(), Arc::new(DefaultTableFactory::new()));
58        table_factories.insert("AVRO".into(), Arc::new(DefaultTableFactory::new()));
59        table_factories.insert("ARROW".into(), Arc::new(DefaultTableFactory::new()));
60
61        table_factories
62    }
63
64    /// returns the default MemoryCatalogProvider
65    pub fn default_catalog(
66        config: &SessionConfig,
67        table_factories: &HashMap<String, Arc<dyn TableProviderFactory>>,
68        runtime: &Arc<RuntimeEnv>,
69    ) -> MemoryCatalogProvider {
70        let default_catalog = MemoryCatalogProvider::new();
71
72        default_catalog
73            .register_schema(
74                &config.options().catalog.default_schema,
75                Arc::new(MemorySchemaProvider::new()),
76            )
77            .expect("memory catalog provider can register schema");
78
79        Self::register_default_schema(config, table_factories, runtime, &default_catalog);
80
81        default_catalog
82    }
83
84    /// returns the list of default [`ExprPlanner`]s
85    pub fn default_expr_planners() -> Vec<Arc<dyn ExprPlanner>> {
86        let expr_planners: Vec<Arc<dyn ExprPlanner>> = vec![
87            Arc::new(functions::core::planner::CoreFunctionPlanner::default()),
88            // register crate of nested expressions (if enabled)
89            #[cfg(feature = "nested_expressions")]
90            Arc::new(functions_nested::planner::NestedFunctionPlanner),
91            #[cfg(feature = "nested_expressions")]
92            Arc::new(functions_nested::planner::FieldAccessPlanner),
93            #[cfg(feature = "datetime_expressions")]
94            Arc::new(functions::datetime::planner::DatetimeFunctionPlanner),
95            #[cfg(feature = "unicode_expressions")]
96            Arc::new(functions::unicode::planner::UnicodeFunctionPlanner),
97            Arc::new(functions_aggregate::planner::AggregateFunctionPlanner),
98            Arc::new(functions_window::planner::WindowFunctionPlanner),
99        ];
100
101        expr_planners
102    }
103
104    /// returns the list of default [`ScalarUDF`]s
105    pub fn default_scalar_functions() -> Vec<Arc<ScalarUDF>> {
106        #[cfg_attr(not(feature = "nested_expressions"), allow(unused_mut))]
107        let mut functions: Vec<Arc<ScalarUDF>> = functions::all_default_functions();
108
109        #[cfg(feature = "nested_expressions")]
110        functions.append(&mut functions_nested::all_default_nested_functions());
111
112        functions
113    }
114
115    /// returns the list of default [`AggregateUDF`]s
116    pub fn default_aggregate_functions() -> Vec<Arc<AggregateUDF>> {
117        functions_aggregate::all_default_aggregate_functions()
118    }
119
120    /// returns the list of default [`WindowUDF`]s
121    pub fn default_window_functions() -> Vec<Arc<WindowUDF>> {
122        functions_window::all_default_window_functions()
123    }
124
125    /// returns the list of default [`TableFunction`]s
126    pub fn default_table_functions() -> Vec<Arc<TableFunction>> {
127        functions_table::all_default_table_functions()
128    }
129
130    /// returns the list of default [`FileFormatFactory`]s
131    pub fn default_file_formats() -> Vec<Arc<dyn FileFormatFactory>> {
132        let file_formats: Vec<Arc<dyn FileFormatFactory>> = vec![
133            #[cfg(feature = "parquet")]
134            Arc::new(ParquetFormatFactory::new()),
135            Arc::new(JsonFormatFactory::new()),
136            Arc::new(CsvFormatFactory::new()),
137            Arc::new(ArrowFormatFactory::new()),
138            #[cfg(feature = "avro")]
139            Arc::new(AvroFormatFactory::new()),
140        ];
141
142        file_formats
143    }
144
145    /// registers all builtin functions - scalar, array and aggregate
146    pub fn register_builtin_functions(state: &mut SessionState) {
147        Self::register_scalar_functions(state);
148        Self::register_array_functions(state);
149        Self::register_aggregate_functions(state);
150    }
151
152    /// registers all the builtin scalar functions
153    pub fn register_scalar_functions(state: &mut SessionState) {
154        functions::register_all(state).expect("can not register built in functions");
155    }
156
157    /// registers all the builtin array functions
158    #[cfg_attr(not(feature = "nested_expressions"), allow(unused_variables))]
159    pub fn register_array_functions(state: &mut SessionState) {
160        // register crate of array expressions (if enabled)
161        #[cfg(feature = "nested_expressions")]
162        functions_nested::register_all(state)
163            .expect("can not register nested expressions");
164    }
165
166    /// registers all the builtin aggregate functions
167    pub fn register_aggregate_functions(state: &mut SessionState) {
168        functions_aggregate::register_all(state)
169            .expect("can not register aggregate functions");
170    }
171
172    /// registers the default schema
173    pub fn register_default_schema(
174        config: &SessionConfig,
175        table_factories: &HashMap<String, Arc<dyn TableProviderFactory>>,
176        runtime: &Arc<RuntimeEnv>,
177        default_catalog: &MemoryCatalogProvider,
178    ) {
179        let url = config.options().catalog.location.as_ref();
180        let format = config.options().catalog.format.as_ref();
181        let (url, format) = match (url, format) {
182            (Some(url), Some(format)) => (url, format),
183            _ => return,
184        };
185        let url = url.to_string();
186        let format = format.to_string();
187
188        let url = Url::parse(url.as_str()).expect("Invalid default catalog location!");
189        let authority = match url.host_str() {
190            Some(host) => format!("{}://{}", url.scheme(), host),
191            None => format!("{}://", url.scheme()),
192        };
193        let path = &url.as_str()[authority.len()..];
194        let path = object_store::path::Path::parse(path).expect("Can't parse path");
195        let store = ObjectStoreUrl::parse(authority.as_str())
196            .expect("Invalid default catalog url");
197        let store = match runtime.object_store(store) {
198            Ok(store) => store,
199            _ => return,
200        };
201        let factory = match table_factories.get(format.as_str()) {
202            Some(factory) => factory,
203            _ => return,
204        };
205        let schema = ListingSchemaProvider::new(
206            authority,
207            path,
208            Arc::clone(factory),
209            store,
210            format,
211        );
212        let _ = default_catalog
213            .register_schema("default", Arc::new(schema))
214            .expect("Failed to register default schema");
215    }
216
217    /// registers the default [`FileFormatFactory`]s
218    pub fn register_default_file_formats(state: &mut SessionState) {
219        let formats = SessionStateDefaults::default_file_formats();
220        for format in formats {
221            if let Err(e) = state.register_file_format(format, false) {
222                log::info!("Unable to register default file format: {e}")
223            };
224        }
225    }
226}