datafusion/datasource/
dynamic_file.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! This module contains a [`UrlTableFactory`] implementation that
//! can create a [`ListingTable`] from a given URL.
20
21use std::sync::Arc;
22
23use crate::datasource::listing::ListingTableConfigExt;
24use crate::datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl};
25use crate::datasource::TableProvider;
26use crate::error::Result;
27use crate::execution::context::SessionState;
28
29use datafusion_catalog::UrlTableFactory;
30use datafusion_common::plan_datafusion_err;
31use datafusion_session::SessionStore;
32
33use async_trait::async_trait;
34
35/// [DynamicListTableFactory] is a factory that can create a [ListingTable] from the given url.
36#[derive(Default, Debug)]
37pub struct DynamicListTableFactory {
38    /// The session store that contains the current session.
39    session_store: SessionStore,
40}
41
42impl DynamicListTableFactory {
43    /// Create a new [DynamicListTableFactory] with the given state store.
44    pub fn new(session_store: SessionStore) -> Self {
45        Self { session_store }
46    }
47
48    /// Get the session store.
49    pub fn session_store(&self) -> &SessionStore {
50        &self.session_store
51    }
52}
53
54#[async_trait]
55impl UrlTableFactory for DynamicListTableFactory {
56    async fn try_new(&self, url: &str) -> Result<Option<Arc<dyn TableProvider>>> {
57        let Ok(table_url) = ListingTableUrl::parse(url) else {
58            return Ok(None);
59        };
60
61        let state = &self
62            .session_store()
63            .get_session()
64            .upgrade()
65            .and_then(|session| {
66                session
67                    .read()
68                    .as_any()
69                    .downcast_ref::<SessionState>()
70                    .cloned()
71            })
72            .ok_or_else(|| plan_datafusion_err!("get current SessionStore error"))?;
73
74        match ListingTableConfig::new(table_url.clone())
75            .infer_options(state)
76            .await
77        {
78            Ok(cfg) => {
79                let cfg = cfg
80                    .infer_partitions_from_path(state)
81                    .await?
82                    .infer_schema(state)
83                    .await?;
84                ListingTable::try_new(cfg)
85                    .map(|table| Some(Arc::new(table) as Arc<dyn TableProvider>))
86            }
87            Err(_) => Ok(None),
88        }
89    }
90}