datafusion/execution/
session_state_defaults.rs1use crate::catalog::listing_schema::ListingSchemaProvider;
19use crate::catalog::{CatalogProvider, TableProviderFactory};
20use crate::datasource::file_format::arrow::ArrowFormatFactory;
21#[cfg(feature = "avro")]
22use crate::datasource::file_format::avro::AvroFormatFactory;
23use crate::datasource::file_format::csv::CsvFormatFactory;
24use crate::datasource::file_format::json::JsonFormatFactory;
25#[cfg(feature = "parquet")]
26use crate::datasource::file_format::parquet::ParquetFormatFactory;
27use crate::datasource::file_format::FileFormatFactory;
28use crate::datasource::provider::DefaultTableFactory;
29use crate::execution::context::SessionState;
30#[cfg(feature = "nested_expressions")]
31use crate::functions_nested;
32use crate::{functions, functions_aggregate, functions_table, functions_window};
33use datafusion_catalog::TableFunction;
34use datafusion_catalog::{MemoryCatalogProvider, MemorySchemaProvider};
35use datafusion_execution::config::SessionConfig;
36use datafusion_execution::object_store::ObjectStoreUrl;
37use datafusion_execution::runtime_env::RuntimeEnv;
38use datafusion_expr::planner::ExprPlanner;
39use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};
40use std::collections::HashMap;
41use std::sync::Arc;
42use url::Url;
43
/// Namespace for the functions that build the default components of a
/// [`SessionState`]: table factories, the default catalog, expression
/// planners, built-in functions and file formats.
///
/// The type carries no data; everything is exposed as associated functions.
pub struct SessionStateDefaults {}
47
48impl SessionStateDefaults {
49 pub fn default_table_factories() -> HashMap<String, Arc<dyn TableProviderFactory>> {
51 let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> =
52 HashMap::new();
53 #[cfg(feature = "parquet")]
54 table_factories.insert("PARQUET".into(), Arc::new(DefaultTableFactory::new()));
55 table_factories.insert("CSV".into(), Arc::new(DefaultTableFactory::new()));
56 table_factories.insert("JSON".into(), Arc::new(DefaultTableFactory::new()));
57 table_factories.insert("NDJSON".into(), Arc::new(DefaultTableFactory::new()));
58 table_factories.insert("AVRO".into(), Arc::new(DefaultTableFactory::new()));
59 table_factories.insert("ARROW".into(), Arc::new(DefaultTableFactory::new()));
60
61 table_factories
62 }
63
64 pub fn default_catalog(
66 config: &SessionConfig,
67 table_factories: &HashMap<String, Arc<dyn TableProviderFactory>>,
68 runtime: &Arc<RuntimeEnv>,
69 ) -> MemoryCatalogProvider {
70 let default_catalog = MemoryCatalogProvider::new();
71
72 default_catalog
73 .register_schema(
74 &config.options().catalog.default_schema,
75 Arc::new(MemorySchemaProvider::new()),
76 )
77 .expect("memory catalog provider can register schema");
78
79 Self::register_default_schema(config, table_factories, runtime, &default_catalog);
80
81 default_catalog
82 }
83
84 pub fn default_expr_planners() -> Vec<Arc<dyn ExprPlanner>> {
86 let expr_planners: Vec<Arc<dyn ExprPlanner>> = vec![
87 Arc::new(functions::core::planner::CoreFunctionPlanner::default()),
88 #[cfg(feature = "nested_expressions")]
90 Arc::new(functions_nested::planner::NestedFunctionPlanner),
91 #[cfg(feature = "nested_expressions")]
92 Arc::new(functions_nested::planner::FieldAccessPlanner),
93 #[cfg(feature = "datetime_expressions")]
94 Arc::new(functions::datetime::planner::DatetimeFunctionPlanner),
95 #[cfg(feature = "unicode_expressions")]
96 Arc::new(functions::unicode::planner::UnicodeFunctionPlanner),
97 Arc::new(functions_aggregate::planner::AggregateFunctionPlanner),
98 Arc::new(functions_window::planner::WindowFunctionPlanner),
99 ];
100
101 expr_planners
102 }
103
104 pub fn default_scalar_functions() -> Vec<Arc<ScalarUDF>> {
106 #[cfg_attr(not(feature = "nested_expressions"), allow(unused_mut))]
107 let mut functions: Vec<Arc<ScalarUDF>> = functions::all_default_functions();
108
109 #[cfg(feature = "nested_expressions")]
110 functions.append(&mut functions_nested::all_default_nested_functions());
111
112 functions
113 }
114
115 pub fn default_aggregate_functions() -> Vec<Arc<AggregateUDF>> {
117 functions_aggregate::all_default_aggregate_functions()
118 }
119
120 pub fn default_window_functions() -> Vec<Arc<WindowUDF>> {
122 functions_window::all_default_window_functions()
123 }
124
125 pub fn default_table_functions() -> Vec<Arc<TableFunction>> {
127 functions_table::all_default_table_functions()
128 }
129
130 pub fn default_file_formats() -> Vec<Arc<dyn FileFormatFactory>> {
132 let file_formats: Vec<Arc<dyn FileFormatFactory>> = vec![
133 #[cfg(feature = "parquet")]
134 Arc::new(ParquetFormatFactory::new()),
135 Arc::new(JsonFormatFactory::new()),
136 Arc::new(CsvFormatFactory::new()),
137 Arc::new(ArrowFormatFactory::new()),
138 #[cfg(feature = "avro")]
139 Arc::new(AvroFormatFactory::new()),
140 ];
141
142 file_formats
143 }
144
145 pub fn register_builtin_functions(state: &mut SessionState) {
147 Self::register_scalar_functions(state);
148 Self::register_array_functions(state);
149 Self::register_aggregate_functions(state);
150 }
151
152 pub fn register_scalar_functions(state: &mut SessionState) {
154 functions::register_all(state).expect("can not register built in functions");
155 }
156
157 #[cfg_attr(not(feature = "nested_expressions"), allow(unused_variables))]
159 pub fn register_array_functions(state: &mut SessionState) {
160 #[cfg(feature = "nested_expressions")]
162 functions_nested::register_all(state)
163 .expect("can not register nested expressions");
164 }
165
166 pub fn register_aggregate_functions(state: &mut SessionState) {
168 functions_aggregate::register_all(state)
169 .expect("can not register aggregate functions");
170 }
171
172 pub fn register_default_schema(
174 config: &SessionConfig,
175 table_factories: &HashMap<String, Arc<dyn TableProviderFactory>>,
176 runtime: &Arc<RuntimeEnv>,
177 default_catalog: &MemoryCatalogProvider,
178 ) {
179 let url = config.options().catalog.location.as_ref();
180 let format = config.options().catalog.format.as_ref();
181 let (url, format) = match (url, format) {
182 (Some(url), Some(format)) => (url, format),
183 _ => return,
184 };
185 let url = url.to_string();
186 let format = format.to_string();
187
188 let url = Url::parse(url.as_str()).expect("Invalid default catalog location!");
189 let authority = match url.host_str() {
190 Some(host) => format!("{}://{}", url.scheme(), host),
191 None => format!("{}://", url.scheme()),
192 };
193 let path = &url.as_str()[authority.len()..];
194 let path = object_store::path::Path::parse(path).expect("Can't parse path");
195 let store = ObjectStoreUrl::parse(authority.as_str())
196 .expect("Invalid default catalog url");
197 let store = match runtime.object_store(store) {
198 Ok(store) => store,
199 _ => return,
200 };
201 let factory = match table_factories.get(format.as_str()) {
202 Some(factory) => factory,
203 _ => return,
204 };
205 let schema = ListingSchemaProvider::new(
206 authority,
207 path,
208 Arc::clone(factory),
209 store,
210 format,
211 );
212 let _ = default_catalog
213 .register_schema("default", Arc::new(schema))
214 .expect("Failed to register default schema");
215 }
216
217 pub fn register_default_file_formats(state: &mut SessionState) {
219 let formats = SessionStateDefaults::default_file_formats();
220 for format in formats {
221 if let Err(e) = state.register_file_format(format, false) {
222 log::info!("Unable to register default file format: {e}")
223 };
224 }
225 }
226}