datafusion_catalog/
listing_schema.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ListingSchemaProvider`]: [`SchemaProvider`] that scans ObjectStores for tables automatically
19
20use std::any::Any;
21use std::collections::HashSet;
22use std::path::Path;
23use std::sync::{Arc, Mutex};
24
25use crate::{SchemaProvider, TableProvider, TableProviderFactory};
26
27use crate::Session;
28use datafusion_common::{
29    internal_datafusion_err, DFSchema, DataFusionError, HashMap, TableReference,
30};
31use datafusion_expr::CreateExternalTable;
32
33use async_trait::async_trait;
34use futures::TryStreamExt;
35use itertools::Itertools;
36use object_store::ObjectStore;
37
/// A [`SchemaProvider`] that scans an [`ObjectStore`] to automatically discover tables
///
/// A subfolder relationship is assumed, i.e. given:
/// - authority = `s3://host.example.com:3000`
/// - path = `/data/tpch`
/// - factory = `DeltaTableFactory`
///
/// A table called "customer" will be registered for the folder:
/// `s3://host.example.com:3000/data/tpch/customer`
///
/// assuming it contains valid deltalake data, i.e.:
/// - `s3://host.example.com:3000/data/tpch/customer/part-00000-xxxx.snappy.parquet`
/// - `s3://host.example.com:3000/data/tpch/customer/_delta_log/`
///
/// Construction does not list the store. Tables are only discovered when
/// `ListingSchemaProvider::refresh` is called. The resulting providers are kept
/// in an in-memory map that lookups read from.
///
/// [`ObjectStore`]: object_store::ObjectStore
#[derive(Debug)]
pub struct ListingSchemaProvider {
    /// Scheme + host prefix (e.g. `s3://host.example.com:3000`); each table URL is
    /// built as `{authority}/{table path}`.
    authority: String,
    /// Root path inside `store` whose direct children are treated as tables.
    path: object_store::path::Path,
    /// Factory used to build a `TableProvider` for every discovered table.
    factory: Arc<dyn TableProviderFactory>,
    /// Object store that is listed to discover tables.
    store: Arc<dyn ObjectStore>,
    /// Registered tables keyed by table name; behind a mutex so that the `&self`
    /// methods (`refresh`, `register_table`, `deregister_table`) can mutate it.
    tables: Arc<Mutex<HashMap<String, Arc<dyn TableProvider>>>>,
    /// File format passed as `file_type` when creating each table.
    format: String,
}
62
63impl ListingSchemaProvider {
64    /// Create a new `ListingSchemaProvider`
65    ///
66    /// Arguments:
67    /// `authority`: The scheme (i.e. s3://) + host (i.e. example.com:3000)
68    /// `path`: The root path that contains subfolders which represent tables
69    /// `factory`: The `TableProviderFactory` to use to instantiate tables for each subfolder
70    /// `store`: The `ObjectStore` containing the table data
71    /// `format`: The `FileFormat` of the tables
72    /// `has_header`: Indicates whether the created external table has the has_header flag enabled
73    pub fn new(
74        authority: String,
75        path: object_store::path::Path,
76        factory: Arc<dyn TableProviderFactory>,
77        store: Arc<dyn ObjectStore>,
78        format: String,
79    ) -> Self {
80        Self {
81            authority,
82            path,
83            factory,
84            store,
85            tables: Arc::new(Mutex::new(HashMap::new())),
86            format,
87        }
88    }
89
90    /// Reload table information from ObjectStore
91    pub async fn refresh(&self, state: &dyn Session) -> datafusion_common::Result<()> {
92        let entries: Vec<_> = self.store.list(Some(&self.path)).try_collect().await?;
93        let base = Path::new(self.path.as_ref());
94        let mut tables = HashSet::new();
95        for file in entries.iter() {
96            // The listing will initially be a file. However if we've recursed up to match our base, we know our path is a directory.
97            let mut is_dir = false;
98            let mut parent = Path::new(file.location.as_ref());
99            while let Some(p) = parent.parent() {
100                if p == base {
101                    tables.insert(TablePath {
102                        is_dir,
103                        path: parent,
104                    });
105                }
106                parent = p;
107                is_dir = true;
108            }
109        }
110        for table in tables.iter() {
111            let file_name = table
112                .path
113                .file_name()
114                .ok_or_else(|| internal_datafusion_err!("Cannot parse file name!"))?
115                .to_str()
116                .ok_or_else(|| internal_datafusion_err!("Cannot parse file name!"))?;
117            let table_name = file_name.split('.').collect_vec()[0];
118            let table_path = table
119                .to_string()
120                .ok_or_else(|| internal_datafusion_err!("Cannot parse file name!"))?;
121
122            if !self.table_exist(table_name) {
123                let table_url = format!("{}/{}", self.authority, table_path);
124
125                let name = TableReference::bare(table_name);
126                let provider = self
127                    .factory
128                    .create(
129                        state,
130                        &CreateExternalTable {
131                            schema: Arc::new(DFSchema::empty()),
132                            name,
133                            location: table_url,
134                            file_type: self.format.clone(),
135                            table_partition_cols: vec![],
136                            if_not_exists: false,
137                            or_replace: false,
138                            temporary: false,
139                            definition: None,
140                            order_exprs: vec![],
141                            unbounded: false,
142                            options: Default::default(),
143                            constraints: Default::default(),
144                            column_defaults: Default::default(),
145                        },
146                    )
147                    .await?;
148                let _ =
149                    self.register_table(table_name.to_string(), Arc::clone(&provider))?;
150            }
151        }
152        Ok(())
153    }
154}
155
156#[async_trait]
157impl SchemaProvider for ListingSchemaProvider {
158    fn as_any(&self) -> &dyn Any {
159        self
160    }
161
162    fn table_names(&self) -> Vec<String> {
163        self.tables
164            .lock()
165            .expect("Can't lock tables")
166            .keys()
167            .map(|it| it.to_string())
168            .collect()
169    }
170
171    async fn table(
172        &self,
173        name: &str,
174    ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
175        Ok(self
176            .tables
177            .lock()
178            .expect("Can't lock tables")
179            .get(name)
180            .cloned())
181    }
182
183    fn register_table(
184        &self,
185        name: String,
186        table: Arc<dyn TableProvider>,
187    ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
188        self.tables
189            .lock()
190            .expect("Can't lock tables")
191            .insert(name, Arc::clone(&table));
192        Ok(Some(table))
193    }
194
195    fn deregister_table(
196        &self,
197        name: &str,
198    ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
199        Ok(self.tables.lock().expect("Can't lock tables").remove(name))
200    }
201
202    fn table_exist(&self, name: &str) -> bool {
203        self.tables
204            .lock()
205            .expect("Can't lock tables")
206            .contains_key(name)
207    }
208}
209
/// A candidate table location found while listing the object store, together
/// with whether that location is a directory rather than a single file.
#[derive(Debug, PartialEq, Eq, Hash)]
struct TablePath<'a> {
    path: &'a Path,
    is_dir: bool,
}

impl TablePath<'_> {
    /// Render the path as a string, appending a trailing `/` for directories.
    ///
    /// Clients (e.g. object_store listing) may use the presence of a trailing slash
    /// as a heuristic for "this is a directory", so it is added here.
    ///
    /// Returns `None` if the path is not valid UTF-8.
    fn to_string(&self) -> Option<String> {
        let text = self.path.to_str()?;
        let rendered = if self.is_dir {
            format!("{text}/")
        } else {
            String::from(text)
        };
        Some(rendered)
    }
}
231
#[cfg(test)]
mod tests {
    use super::*;

    /// A directory `TablePath` is rendered with a trailing `/`.
    #[test]
    fn table_path_ends_with_slash_when_is_dir() {
        let dir = TablePath {
            is_dir: true,
            path: Path::new("/file"),
        };
        let rendered = dir.to_string().expect("table path");
        assert!(rendered.ends_with('/'));
    }

    /// A file (non-directory) `TablePath` is rendered without a trailing `/`.
    #[test]
    fn dir_table_path_str_does_not_end_with_slash_when_not_is_dir() {
        let file = TablePath {
            is_dir: false,
            path: Path::new("/file"),
        };
        let rendered = file.to_string().expect("table_path");
        assert!(!rendered.ends_with('/'));
    }
}