// datafusion_catalog/dynamic_file/catalog.rs

use crate::{CatalogProvider, CatalogProviderList, SchemaProvider, TableProvider};
use async_trait::async_trait;
use std::any::Any;
use std::fmt::Debug;
use std::sync::Arc;

26#[derive(Debug)]
28pub struct DynamicFileCatalog {
29 inner: Arc<dyn CatalogProviderList>,
31 factory: Arc<dyn UrlTableFactory>,
33}
34
35impl DynamicFileCatalog {
36 pub fn new(
37 inner: Arc<dyn CatalogProviderList>,
38 factory: Arc<dyn UrlTableFactory>,
39 ) -> Self {
40 Self { inner, factory }
41 }
42}
43
44impl CatalogProviderList for DynamicFileCatalog {
45 fn as_any(&self) -> &dyn Any {
46 self
47 }
48
49 fn register_catalog(
50 &self,
51 name: String,
52 catalog: Arc<dyn CatalogProvider>,
53 ) -> Option<Arc<dyn CatalogProvider>> {
54 self.inner.register_catalog(name, catalog)
55 }
56
57 fn catalog_names(&self) -> Vec<String> {
58 self.inner.catalog_names()
59 }
60
61 fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
62 self.inner.catalog(name).map(|catalog| {
63 Arc::new(DynamicFileCatalogProvider::new(
64 catalog,
65 Arc::clone(&self.factory),
66 )) as _
67 })
68 }
69}
70
71#[derive(Debug)]
73struct DynamicFileCatalogProvider {
74 inner: Arc<dyn CatalogProvider>,
76 factory: Arc<dyn UrlTableFactory>,
78}
79
80impl DynamicFileCatalogProvider {
81 pub fn new(
82 inner: Arc<dyn CatalogProvider>,
83 factory: Arc<dyn UrlTableFactory>,
84 ) -> Self {
85 Self { inner, factory }
86 }
87}
88
89impl CatalogProvider for DynamicFileCatalogProvider {
90 fn as_any(&self) -> &dyn Any {
91 self
92 }
93
94 fn schema_names(&self) -> Vec<String> {
95 self.inner.schema_names()
96 }
97
98 fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
99 self.inner.schema(name).map(|schema| {
100 Arc::new(DynamicFileSchemaProvider::new(
101 schema,
102 Arc::clone(&self.factory),
103 )) as _
104 })
105 }
106
107 fn register_schema(
108 &self,
109 name: &str,
110 schema: Arc<dyn SchemaProvider>,
111 ) -> datafusion_common::Result<Option<Arc<dyn SchemaProvider>>> {
112 self.inner.register_schema(name, schema)
113 }
114}
115
116#[derive(Debug)]
121pub struct DynamicFileSchemaProvider {
122 inner: Arc<dyn SchemaProvider>,
124 factory: Arc<dyn UrlTableFactory>,
126}
127
128impl DynamicFileSchemaProvider {
129 pub fn new(
131 inner: Arc<dyn SchemaProvider>,
132 factory: Arc<dyn UrlTableFactory>,
133 ) -> Self {
134 Self { inner, factory }
135 }
136}
137
138#[async_trait]
139impl SchemaProvider for DynamicFileSchemaProvider {
140 fn as_any(&self) -> &dyn Any {
141 self
142 }
143
144 fn table_names(&self) -> Vec<String> {
145 self.inner.table_names()
146 }
147
148 async fn table(
149 &self,
150 name: &str,
151 ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
152 if let Some(table) = self.inner.table(name).await? {
153 return Ok(Some(table));
154 };
155
156 self.factory.try_new(name).await
157 }
158
159 fn register_table(
160 &self,
161 name: String,
162 table: Arc<dyn TableProvider>,
163 ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
164 self.inner.register_table(name, table)
165 }
166
167 fn deregister_table(
168 &self,
169 name: &str,
170 ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
171 self.inner.deregister_table(name)
172 }
173
174 fn table_exist(&self, name: &str) -> bool {
175 self.inner.table_exist(name)
176 }
177}
178
179#[async_trait]
181pub trait UrlTableFactory: Debug + Sync + Send {
182 async fn try_new(
184 &self,
185 url: &str,
186 ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>>;
187}