1use std::sync::Arc;
19
20use async_trait::async_trait;
21use datafusion_common::{error::Result, not_impl_err, HashMap, TableReference};
22use datafusion_execution::config::SessionConfig;
23
24use crate::{CatalogProvider, CatalogProviderList, SchemaProvider, TableProvider};
25
26#[derive(Debug)]
30struct ResolvedSchemaProvider {
31 owner_name: Option<String>,
32 cached_tables: HashMap<String, Arc<dyn TableProvider>>,
33}
34#[async_trait]
35impl SchemaProvider for ResolvedSchemaProvider {
36 fn owner_name(&self) -> Option<&str> {
37 self.owner_name.as_deref()
38 }
39
40 fn as_any(&self) -> &dyn std::any::Any {
41 self
42 }
43
44 fn table_names(&self) -> Vec<String> {
45 self.cached_tables.keys().cloned().collect()
46 }
47
48 async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
49 Ok(self.cached_tables.get(name).cloned())
50 }
51
52 fn register_table(
53 &self,
54 name: String,
55 _table: Arc<dyn TableProvider>,
56 ) -> Result<Option<Arc<dyn TableProvider>>> {
57 not_impl_err!(
58 "Attempt to register table '{name}' with ResolvedSchemaProvider which is not supported"
59 )
60 }
61
62 fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
63 not_impl_err!("Attempt to deregister table '{name}' with ResolvedSchemaProvider which is not supported")
64 }
65
66 fn table_exist(&self, name: &str) -> bool {
67 self.cached_tables.contains_key(name)
68 }
69}
70
71struct ResolvedSchemaProviderBuilder {
73 owner_name: String,
74 async_provider: Arc<dyn AsyncSchemaProvider>,
75 cached_tables: HashMap<String, Option<Arc<dyn TableProvider>>>,
76}
77impl ResolvedSchemaProviderBuilder {
78 fn new(owner_name: String, async_provider: Arc<dyn AsyncSchemaProvider>) -> Self {
79 Self {
80 owner_name,
81 async_provider,
82 cached_tables: HashMap::new(),
83 }
84 }
85
86 async fn resolve_table(&mut self, table_name: &str) -> Result<()> {
87 if !self.cached_tables.contains_key(table_name) {
88 let resolved_table = self.async_provider.table(table_name).await?;
89 self.cached_tables
90 .insert(table_name.to_string(), resolved_table);
91 }
92 Ok(())
93 }
94
95 fn finish(self) -> Arc<dyn SchemaProvider> {
96 let cached_tables = self
97 .cached_tables
98 .into_iter()
99 .filter_map(|(key, maybe_value)| maybe_value.map(|value| (key, value)))
100 .collect();
101 Arc::new(ResolvedSchemaProvider {
102 owner_name: Some(self.owner_name),
103 cached_tables,
104 })
105 }
106}
107
108#[derive(Debug)]
112struct ResolvedCatalogProvider {
113 cached_schemas: HashMap<String, Arc<dyn SchemaProvider>>,
114}
115impl CatalogProvider for ResolvedCatalogProvider {
116 fn as_any(&self) -> &dyn std::any::Any {
117 self
118 }
119
120 fn schema_names(&self) -> Vec<String> {
121 self.cached_schemas.keys().cloned().collect()
122 }
123
124 fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
125 self.cached_schemas.get(name).cloned()
126 }
127}
128
129struct ResolvedCatalogProviderBuilder {
131 cached_schemas: HashMap<String, Option<ResolvedSchemaProviderBuilder>>,
132 async_provider: Arc<dyn AsyncCatalogProvider>,
133}
134impl ResolvedCatalogProviderBuilder {
135 fn new(async_provider: Arc<dyn AsyncCatalogProvider>) -> Self {
136 Self {
137 cached_schemas: HashMap::new(),
138 async_provider,
139 }
140 }
141 fn finish(self) -> Arc<dyn CatalogProvider> {
142 let cached_schemas = self
143 .cached_schemas
144 .into_iter()
145 .filter_map(|(key, maybe_value)| {
146 maybe_value.map(|value| (key, value.finish()))
147 })
148 .collect();
149 Arc::new(ResolvedCatalogProvider { cached_schemas })
150 }
151}
152
153#[derive(Debug)]
157struct ResolvedCatalogProviderList {
158 cached_catalogs: HashMap<String, Arc<dyn CatalogProvider>>,
159}
160impl CatalogProviderList for ResolvedCatalogProviderList {
161 fn as_any(&self) -> &dyn std::any::Any {
162 self
163 }
164
165 fn register_catalog(
166 &self,
167 _name: String,
168 _catalog: Arc<dyn CatalogProvider>,
169 ) -> Option<Arc<dyn CatalogProvider>> {
170 unimplemented!("resolved providers cannot handle registration APIs")
171 }
172
173 fn catalog_names(&self) -> Vec<String> {
174 self.cached_catalogs.keys().cloned().collect()
175 }
176
177 fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
178 self.cached_catalogs.get(name).cloned()
179 }
180}
181
182#[async_trait]
198pub trait AsyncSchemaProvider: Send + Sync {
199 async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>>;
201 async fn resolve(
212 &self,
213 references: &[TableReference],
214 config: &SessionConfig,
215 catalog_name: &str,
216 schema_name: &str,
217 ) -> Result<Arc<dyn SchemaProvider>> {
218 let mut cached_tables = HashMap::<String, Option<Arc<dyn TableProvider>>>::new();
219
220 for reference in references {
221 let ref_catalog_name = reference
222 .catalog()
223 .unwrap_or(&config.options().catalog.default_catalog);
224
225 if ref_catalog_name != catalog_name {
227 continue;
228 }
229
230 let ref_schema_name = reference
231 .schema()
232 .unwrap_or(&config.options().catalog.default_schema);
233
234 if ref_schema_name != schema_name {
235 continue;
236 }
237
238 if !cached_tables.contains_key(reference.table()) {
239 let resolved_table = self.table(reference.table()).await?;
240 cached_tables.insert(reference.table().to_string(), resolved_table);
241 }
242 }
243
244 let cached_tables = cached_tables
245 .into_iter()
246 .filter_map(|(key, maybe_value)| maybe_value.map(|value| (key, value)))
247 .collect();
248
249 Ok(Arc::new(ResolvedSchemaProvider {
250 cached_tables,
251 owner_name: Some(catalog_name.to_string()),
252 }))
253 }
254}
255
256#[async_trait]
263pub trait AsyncCatalogProvider: Send + Sync {
264 async fn schema(&self, name: &str) -> Result<Option<Arc<dyn AsyncSchemaProvider>>>;
266
267 async fn resolve(
277 &self,
278 references: &[TableReference],
279 config: &SessionConfig,
280 catalog_name: &str,
281 ) -> Result<Arc<dyn CatalogProvider>> {
282 let mut cached_schemas =
283 HashMap::<String, Option<ResolvedSchemaProviderBuilder>>::new();
284
285 for reference in references {
286 let ref_catalog_name = reference
287 .catalog()
288 .unwrap_or(&config.options().catalog.default_catalog);
289
290 if ref_catalog_name != catalog_name {
292 continue;
293 }
294
295 let schema_name = reference
296 .schema()
297 .unwrap_or(&config.options().catalog.default_schema);
298
299 let schema = if let Some(schema) = cached_schemas.get_mut(schema_name) {
300 schema
301 } else {
302 let resolved_schema = self.schema(schema_name).await?;
303 let resolved_schema = resolved_schema.map(|resolved_schema| {
304 ResolvedSchemaProviderBuilder::new(
305 catalog_name.to_string(),
306 resolved_schema,
307 )
308 });
309 cached_schemas.insert(schema_name.to_string(), resolved_schema);
310 cached_schemas.get_mut(schema_name).unwrap()
311 };
312
313 let Some(schema) = schema else { continue };
315
316 schema.resolve_table(reference.table()).await?;
317 }
318
319 let cached_schemas = cached_schemas
320 .into_iter()
321 .filter_map(|(key, maybe_builder)| {
322 maybe_builder.map(|schema_builder| (key, schema_builder.finish()))
323 })
324 .collect::<HashMap<_, _>>();
325
326 Ok(Arc::new(ResolvedCatalogProvider { cached_schemas }))
327 }
328}
329
330#[async_trait]
336pub trait AsyncCatalogProviderList: Send + Sync {
337 async fn catalog(&self, name: &str) -> Result<Option<Arc<dyn AsyncCatalogProvider>>>;
339
340 async fn resolve(
350 &self,
351 references: &[TableReference],
352 config: &SessionConfig,
353 ) -> Result<Arc<dyn CatalogProviderList>> {
354 let mut cached_catalogs =
355 HashMap::<String, Option<ResolvedCatalogProviderBuilder>>::new();
356
357 for reference in references {
358 let catalog_name = reference
359 .catalog()
360 .unwrap_or(&config.options().catalog.default_catalog);
361
362 let catalog = if let Some(catalog) = cached_catalogs.get_mut(catalog_name) {
372 catalog
373 } else {
374 let resolved_catalog = self.catalog(catalog_name).await?;
375 let resolved_catalog =
376 resolved_catalog.map(ResolvedCatalogProviderBuilder::new);
377 cached_catalogs.insert(catalog_name.to_string(), resolved_catalog);
378 cached_catalogs.get_mut(catalog_name).unwrap()
379 };
380
381 let Some(catalog) = catalog else { continue };
383
384 let schema_name = reference
385 .schema()
386 .unwrap_or(&config.options().catalog.default_schema);
387
388 let schema = if let Some(schema) = catalog.cached_schemas.get_mut(schema_name)
389 {
390 schema
391 } else {
392 let resolved_schema = catalog.async_provider.schema(schema_name).await?;
393 let resolved_schema = resolved_schema.map(|async_schema| {
394 ResolvedSchemaProviderBuilder::new(
395 catalog_name.to_string(),
396 async_schema,
397 )
398 });
399 catalog
400 .cached_schemas
401 .insert(schema_name.to_string(), resolved_schema);
402 catalog.cached_schemas.get_mut(schema_name).unwrap()
403 };
404
405 let Some(schema) = schema else { continue };
407
408 schema.resolve_table(reference.table()).await?;
409 }
410
411 let cached_catalogs = cached_catalogs
413 .into_iter()
414 .filter_map(|(key, maybe_builder)| {
415 maybe_builder.map(|catalog_builder| (key, catalog_builder.finish()))
416 })
417 .collect::<HashMap<_, _>>();
418
419 Ok(Arc::new(ResolvedCatalogProviderList { cached_catalogs }))
420 }
421}
422
#[cfg(test)]
mod tests {
    use std::{
        any::Any,
        sync::{
            atomic::{AtomicU32, Ordering},
            Arc,
        },
    };

    use arrow::datatypes::SchemaRef;
    use async_trait::async_trait;
    use datafusion_common::{error::Result, Statistics, TableReference};
    use datafusion_execution::config::SessionConfig;
    use datafusion_expr::{Expr, TableType};
    use datafusion_physical_plan::ExecutionPlan;

    use crate::{Session, TableProvider};

    use super::{AsyncCatalogProvider, AsyncCatalogProviderList, AsyncSchemaProvider};

    // The only catalog / schema / table names the mocks below know about.
    const MOCK_CATALOG: &str = "mock_catalog";
    const MOCK_SCHEMA: &str = "mock_schema";
    const MOCK_TABLE: &str = "mock_table";

    /// Placeholder table: the tests only check whether a table is present, so
    /// none of the data-facing methods are implemented.
    #[derive(Debug)]
    struct MockTableProvider {}

    #[async_trait]
    impl TableProvider for MockTableProvider {
        fn as_any(&self) -> &dyn Any {
            self
        }

        fn schema(&self) -> SchemaRef {
            unimplemented!()
        }

        fn table_type(&self) -> TableType {
            unimplemented!()
        }

        async fn scan(
            &self,
            _state: &dyn Session,
            _projection: Option<&Vec<usize>>,
            _filters: &[Expr],
            _limit: Option<usize>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            unimplemented!()
        }

        fn statistics(&self) -> Option<Statistics> {
            unimplemented!()
        }
    }

    /// Session config whose default catalog and schema are the mock ones.
    fn test_config() -> SessionConfig {
        let mut config = SessionConfig::default();
        let catalog_options = &mut config.options_mut().catalog;
        catalog_options.default_catalog = MOCK_CATALOG.to_string();
        catalog_options.default_schema = MOCK_SCHEMA.to_string();
        config
    }

    /// Schema provider that knows only `MOCK_TABLE` and counts every lookup.
    #[derive(Default)]
    struct MockAsyncSchemaProvider {
        lookup_count: AtomicU32,
    }

    #[async_trait]
    impl AsyncSchemaProvider for MockAsyncSchemaProvider {
        async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
            self.lookup_count.fetch_add(1, Ordering::Release);
            if name != MOCK_TABLE {
                return Ok(None);
            }
            Ok(Some(Arc::new(MockTableProvider {})))
        }
    }

    #[tokio::test]
    async fn test_async_schema_provider_resolve() {
        /// Resolves `refs` against a fresh mock and checks both the number of
        /// lookups made and which tables the resolved provider exposes.
        async fn check(
            refs: Vec<TableReference>,
            expected_lookup_count: u32,
            found_tables: &[&str],
            not_found_tables: &[&str],
        ) {
            let source = MockAsyncSchemaProvider::default();
            let config = test_config();
            let resolved = source
                .resolve(&refs, &config, MOCK_CATALOG, MOCK_SCHEMA)
                .await
                .unwrap();

            let lookups = source.lookup_count.load(Ordering::Acquire);
            assert_eq!(lookups, expected_lookup_count);

            for &name in found_tables {
                let table = resolved.table(name).await.unwrap();
                assert!(table.is_some());
            }

            for &name in not_found_tables {
                let table = resolved.table(name).await.unwrap();
                assert!(table.is_none());
            }
        }

        // One existing and one missing table: each is looked up once.
        check(
            vec![
                TableReference::full(MOCK_CATALOG, MOCK_SCHEMA, MOCK_TABLE),
                TableReference::full(MOCK_CATALOG, MOCK_SCHEMA, "not_exists"),
            ],
            2,
            &[MOCK_TABLE],
            &["not_exists"],
        )
        .await;

        // References to another schema or catalog trigger no lookups at all.
        check(
            vec![
                TableReference::full(MOCK_CATALOG, "foo", MOCK_TABLE),
                TableReference::full("foo", MOCK_SCHEMA, MOCK_TABLE),
            ],
            0,
            &[],
            &[MOCK_TABLE],
        )
        .await;

        // Repeated references are looked up only once, hits and misses alike.
        check(
            vec![
                TableReference::full(MOCK_CATALOG, MOCK_SCHEMA, MOCK_TABLE),
                TableReference::full(MOCK_CATALOG, MOCK_SCHEMA, MOCK_TABLE),
                TableReference::full(MOCK_CATALOG, MOCK_SCHEMA, "not_exists"),
                TableReference::full(MOCK_CATALOG, MOCK_SCHEMA, "not_exists"),
            ],
            2,
            &[MOCK_TABLE],
            &["not_exists"],
        )
        .await;
    }

    /// Catalog provider that knows only `MOCK_SCHEMA` and counts every lookup.
    #[derive(Default)]
    struct MockAsyncCatalogProvider {
        lookup_count: AtomicU32,
    }

    #[async_trait]
    impl AsyncCatalogProvider for MockAsyncCatalogProvider {
        async fn schema(
            &self,
            name: &str,
        ) -> Result<Option<Arc<dyn AsyncSchemaProvider>>> {
            self.lookup_count.fetch_add(1, Ordering::Release);
            if name != MOCK_SCHEMA {
                return Ok(None);
            }
            Ok(Some(Arc::new(MockAsyncSchemaProvider::default())))
        }
    }

    #[tokio::test]
    async fn test_async_catalog_provider_resolve() {
        /// Resolves `refs` against a fresh mock and checks both the number of
        /// schema lookups made and which schemas the resolved catalog exposes.
        async fn check(
            refs: Vec<TableReference>,
            expected_lookup_count: u32,
            found_schemas: &[&str],
            not_found_schemas: &[&str],
        ) {
            let source = MockAsyncCatalogProvider::default();
            let config = test_config();
            let resolved = source
                .resolve(&refs, &config, MOCK_CATALOG)
                .await
                .unwrap();

            let lookups = source.lookup_count.load(Ordering::Acquire);
            assert_eq!(lookups, expected_lookup_count);

            for &name in found_schemas {
                let schema = resolved.schema(name);
                assert!(schema.is_some());
            }

            for &name in not_found_schemas {
                let schema = resolved.schema(name);
                assert!(schema.is_none());
            }
        }

        // One existing and one missing schema: each is looked up once.
        check(
            vec![
                TableReference::full(MOCK_CATALOG, MOCK_SCHEMA, "x"),
                TableReference::full(MOCK_CATALOG, "not_exists", "x"),
            ],
            2,
            &[MOCK_SCHEMA],
            &["not_exists"],
        )
        .await;

        // A reference to another catalog triggers no lookups at all.
        check(
            vec![TableReference::full("foo", MOCK_SCHEMA, "x")],
            0,
            &[],
            &[MOCK_SCHEMA],
        )
        .await;

        // Repeated references are looked up only once, hits and misses alike.
        check(
            vec![
                TableReference::full(MOCK_CATALOG, MOCK_SCHEMA, "x"),
                TableReference::full(MOCK_CATALOG, MOCK_SCHEMA, "x"),
                TableReference::full(MOCK_CATALOG, "not_exists", "x"),
                TableReference::full(MOCK_CATALOG, "not_exists", "x"),
            ],
            2,
            &[MOCK_SCHEMA],
            &["not_exists"],
        )
        .await;
    }

    /// Catalog list that knows only `MOCK_CATALOG` and counts every lookup.
    #[derive(Default)]
    struct MockAsyncCatalogProviderList {
        lookup_count: AtomicU32,
    }

    #[async_trait]
    impl AsyncCatalogProviderList for MockAsyncCatalogProviderList {
        async fn catalog(
            &self,
            name: &str,
        ) -> Result<Option<Arc<dyn AsyncCatalogProvider>>> {
            self.lookup_count.fetch_add(1, Ordering::Release);
            if name != MOCK_CATALOG {
                return Ok(None);
            }
            Ok(Some(Arc::new(MockAsyncCatalogProvider::default())))
        }
    }

    #[tokio::test]
    async fn test_async_catalog_provider_list_resolve() {
        /// Resolves `refs` against a fresh mock and checks both the number of
        /// catalog lookups made and which catalogs the resolved list exposes.
        async fn check(
            refs: Vec<TableReference>,
            expected_lookup_count: u32,
            found_catalogs: &[&str],
            not_found_catalogs: &[&str],
        ) {
            let source = MockAsyncCatalogProviderList::default();
            let config = test_config();
            let resolved = source.resolve(&refs, &config).await.unwrap();

            let lookups = source.lookup_count.load(Ordering::Acquire);
            assert_eq!(lookups, expected_lookup_count);

            for &name in found_catalogs {
                let catalog = resolved.catalog(name);
                assert!(catalog.is_some());
            }

            for &name in not_found_catalogs {
                let catalog = resolved.catalog(name);
                assert!(catalog.is_none());
            }
        }

        // One existing and one missing catalog: each is looked up once.
        check(
            vec![
                TableReference::full(MOCK_CATALOG, "x", "x"),
                TableReference::full("not_exists", "x", "x"),
            ],
            2,
            &[MOCK_CATALOG],
            &["not_exists"],
        )
        .await;

        // Repeated references are looked up only once, hits and misses alike.
        check(
            vec![
                TableReference::full(MOCK_CATALOG, "x", "x"),
                TableReference::full(MOCK_CATALOG, "x", "x"),
                TableReference::full("not_exists", "x", "x"),
                TableReference::full("not_exists", "x", "x"),
            ],
            2,
            &[MOCK_CATALOG],
            &["not_exists"],
        )
        .await;
    }

    /// Full, partial and bare references must all resolve to the mock table
    /// once the configured defaults fill in the missing parts.
    #[tokio::test]
    async fn test_defaults() {
        let references = [
            TableReference::full(MOCK_CATALOG, MOCK_SCHEMA, MOCK_TABLE),
            TableReference::partial(MOCK_SCHEMA, MOCK_TABLE),
            TableReference::bare(MOCK_TABLE),
        ];

        for reference in &references {
            let source = MockAsyncCatalogProviderList::default();
            let config = test_config();
            let resolved = source
                .resolve(std::slice::from_ref(reference), &config)
                .await
                .unwrap();

            let catalog_name = reference.catalog().unwrap_or(MOCK_CATALOG);
            let catalog = resolved.catalog(catalog_name).unwrap();

            let schema_name = reference.schema().unwrap_or(MOCK_SCHEMA);
            let schema = catalog.schema(schema_name).unwrap();

            let table = schema.table(reference.table()).await.unwrap();
            assert!(table.is_some());
        }
    }
}