datafusion_catalog/memory/
schema.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`MemorySchemaProvider`]: In-memory implementations of [`SchemaProvider`].
19
20use crate::{SchemaProvider, TableProvider};
21use async_trait::async_trait;
22use dashmap::DashMap;
23use datafusion_common::{exec_err, DataFusionError};
24use std::any::Any;
25use std::sync::Arc;
26
27/// Simple in-memory implementation of a schema.
28#[derive(Debug)]
29pub struct MemorySchemaProvider {
30    tables: DashMap<String, Arc<dyn TableProvider>>,
31}
32
33impl MemorySchemaProvider {
34    /// Instantiates a new MemorySchemaProvider with an empty collection of tables.
35    pub fn new() -> Self {
36        Self {
37            tables: DashMap::new(),
38        }
39    }
40}
41
42impl Default for MemorySchemaProvider {
43    fn default() -> Self {
44        Self::new()
45    }
46}
47
48#[async_trait]
49impl SchemaProvider for MemorySchemaProvider {
50    fn as_any(&self) -> &dyn Any {
51        self
52    }
53
54    fn table_names(&self) -> Vec<String> {
55        self.tables
56            .iter()
57            .map(|table| table.key().clone())
58            .collect()
59    }
60
61    async fn table(
62        &self,
63        name: &str,
64    ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
65        Ok(self.tables.get(name).map(|table| Arc::clone(table.value())))
66    }
67
68    fn register_table(
69        &self,
70        name: String,
71        table: Arc<dyn TableProvider>,
72    ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
73        if self.table_exist(name.as_str()) {
74            return exec_err!("The table {name} already exists");
75        }
76        Ok(self.tables.insert(name, table))
77    }
78
79    fn deregister_table(
80        &self,
81        name: &str,
82    ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
83        Ok(self.tables.remove(name).map(|(_, table)| table))
84    }
85
86    fn table_exist(&self, name: &str) -> bool {
87        self.tables.contains_key(name)
88    }
89}