datafusion_catalog/dynamic_file/
catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`DynamicFileCatalog`] that creates tables from file paths
19
20use crate::{CatalogProvider, CatalogProviderList, SchemaProvider, TableProvider};
21use async_trait::async_trait;
22use std::any::Any;
23use std::fmt::Debug;
24use std::sync::Arc;
25
/// A [`CatalogProviderList`] wrapper that adds file-path based table lookup.
///
/// Catalog registration and name listing are delegated to the wrapped list
/// unchanged. Each catalog returned by `catalog` is wrapped so that it carries
/// the same [`UrlTableFactory`] as this list.
#[derive(Debug)]
pub struct DynamicFileCatalog {
    /// The wrapped catalog provider list that calls are delegated to
    inner: Arc<dyn CatalogProviderList>,
    /// The factory that can create a table provider from a file path; it is
    /// shared (via `Arc`) with every catalog handed out by this list
    factory: Arc<dyn UrlTableFactory>,
}
34
35impl DynamicFileCatalog {
36    pub fn new(
37        inner: Arc<dyn CatalogProviderList>,
38        factory: Arc<dyn UrlTableFactory>,
39    ) -> Self {
40        Self { inner, factory }
41    }
42}
43
44impl CatalogProviderList for DynamicFileCatalog {
45    fn as_any(&self) -> &dyn Any {
46        self
47    }
48
49    fn register_catalog(
50        &self,
51        name: String,
52        catalog: Arc<dyn CatalogProvider>,
53    ) -> Option<Arc<dyn CatalogProvider>> {
54        self.inner.register_catalog(name, catalog)
55    }
56
57    fn catalog_names(&self) -> Vec<String> {
58        self.inner.catalog_names()
59    }
60
61    fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
62        self.inner.catalog(name).map(|catalog| {
63            Arc::new(DynamicFileCatalogProvider::new(
64                catalog,
65                Arc::clone(&self.factory),
66            )) as _
67        })
68    }
69}
70
/// Wraps another [`CatalogProvider`] so that the schemas it returns are
/// wrapped in a [`DynamicFileSchemaProvider`].
#[derive(Debug)]
struct DynamicFileCatalogProvider {
    /// The wrapped catalog provider that calls are delegated to
    inner: Arc<dyn CatalogProvider>,
    /// The factory that can create a table provider from a file path; it is
    /// shared (via `Arc`) with every schema handed out by this catalog
    factory: Arc<dyn UrlTableFactory>,
}
79
80impl DynamicFileCatalogProvider {
81    pub fn new(
82        inner: Arc<dyn CatalogProvider>,
83        factory: Arc<dyn UrlTableFactory>,
84    ) -> Self {
85        Self { inner, factory }
86    }
87}
88
89impl CatalogProvider for DynamicFileCatalogProvider {
90    fn as_any(&self) -> &dyn Any {
91        self
92    }
93
94    fn schema_names(&self) -> Vec<String> {
95        self.inner.schema_names()
96    }
97
98    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
99        self.inner.schema(name).map(|schema| {
100            Arc::new(DynamicFileSchemaProvider::new(
101                schema,
102                Arc::clone(&self.factory),
103            )) as _
104        })
105    }
106
107    fn register_schema(
108        &self,
109        name: &str,
110        schema: Arc<dyn SchemaProvider>,
111    ) -> datafusion_common::Result<Option<Arc<dyn SchemaProvider>>> {
112        self.inner.register_schema(name, schema)
113    }
114}
115
/// A [`SchemaProvider`] wrapper that can create table providers from file paths.
///
/// When a table lookup finds nothing in the inner schema provider, the table
/// name is handed to the [`UrlTableFactory`], which may create a table
/// provider from it.
#[derive(Debug)]
pub struct DynamicFileSchemaProvider {
    /// The inner schema provider, consulted first for every lookup
    inner: Arc<dyn SchemaProvider>,
    /// The factory that can create a table provider from the file path
    factory: Arc<dyn UrlTableFactory>,
}
127
128impl DynamicFileSchemaProvider {
129    /// Create a new [DynamicFileSchemaProvider] with the given inner schema provider.
130    pub fn new(
131        inner: Arc<dyn SchemaProvider>,
132        factory: Arc<dyn UrlTableFactory>,
133    ) -> Self {
134        Self { inner, factory }
135    }
136}
137
138#[async_trait]
139impl SchemaProvider for DynamicFileSchemaProvider {
140    fn as_any(&self) -> &dyn Any {
141        self
142    }
143
144    fn table_names(&self) -> Vec<String> {
145        self.inner.table_names()
146    }
147
148    async fn table(
149        &self,
150        name: &str,
151    ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
152        if let Some(table) = self.inner.table(name).await? {
153            return Ok(Some(table));
154        };
155
156        self.factory.try_new(name).await
157    }
158
159    fn register_table(
160        &self,
161        name: String,
162        table: Arc<dyn TableProvider>,
163    ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
164        self.inner.register_table(name, table)
165    }
166
167    fn deregister_table(
168        &self,
169        name: &str,
170    ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
171        self.inner.deregister_table(name)
172    }
173
174    fn table_exist(&self, name: &str) -> bool {
175        self.inner.table_exist(name)
176    }
177}
178
179/// [UrlTableFactory] is a factory that can create a table provider from the given url.
180#[async_trait]
181pub trait UrlTableFactory: Debug + Sync + Send {
182    /// create a new table provider from the provided url
183    async fn try_new(
184        &self,
185        url: &str,
186    ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>>;
187}