datafusion_catalog/
listing_schema.rs1use std::any::Any;
21use std::collections::HashSet;
22use std::path::Path;
23use std::sync::{Arc, Mutex};
24
25use crate::{SchemaProvider, TableProvider, TableProviderFactory};
26
27use crate::Session;
28use datafusion_common::{
29 internal_datafusion_err, DFSchema, DataFusionError, HashMap, TableReference,
30};
31use datafusion_expr::CreateExternalTable;
32
33use async_trait::async_trait;
34use futures::TryStreamExt;
35use itertools::Itertools;
36use object_store::ObjectStore;
37
38#[derive(Debug)]
54pub struct ListingSchemaProvider {
55 authority: String,
56 path: object_store::path::Path,
57 factory: Arc<dyn TableProviderFactory>,
58 store: Arc<dyn ObjectStore>,
59 tables: Arc<Mutex<HashMap<String, Arc<dyn TableProvider>>>>,
60 format: String,
61}
62
63impl ListingSchemaProvider {
64 pub fn new(
74 authority: String,
75 path: object_store::path::Path,
76 factory: Arc<dyn TableProviderFactory>,
77 store: Arc<dyn ObjectStore>,
78 format: String,
79 ) -> Self {
80 Self {
81 authority,
82 path,
83 factory,
84 store,
85 tables: Arc::new(Mutex::new(HashMap::new())),
86 format,
87 }
88 }
89
90 pub async fn refresh(&self, state: &dyn Session) -> datafusion_common::Result<()> {
92 let entries: Vec<_> = self.store.list(Some(&self.path)).try_collect().await?;
93 let base = Path::new(self.path.as_ref());
94 let mut tables = HashSet::new();
95 for file in entries.iter() {
96 let mut is_dir = false;
98 let mut parent = Path::new(file.location.as_ref());
99 while let Some(p) = parent.parent() {
100 if p == base {
101 tables.insert(TablePath {
102 is_dir,
103 path: parent,
104 });
105 }
106 parent = p;
107 is_dir = true;
108 }
109 }
110 for table in tables.iter() {
111 let file_name = table
112 .path
113 .file_name()
114 .ok_or_else(|| internal_datafusion_err!("Cannot parse file name!"))?
115 .to_str()
116 .ok_or_else(|| internal_datafusion_err!("Cannot parse file name!"))?;
117 let table_name = file_name.split('.').collect_vec()[0];
118 let table_path = table
119 .to_string()
120 .ok_or_else(|| internal_datafusion_err!("Cannot parse file name!"))?;
121
122 if !self.table_exist(table_name) {
123 let table_url = format!("{}/{}", self.authority, table_path);
124
125 let name = TableReference::bare(table_name);
126 let provider = self
127 .factory
128 .create(
129 state,
130 &CreateExternalTable {
131 schema: Arc::new(DFSchema::empty()),
132 name,
133 location: table_url,
134 file_type: self.format.clone(),
135 table_partition_cols: vec![],
136 if_not_exists: false,
137 or_replace: false,
138 temporary: false,
139 definition: None,
140 order_exprs: vec![],
141 unbounded: false,
142 options: Default::default(),
143 constraints: Default::default(),
144 column_defaults: Default::default(),
145 },
146 )
147 .await?;
148 let _ =
149 self.register_table(table_name.to_string(), Arc::clone(&provider))?;
150 }
151 }
152 Ok(())
153 }
154}
155
156#[async_trait]
157impl SchemaProvider for ListingSchemaProvider {
158 fn as_any(&self) -> &dyn Any {
159 self
160 }
161
162 fn table_names(&self) -> Vec<String> {
163 self.tables
164 .lock()
165 .expect("Can't lock tables")
166 .keys()
167 .map(|it| it.to_string())
168 .collect()
169 }
170
171 async fn table(
172 &self,
173 name: &str,
174 ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
175 Ok(self
176 .tables
177 .lock()
178 .expect("Can't lock tables")
179 .get(name)
180 .cloned())
181 }
182
183 fn register_table(
184 &self,
185 name: String,
186 table: Arc<dyn TableProvider>,
187 ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
188 self.tables
189 .lock()
190 .expect("Can't lock tables")
191 .insert(name, Arc::clone(&table));
192 Ok(Some(table))
193 }
194
195 fn deregister_table(
196 &self,
197 name: &str,
198 ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
199 Ok(self.tables.lock().expect("Can't lock tables").remove(name))
200 }
201
202 fn table_exist(&self, name: &str) -> bool {
203 self.tables
204 .lock()
205 .expect("Can't lock tables")
206 .contains_key(name)
207 }
208}
209
/// A candidate table location found while listing: a path that sits directly
/// under the listed directory, plus whether it names a directory (rendered with
/// a trailing `/`) or a single file.
#[derive(Debug, PartialEq, Eq, Hash)]
struct TablePath<'a> {
    /// Borrowed path of the file or directory.
    path: &'a Path,
    /// `true` when `path` denotes a directory rather than a single file.
    is_dir: bool,
}
217
218impl TablePath<'_> {
219 fn to_string(&self) -> Option<String> {
222 self.path.to_str().map(|path| {
223 if self.is_dir {
224 format!("{path}/")
225 } else {
226 path.to_string()
227 }
228 })
229 }
230}
231
#[cfg(test)]
mod tests {
    use super::*;

    /// A directory path renders as the path plus exactly one trailing `/`.
    #[test]
    fn table_path_ends_with_slash_when_is_dir() {
        let table_path = TablePath {
            path: Path::new("/file"),
            is_dir: true,
        };
        // Pin the whole string, not just the suffix, so a mangled prefix or a
        // doubled slash would also be caught.
        assert_eq!(table_path.to_string().as_deref(), Some("/file/"));
    }

    /// A file path renders unchanged, with no trailing `/`.
    #[test]
    fn dir_table_path_str_does_not_end_with_slash_when_not_is_dir() {
        let table_path = TablePath {
            path: Path::new("/file"),
            is_dir: false,
        };
        assert_eq!(table_path.to_string().as_deref(), Some("/file"));
    }
}