datafusion_catalog/memory/
schema.rs1use crate::{SchemaProvider, TableProvider};
21use async_trait::async_trait;
22use dashmap::DashMap;
23use datafusion_common::{exec_err, DataFusionError};
24use std::any::Any;
25use std::sync::Arc;
26
27#[derive(Debug)]
29pub struct MemorySchemaProvider {
30 tables: DashMap<String, Arc<dyn TableProvider>>,
31}
32
33impl MemorySchemaProvider {
34 pub fn new() -> Self {
36 Self {
37 tables: DashMap::new(),
38 }
39 }
40}
41
42impl Default for MemorySchemaProvider {
43 fn default() -> Self {
44 Self::new()
45 }
46}
47
48#[async_trait]
49impl SchemaProvider for MemorySchemaProvider {
50 fn as_any(&self) -> &dyn Any {
51 self
52 }
53
54 fn table_names(&self) -> Vec<String> {
55 self.tables
56 .iter()
57 .map(|table| table.key().clone())
58 .collect()
59 }
60
61 async fn table(
62 &self,
63 name: &str,
64 ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
65 Ok(self.tables.get(name).map(|table| Arc::clone(table.value())))
66 }
67
68 fn register_table(
69 &self,
70 name: String,
71 table: Arc<dyn TableProvider>,
72 ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
73 if self.table_exist(name.as_str()) {
74 return exec_err!("The table {name} already exists");
75 }
76 Ok(self.tables.insert(name, table))
77 }
78
79 fn deregister_table(
80 &self,
81 name: &str,
82 ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
83 Ok(self.tables.remove(name).map(|(_, table)| table))
84 }
85
86 fn table_exist(&self, name: &str) -> bool {
87 self.tables.contains_key(name)
88 }
89}