datafusion_catalog/memory/
catalog.rs1use crate::{CatalogProvider, CatalogProviderList, SchemaProvider};
22use dashmap::DashMap;
23use datafusion_common::exec_err;
24use std::any::Any;
25use std::sync::Arc;
26
27#[derive(Debug)]
29pub struct MemoryCatalogProviderList {
30 pub catalogs: DashMap<String, Arc<dyn CatalogProvider>>,
32}
33
34impl MemoryCatalogProviderList {
35 pub fn new() -> Self {
37 Self {
38 catalogs: DashMap::new(),
39 }
40 }
41}
42
43impl Default for MemoryCatalogProviderList {
44 fn default() -> Self {
45 Self::new()
46 }
47}
48
49impl CatalogProviderList for MemoryCatalogProviderList {
50 fn as_any(&self) -> &dyn Any {
51 self
52 }
53
54 fn register_catalog(
55 &self,
56 name: String,
57 catalog: Arc<dyn CatalogProvider>,
58 ) -> Option<Arc<dyn CatalogProvider>> {
59 self.catalogs.insert(name, catalog)
60 }
61
62 fn catalog_names(&self) -> Vec<String> {
63 self.catalogs.iter().map(|c| c.key().clone()).collect()
64 }
65
66 fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
67 self.catalogs.get(name).map(|c| Arc::clone(c.value()))
68 }
69}
70
71#[derive(Debug)]
73pub struct MemoryCatalogProvider {
74 schemas: DashMap<String, Arc<dyn SchemaProvider>>,
75}
76
77impl MemoryCatalogProvider {
78 pub fn new() -> Self {
80 Self {
81 schemas: DashMap::new(),
82 }
83 }
84}
85
86impl Default for MemoryCatalogProvider {
87 fn default() -> Self {
88 Self::new()
89 }
90}
91
92impl CatalogProvider for MemoryCatalogProvider {
93 fn as_any(&self) -> &dyn Any {
94 self
95 }
96
97 fn schema_names(&self) -> Vec<String> {
98 self.schemas.iter().map(|s| s.key().clone()).collect()
99 }
100
101 fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
102 self.schemas.get(name).map(|s| Arc::clone(s.value()))
103 }
104
105 fn register_schema(
106 &self,
107 name: &str,
108 schema: Arc<dyn SchemaProvider>,
109 ) -> datafusion_common::Result<Option<Arc<dyn SchemaProvider>>> {
110 Ok(self.schemas.insert(name.into(), schema))
111 }
112
113 fn deregister_schema(
114 &self,
115 name: &str,
116 cascade: bool,
117 ) -> datafusion_common::Result<Option<Arc<dyn SchemaProvider>>> {
118 if let Some(schema) = self.schema(name) {
119 let table_names = schema.table_names();
120 match (table_names.is_empty(), cascade) {
121 (true, _) | (false, true) => {
122 let (_, removed) = self.schemas.remove(name).unwrap();
123 Ok(Some(removed))
124 }
125 (false, false) => exec_err!(
126 "Cannot drop schema {} because other tables depend on it: {}",
127 name,
128 itertools::join(table_names.iter(), ", ")
129 ),
130 }
131 } else {
132 Ok(None)
133 }
134 }
135}