datafusion_catalog/
async.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use async_trait::async_trait;
21use datafusion_common::{error::Result, not_impl_err, HashMap, TableReference};
22use datafusion_execution::config::SessionConfig;
23
24use crate::{CatalogProvider, CatalogProviderList, SchemaProvider, TableProvider};
25
26/// A schema provider that looks up tables in a cache
27///
28/// Instances are created by the [`AsyncSchemaProvider::resolve`] method
29#[derive(Debug)]
30struct ResolvedSchemaProvider {
31    owner_name: Option<String>,
32    cached_tables: HashMap<String, Arc<dyn TableProvider>>,
33}
34#[async_trait]
35impl SchemaProvider for ResolvedSchemaProvider {
36    fn owner_name(&self) -> Option<&str> {
37        self.owner_name.as_deref()
38    }
39
40    fn as_any(&self) -> &dyn std::any::Any {
41        self
42    }
43
44    fn table_names(&self) -> Vec<String> {
45        self.cached_tables.keys().cloned().collect()
46    }
47
48    async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
49        Ok(self.cached_tables.get(name).cloned())
50    }
51
52    fn register_table(
53        &self,
54        name: String,
55        _table: Arc<dyn TableProvider>,
56    ) -> Result<Option<Arc<dyn TableProvider>>> {
57        not_impl_err!(
58            "Attempt to register table '{name}' with ResolvedSchemaProvider which is not supported"
59        )
60    }
61
62    fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
63        not_impl_err!("Attempt to deregister table '{name}' with ResolvedSchemaProvider which is not supported")
64    }
65
66    fn table_exist(&self, name: &str) -> bool {
67        self.cached_tables.contains_key(name)
68    }
69}
70
71/// Helper class for building a [`ResolvedSchemaProvider`]
72struct ResolvedSchemaProviderBuilder {
73    owner_name: String,
74    async_provider: Arc<dyn AsyncSchemaProvider>,
75    cached_tables: HashMap<String, Option<Arc<dyn TableProvider>>>,
76}
77impl ResolvedSchemaProviderBuilder {
78    fn new(owner_name: String, async_provider: Arc<dyn AsyncSchemaProvider>) -> Self {
79        Self {
80            owner_name,
81            async_provider,
82            cached_tables: HashMap::new(),
83        }
84    }
85
86    async fn resolve_table(&mut self, table_name: &str) -> Result<()> {
87        if !self.cached_tables.contains_key(table_name) {
88            let resolved_table = self.async_provider.table(table_name).await?;
89            self.cached_tables
90                .insert(table_name.to_string(), resolved_table);
91        }
92        Ok(())
93    }
94
95    fn finish(self) -> Arc<dyn SchemaProvider> {
96        let cached_tables = self
97            .cached_tables
98            .into_iter()
99            .filter_map(|(key, maybe_value)| maybe_value.map(|value| (key, value)))
100            .collect();
101        Arc::new(ResolvedSchemaProvider {
102            owner_name: Some(self.owner_name),
103            cached_tables,
104        })
105    }
106}
107
108/// A catalog provider that looks up schemas in a cache
109///
110/// Instances are created by the [`AsyncCatalogProvider::resolve`] method
111#[derive(Debug)]
112struct ResolvedCatalogProvider {
113    cached_schemas: HashMap<String, Arc<dyn SchemaProvider>>,
114}
115impl CatalogProvider for ResolvedCatalogProvider {
116    fn as_any(&self) -> &dyn std::any::Any {
117        self
118    }
119
120    fn schema_names(&self) -> Vec<String> {
121        self.cached_schemas.keys().cloned().collect()
122    }
123
124    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
125        self.cached_schemas.get(name).cloned()
126    }
127}
128
129/// Helper class for building a [`ResolvedCatalogProvider`]
130struct ResolvedCatalogProviderBuilder {
131    cached_schemas: HashMap<String, Option<ResolvedSchemaProviderBuilder>>,
132    async_provider: Arc<dyn AsyncCatalogProvider>,
133}
134impl ResolvedCatalogProviderBuilder {
135    fn new(async_provider: Arc<dyn AsyncCatalogProvider>) -> Self {
136        Self {
137            cached_schemas: HashMap::new(),
138            async_provider,
139        }
140    }
141    fn finish(self) -> Arc<dyn CatalogProvider> {
142        let cached_schemas = self
143            .cached_schemas
144            .into_iter()
145            .filter_map(|(key, maybe_value)| {
146                maybe_value.map(|value| (key, value.finish()))
147            })
148            .collect();
149        Arc::new(ResolvedCatalogProvider { cached_schemas })
150    }
151}
152
153/// A catalog provider list that looks up catalogs in a cache
154///
155/// Instances are created by the [`AsyncCatalogProviderList::resolve`] method
156#[derive(Debug)]
157struct ResolvedCatalogProviderList {
158    cached_catalogs: HashMap<String, Arc<dyn CatalogProvider>>,
159}
160impl CatalogProviderList for ResolvedCatalogProviderList {
161    fn as_any(&self) -> &dyn std::any::Any {
162        self
163    }
164
165    fn register_catalog(
166        &self,
167        _name: String,
168        _catalog: Arc<dyn CatalogProvider>,
169    ) -> Option<Arc<dyn CatalogProvider>> {
170        unimplemented!("resolved providers cannot handle registration APIs")
171    }
172
173    fn catalog_names(&self) -> Vec<String> {
174        self.cached_catalogs.keys().cloned().collect()
175    }
176
177    fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
178        self.cached_catalogs.get(name).cloned()
179    }
180}
181
182/// A trait for schema providers that must resolve tables asynchronously
183///
184/// The [`SchemaProvider::table`] method _is_ asynchronous.  However, this is primarily for convenience and
185/// it is not a good idea for this method to be slow as this will cause poor planning performance.
186///
187/// It is a better idea to resolve the tables once and cache them in memory for the duration of
188/// planning.  This trait helps implement that pattern.
189///
190/// After implementing this trait you can call the [`AsyncSchemaProvider::resolve`] method to get an
191/// `Arc<dyn SchemaProvider>` that contains a cached copy of the referenced tables.  The `resolve`
192/// method can be slow and asynchronous as it is only called once, before planning.
193///
194/// See the [remote_catalog.rs] for an end to end example
195///
196/// [remote_catalog.rs]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/remote_catalog.rs
197#[async_trait]
198pub trait AsyncSchemaProvider: Send + Sync {
199    /// Lookup a table in the schema provider
200    async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>>;
201    /// Creates a cached provider that can be used to execute a query containing given references
202    ///
203    /// This method will walk through the references and look them up once, creating a cache of table
204    /// providers.  This cache will be returned as a synchronous TableProvider that can be used to plan
205    /// and execute a query containing the given references.
206    ///
207    /// This cache is intended to be short-lived for the execution of a single query.  There is no mechanism
208    /// for refresh or eviction of stale entries.
209    ///
210    /// See the [`AsyncSchemaProvider`] documentation for additional details
211    async fn resolve(
212        &self,
213        references: &[TableReference],
214        config: &SessionConfig,
215        catalog_name: &str,
216        schema_name: &str,
217    ) -> Result<Arc<dyn SchemaProvider>> {
218        let mut cached_tables = HashMap::<String, Option<Arc<dyn TableProvider>>>::new();
219
220        for reference in references {
221            let ref_catalog_name = reference
222                .catalog()
223                .unwrap_or(&config.options().catalog.default_catalog);
224
225            // Maybe this is a reference to some other catalog provided in another way
226            if ref_catalog_name != catalog_name {
227                continue;
228            }
229
230            let ref_schema_name = reference
231                .schema()
232                .unwrap_or(&config.options().catalog.default_schema);
233
234            if ref_schema_name != schema_name {
235                continue;
236            }
237
238            if !cached_tables.contains_key(reference.table()) {
239                let resolved_table = self.table(reference.table()).await?;
240                cached_tables.insert(reference.table().to_string(), resolved_table);
241            }
242        }
243
244        let cached_tables = cached_tables
245            .into_iter()
246            .filter_map(|(key, maybe_value)| maybe_value.map(|value| (key, value)))
247            .collect();
248
249        Ok(Arc::new(ResolvedSchemaProvider {
250            cached_tables,
251            owner_name: Some(catalog_name.to_string()),
252        }))
253    }
254}
255
256/// A trait for catalog providers that must resolve schemas asynchronously
257///
258/// The [`CatalogProvider::schema`] method is synchronous because asynchronous operations should
259/// not be used during planning.  This trait makes it easy to lookup schema references once and cache
260/// them for future planning use.  See [`AsyncSchemaProvider`] for more details on motivation.
261
262#[async_trait]
263pub trait AsyncCatalogProvider: Send + Sync {
264    /// Lookup a schema in the provider
265    async fn schema(&self, name: &str) -> Result<Option<Arc<dyn AsyncSchemaProvider>>>;
266
267    /// Creates a cached provider that can be used to execute a query containing given references
268    ///
269    /// This method will walk through the references and look them up once, creating a cache of schema
270    /// providers (each with their own cache of table providers).  This cache will be returned as a
271    /// synchronous CatalogProvider that can be used to plan and execute a query containing the given
272    /// references.
273    ///
274    /// This cache is intended to be short-lived for the execution of a single query.  There is no mechanism
275    /// for refresh or eviction of stale entries.
276    async fn resolve(
277        &self,
278        references: &[TableReference],
279        config: &SessionConfig,
280        catalog_name: &str,
281    ) -> Result<Arc<dyn CatalogProvider>> {
282        let mut cached_schemas =
283            HashMap::<String, Option<ResolvedSchemaProviderBuilder>>::new();
284
285        for reference in references {
286            let ref_catalog_name = reference
287                .catalog()
288                .unwrap_or(&config.options().catalog.default_catalog);
289
290            // Maybe this is a reference to some other catalog provided in another way
291            if ref_catalog_name != catalog_name {
292                continue;
293            }
294
295            let schema_name = reference
296                .schema()
297                .unwrap_or(&config.options().catalog.default_schema);
298
299            let schema = if let Some(schema) = cached_schemas.get_mut(schema_name) {
300                schema
301            } else {
302                let resolved_schema = self.schema(schema_name).await?;
303                let resolved_schema = resolved_schema.map(|resolved_schema| {
304                    ResolvedSchemaProviderBuilder::new(
305                        catalog_name.to_string(),
306                        resolved_schema,
307                    )
308                });
309                cached_schemas.insert(schema_name.to_string(), resolved_schema);
310                cached_schemas.get_mut(schema_name).unwrap()
311            };
312
313            // If we can't find the catalog don't bother checking the table
314            let Some(schema) = schema else { continue };
315
316            schema.resolve_table(reference.table()).await?;
317        }
318
319        let cached_schemas = cached_schemas
320            .into_iter()
321            .filter_map(|(key, maybe_builder)| {
322                maybe_builder.map(|schema_builder| (key, schema_builder.finish()))
323            })
324            .collect::<HashMap<_, _>>();
325
326        Ok(Arc::new(ResolvedCatalogProvider { cached_schemas }))
327    }
328}
329
330/// A trait for catalog provider lists that must resolve catalogs asynchronously
331///
332/// The [`CatalogProviderList::catalog`] method is synchronous because asynchronous operations should
333/// not be used during planning.  This trait makes it easy to lookup catalog references once and cache
334/// them for future planning use.  See [`AsyncSchemaProvider`] for more details on motivation.
335#[async_trait]
336pub trait AsyncCatalogProviderList: Send + Sync {
337    /// Lookup a catalog in the provider
338    async fn catalog(&self, name: &str) -> Result<Option<Arc<dyn AsyncCatalogProvider>>>;
339
340    /// Creates a cached provider that can be used to execute a query containing given references
341    ///
342    /// This method will walk through the references and look them up once, creating a cache of catalog
343    /// providers, schema providers, and table providers.  This cache will be returned as a
344    /// synchronous CatalogProvider that can be used to plan and execute a query containing the given
345    /// references.
346    ///
347    /// This cache is intended to be short-lived for the execution of a single query.  There is no mechanism
348    /// for refresh or eviction of stale entries.
349    async fn resolve(
350        &self,
351        references: &[TableReference],
352        config: &SessionConfig,
353    ) -> Result<Arc<dyn CatalogProviderList>> {
354        let mut cached_catalogs =
355            HashMap::<String, Option<ResolvedCatalogProviderBuilder>>::new();
356
357        for reference in references {
358            let catalog_name = reference
359                .catalog()
360                .unwrap_or(&config.options().catalog.default_catalog);
361
362            // We will do three lookups here, one for the catalog, one for the schema, and one for the table
363            // We cache the result (both found results and not-found results) to speed up future lookups
364            //
365            // Note that a cache-miss is not an error at this point.  We allow for the possibility that
366            // other providers may supply the reference.
367            //
368            // If this is the only provider then a not-found error will be raised during planning when it can't
369            // find the reference in the cache.
370
371            let catalog = if let Some(catalog) = cached_catalogs.get_mut(catalog_name) {
372                catalog
373            } else {
374                let resolved_catalog = self.catalog(catalog_name).await?;
375                let resolved_catalog =
376                    resolved_catalog.map(ResolvedCatalogProviderBuilder::new);
377                cached_catalogs.insert(catalog_name.to_string(), resolved_catalog);
378                cached_catalogs.get_mut(catalog_name).unwrap()
379            };
380
381            // If we can't find the catalog don't bother checking the schema / table
382            let Some(catalog) = catalog else { continue };
383
384            let schema_name = reference
385                .schema()
386                .unwrap_or(&config.options().catalog.default_schema);
387
388            let schema = if let Some(schema) = catalog.cached_schemas.get_mut(schema_name)
389            {
390                schema
391            } else {
392                let resolved_schema = catalog.async_provider.schema(schema_name).await?;
393                let resolved_schema = resolved_schema.map(|async_schema| {
394                    ResolvedSchemaProviderBuilder::new(
395                        catalog_name.to_string(),
396                        async_schema,
397                    )
398                });
399                catalog
400                    .cached_schemas
401                    .insert(schema_name.to_string(), resolved_schema);
402                catalog.cached_schemas.get_mut(schema_name).unwrap()
403            };
404
405            // If we can't find the catalog don't bother checking the table
406            let Some(schema) = schema else { continue };
407
408            schema.resolve_table(reference.table()).await?;
409        }
410
411        // Build the cached catalog provider list
412        let cached_catalogs = cached_catalogs
413            .into_iter()
414            .filter_map(|(key, maybe_builder)| {
415                maybe_builder.map(|catalog_builder| (key, catalog_builder.finish()))
416            })
417            .collect::<HashMap<_, _>>();
418
419        Ok(Arc::new(ResolvedCatalogProviderList { cached_catalogs }))
420    }
421}
422
423#[cfg(test)]
424mod tests {
425    use std::{
426        any::Any,
427        sync::{
428            atomic::{AtomicU32, Ordering},
429            Arc,
430        },
431    };
432
433    use arrow::datatypes::SchemaRef;
434    use async_trait::async_trait;
435    use datafusion_common::{error::Result, Statistics, TableReference};
436    use datafusion_execution::config::SessionConfig;
437    use datafusion_expr::{Expr, TableType};
438    use datafusion_physical_plan::ExecutionPlan;
439
440    use crate::{Session, TableProvider};
441
442    use super::{AsyncCatalogProvider, AsyncCatalogProviderList, AsyncSchemaProvider};
443
444    #[derive(Debug)]
445    struct MockTableProvider {}
446    #[async_trait]
447    impl TableProvider for MockTableProvider {
448        fn as_any(&self) -> &dyn Any {
449            self
450        }
451
452        /// Get a reference to the schema for this table
453        fn schema(&self) -> SchemaRef {
454            unimplemented!()
455        }
456
457        fn table_type(&self) -> TableType {
458            unimplemented!()
459        }
460
461        async fn scan(
462            &self,
463            _state: &dyn Session,
464            _projection: Option<&Vec<usize>>,
465            _filters: &[Expr],
466            _limit: Option<usize>,
467        ) -> Result<Arc<dyn ExecutionPlan>> {
468            unimplemented!()
469        }
470
471        fn statistics(&self) -> Option<Statistics> {
472            unimplemented!()
473        }
474    }
475
476    #[derive(Default)]
477    struct MockAsyncSchemaProvider {
478        lookup_count: AtomicU32,
479    }
480
481    const MOCK_CATALOG: &str = "mock_catalog";
482    const MOCK_SCHEMA: &str = "mock_schema";
483    const MOCK_TABLE: &str = "mock_table";
484
485    #[async_trait]
486    impl AsyncSchemaProvider for MockAsyncSchemaProvider {
487        async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
488            self.lookup_count.fetch_add(1, Ordering::Release);
489            if name == MOCK_TABLE {
490                Ok(Some(Arc::new(MockTableProvider {})))
491            } else {
492                Ok(None)
493            }
494        }
495    }
496
497    fn test_config() -> SessionConfig {
498        let mut config = SessionConfig::default();
499        config.options_mut().catalog.default_catalog = MOCK_CATALOG.to_string();
500        config.options_mut().catalog.default_schema = MOCK_SCHEMA.to_string();
501        config
502    }
503
504    #[tokio::test]
505    async fn test_async_schema_provider_resolve() {
506        async fn check(
507            refs: Vec<TableReference>,
508            expected_lookup_count: u32,
509            found_tables: &[&str],
510            not_found_tables: &[&str],
511        ) {
512            let async_provider = MockAsyncSchemaProvider::default();
513            let cached_provider = async_provider
514                .resolve(&refs, &test_config(), MOCK_CATALOG, MOCK_SCHEMA)
515                .await
516                .unwrap();
517
518            assert_eq!(
519                async_provider.lookup_count.load(Ordering::Acquire),
520                expected_lookup_count
521            );
522
523            for table_ref in found_tables {
524                let table = cached_provider.table(table_ref).await.unwrap();
525                assert!(table.is_some());
526            }
527
528            for table_ref in not_found_tables {
529                assert!(cached_provider.table(table_ref).await.unwrap().is_none());
530            }
531        }
532
533        // Basic full lookups
534        check(
535            vec![
536                TableReference::full(MOCK_CATALOG, MOCK_SCHEMA, MOCK_TABLE),
537                TableReference::full(MOCK_CATALOG, MOCK_SCHEMA, "not_exists"),
538            ],
539            2,
540            &[MOCK_TABLE],
541            &["not_exists"],
542        )
543        .await;
544
545        // Catalog / schema mismatch doesn't even search
546        check(
547            vec![
548                TableReference::full(MOCK_CATALOG, "foo", MOCK_TABLE),
549                TableReference::full("foo", MOCK_SCHEMA, MOCK_TABLE),
550            ],
551            0,
552            &[],
553            &[MOCK_TABLE],
554        )
555        .await;
556
557        // Both hits and misses cached
558        check(
559            vec![
560                TableReference::full(MOCK_CATALOG, MOCK_SCHEMA, MOCK_TABLE),
561                TableReference::full(MOCK_CATALOG, MOCK_SCHEMA, MOCK_TABLE),
562                TableReference::full(MOCK_CATALOG, MOCK_SCHEMA, "not_exists"),
563                TableReference::full(MOCK_CATALOG, MOCK_SCHEMA, "not_exists"),
564            ],
565            2,
566            &[MOCK_TABLE],
567            &["not_exists"],
568        )
569        .await;
570    }
571
572    #[derive(Default)]
573    struct MockAsyncCatalogProvider {
574        lookup_count: AtomicU32,
575    }
576
577    #[async_trait]
578    impl AsyncCatalogProvider for MockAsyncCatalogProvider {
579        async fn schema(
580            &self,
581            name: &str,
582        ) -> Result<Option<Arc<dyn AsyncSchemaProvider>>> {
583            self.lookup_count.fetch_add(1, Ordering::Release);
584            if name == MOCK_SCHEMA {
585                Ok(Some(Arc::new(MockAsyncSchemaProvider::default())))
586            } else {
587                Ok(None)
588            }
589        }
590    }
591
592    #[tokio::test]
593    async fn test_async_catalog_provider_resolve() {
594        async fn check(
595            refs: Vec<TableReference>,
596            expected_lookup_count: u32,
597            found_schemas: &[&str],
598            not_found_schemas: &[&str],
599        ) {
600            let async_provider = MockAsyncCatalogProvider::default();
601            let cached_provider = async_provider
602                .resolve(&refs, &test_config(), MOCK_CATALOG)
603                .await
604                .unwrap();
605
606            assert_eq!(
607                async_provider.lookup_count.load(Ordering::Acquire),
608                expected_lookup_count
609            );
610
611            for schema_ref in found_schemas {
612                let schema = cached_provider.schema(schema_ref);
613                assert!(schema.is_some());
614            }
615
616            for schema_ref in not_found_schemas {
617                assert!(cached_provider.schema(schema_ref).is_none());
618            }
619        }
620
621        // Basic full lookups
622        check(
623            vec![
624                TableReference::full(MOCK_CATALOG, MOCK_SCHEMA, "x"),
625                TableReference::full(MOCK_CATALOG, "not_exists", "x"),
626            ],
627            2,
628            &[MOCK_SCHEMA],
629            &["not_exists"],
630        )
631        .await;
632
633        // Catalog mismatch doesn't even search
634        check(
635            vec![TableReference::full("foo", MOCK_SCHEMA, "x")],
636            0,
637            &[],
638            &[MOCK_SCHEMA],
639        )
640        .await;
641
642        // Both hits and misses cached
643        check(
644            vec![
645                TableReference::full(MOCK_CATALOG, MOCK_SCHEMA, "x"),
646                TableReference::full(MOCK_CATALOG, MOCK_SCHEMA, "x"),
647                TableReference::full(MOCK_CATALOG, "not_exists", "x"),
648                TableReference::full(MOCK_CATALOG, "not_exists", "x"),
649            ],
650            2,
651            &[MOCK_SCHEMA],
652            &["not_exists"],
653        )
654        .await;
655    }
656
657    #[derive(Default)]
658    struct MockAsyncCatalogProviderList {
659        lookup_count: AtomicU32,
660    }
661
662    #[async_trait]
663    impl AsyncCatalogProviderList for MockAsyncCatalogProviderList {
664        async fn catalog(
665            &self,
666            name: &str,
667        ) -> Result<Option<Arc<dyn AsyncCatalogProvider>>> {
668            self.lookup_count.fetch_add(1, Ordering::Release);
669            if name == MOCK_CATALOG {
670                Ok(Some(Arc::new(MockAsyncCatalogProvider::default())))
671            } else {
672                Ok(None)
673            }
674        }
675    }
676
677    #[tokio::test]
678    async fn test_async_catalog_provider_list_resolve() {
679        async fn check(
680            refs: Vec<TableReference>,
681            expected_lookup_count: u32,
682            found_catalogs: &[&str],
683            not_found_catalogs: &[&str],
684        ) {
685            let async_provider = MockAsyncCatalogProviderList::default();
686            let cached_provider =
687                async_provider.resolve(&refs, &test_config()).await.unwrap();
688
689            assert_eq!(
690                async_provider.lookup_count.load(Ordering::Acquire),
691                expected_lookup_count
692            );
693
694            for catalog_ref in found_catalogs {
695                let catalog = cached_provider.catalog(catalog_ref);
696                assert!(catalog.is_some());
697            }
698
699            for catalog_ref in not_found_catalogs {
700                assert!(cached_provider.catalog(catalog_ref).is_none());
701            }
702        }
703
704        // Basic full lookups
705        check(
706            vec![
707                TableReference::full(MOCK_CATALOG, "x", "x"),
708                TableReference::full("not_exists", "x", "x"),
709            ],
710            2,
711            &[MOCK_CATALOG],
712            &["not_exists"],
713        )
714        .await;
715
716        // Both hits and misses cached
717        check(
718            vec![
719                TableReference::full(MOCK_CATALOG, "x", "x"),
720                TableReference::full(MOCK_CATALOG, "x", "x"),
721                TableReference::full("not_exists", "x", "x"),
722                TableReference::full("not_exists", "x", "x"),
723            ],
724            2,
725            &[MOCK_CATALOG],
726            &["not_exists"],
727        )
728        .await;
729    }
730
731    #[tokio::test]
732    async fn test_defaults() {
733        for table_ref in &[
734            TableReference::full(MOCK_CATALOG, MOCK_SCHEMA, MOCK_TABLE),
735            TableReference::partial(MOCK_SCHEMA, MOCK_TABLE),
736            TableReference::bare(MOCK_TABLE),
737        ] {
738            let async_provider = MockAsyncCatalogProviderList::default();
739            let cached_provider = async_provider
740                .resolve(std::slice::from_ref(table_ref), &test_config())
741                .await
742                .unwrap();
743
744            let catalog = cached_provider
745                .catalog(table_ref.catalog().unwrap_or(MOCK_CATALOG))
746                .unwrap();
747            let schema = catalog
748                .schema(table_ref.schema().unwrap_or(MOCK_SCHEMA))
749                .unwrap();
750            assert!(schema.table(table_ref.table()).await.unwrap().is_some());
751        }
752    }
753}