datafusion/datasource/dynamic_file.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module contains a [`UrlTableFactory`] implementation that
19//! can create a [`ListingTable`] from the given URL.
20
21use std::sync::Arc;
22
23use crate::datasource::listing::ListingTableConfigExt;
24use crate::datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl};
25use crate::datasource::TableProvider;
26use crate::error::Result;
27use crate::execution::context::SessionState;
28
29use datafusion_catalog::UrlTableFactory;
30use datafusion_common::plan_datafusion_err;
31use datafusion_session::SessionStore;
32
33use async_trait::async_trait;
34
35/// [DynamicListTableFactory] is a factory that can create a [ListingTable] from the given url.
36#[derive(Default, Debug)]
37pub struct DynamicListTableFactory {
38 /// The session store that contains the current session.
39 session_store: SessionStore,
40}
41
42impl DynamicListTableFactory {
43 /// Create a new [DynamicListTableFactory] with the given state store.
44 pub fn new(session_store: SessionStore) -> Self {
45 Self { session_store }
46 }
47
48 /// Get the session store.
49 pub fn session_store(&self) -> &SessionStore {
50 &self.session_store
51 }
52}
53
54#[async_trait]
55impl UrlTableFactory for DynamicListTableFactory {
56 async fn try_new(&self, url: &str) -> Result<Option<Arc<dyn TableProvider>>> {
57 let Ok(table_url) = ListingTableUrl::parse(url) else {
58 return Ok(None);
59 };
60
61 let state = &self
62 .session_store()
63 .get_session()
64 .upgrade()
65 .and_then(|session| {
66 session
67 .read()
68 .as_any()
69 .downcast_ref::<SessionState>()
70 .cloned()
71 })
72 .ok_or_else(|| plan_datafusion_err!("get current SessionStore error"))?;
73
74 match ListingTableConfig::new(table_url.clone())
75 .infer_options(state)
76 .await
77 {
78 Ok(cfg) => {
79 let cfg = cfg
80 .infer_partitions_from_path(state)
81 .await?
82 .infer_schema(state)
83 .await?;
84 ListingTable::try_new(cfg)
85 .map(|table| Some(Arc::new(table) as Arc<dyn TableProvider>))
86 }
87 Err(_) => Ok(None),
88 }
89 }
90}