datafusion_catalog/
information_schema.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`InformationSchemaProvider`] that implements the SQL [Information Schema] for DataFusion.
//!
//! [Information Schema]: https://en.wikipedia.org/wiki/Information_schema
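//!
//! The provider exposes the following virtual tables, queryable as
//! `information_schema.<name>`: `tables`, `views`, `columns`,
//! `df_settings`, `schemata`, `routines`, and `parameters`.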

use crate::streaming::StreamingTable;
use crate::{CatalogProviderList, SchemaProvider, TableProvider};
use arrow::array::builder::{BooleanBuilder, UInt8Builder};
use arrow::{
    array::{StringBuilder, UInt64Builder},
    datatypes::{DataType, Field, Schema, SchemaRef},
    record_batch::RecordBatch,
};
use async_trait::async_trait;
use datafusion_common::config::{ConfigEntry, ConfigOptions};
use datafusion_common::error::Result;
use datafusion_common::types::NativeType;
use datafusion_common::DataFusionError;
use datafusion_execution::TaskContext;
use datafusion_expr::{AggregateUDF, ScalarUDF, Signature, TypeSignature, WindowUDF};
use datafusion_expr::{TableType, Volatility};
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use datafusion_physical_plan::streaming::PartitionStream;
use datafusion_physical_plan::SendableRecordBatchStream;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::fmt::Debug;
use std::{any::Any, sync::Arc};

pub const INFORMATION_SCHEMA: &str = "information_schema";
pub(crate) const TABLES: &str = "tables";
pub(crate) const VIEWS: &str = "views";
pub(crate) const COLUMNS: &str = "columns";
pub(crate) const DF_SETTINGS: &str = "df_settings";
pub(crate) const SCHEMATA: &str = "schemata";
pub(crate) const ROUTINES: &str = "routines";
pub(crate) const PARAMETERS: &str = "parameters";

/// All information schema tables
pub const INFORMATION_SCHEMA_TABLES: &[&str] = &[
    TABLES,
    VIEWS,
    COLUMNS,
    DF_SETTINGS,
    SCHEMATA,
    ROUTINES,
    PARAMETERS,
];

/// Implements the `information_schema` virtual schema and tables
///
/// The underlying tables in the `information_schema` are created on
/// demand. This means that if more tables are added to the underlying
/// providers, they will appear the next time the `information_schema`
/// table is queried.
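///
/// # Example
///
/// A minimal sketch of typical usage through the top-level `datafusion`
/// crate (marked `ignore` because that crate is not a dependency here):
///
/// ```ignore
/// use datafusion::prelude::{SessionConfig, SessionContext};
///
/// # async fn example() -> datafusion::error::Result<()> {
/// // Enabling the information schema registers this provider
/// let config = SessionConfig::new().with_information_schema(true);
/// let ctx = SessionContext::new_with_config(config);
///
/// // The virtual tables are built on demand at query time
/// let df = ctx
///     .sql("SELECT table_name FROM information_schema.tables")
///     .await?;
/// df.show().await?;
/// # Ok(())
/// # }
/// ```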
#[derive(Debug)]
pub struct InformationSchemaProvider {
    config: InformationSchemaConfig,
}

impl InformationSchemaProvider {
    /// Creates a new [`InformationSchemaProvider`] for the provided `catalog_list`
    pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> Self {
        Self {
            config: InformationSchemaConfig { catalog_list },
        }
    }
}

#[derive(Clone, Debug)]
struct InformationSchemaConfig {
    catalog_list: Arc<dyn CatalogProviderList>,
}

impl InformationSchemaConfig {
    /// Construct the `information_schema.tables` virtual table
    async fn make_tables(
        &self,
        builder: &mut InformationSchemaTablesBuilder,
    ) -> Result<(), DataFusionError> {
        // Collect the name and type of every table in every catalog and schema
        for catalog_name in self.catalog_list.catalog_names() {
            let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

            for schema_name in catalog.schema_names() {
                if schema_name != INFORMATION_SCHEMA {
                    // schema name may not exist in the catalog, so we need to check
                    if let Some(schema) = catalog.schema(&schema_name) {
                        for table_name in schema.table_names() {
                            if let Some(table_type) =
                                schema.table_type(&table_name).await?
                            {
                                builder.add_table(
                                    &catalog_name,
                                    &schema_name,
                                    &table_name,
                                    table_type,
                                );
                            }
                        }
                    }
                }
            }

            // Add a final list for the information schema tables themselves
            for table_name in INFORMATION_SCHEMA_TABLES {
                builder.add_table(
                    &catalog_name,
                    INFORMATION_SCHEMA,
                    table_name,
                    TableType::View,
                );
            }
        }

        Ok(())
    }

    async fn make_schemata(&self, builder: &mut InformationSchemataBuilder) {
        for catalog_name in self.catalog_list.catalog_names() {
            let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

            for schema_name in catalog.schema_names() {
                if schema_name != INFORMATION_SCHEMA {
                    if let Some(schema) = catalog.schema(&schema_name) {
                        let schema_owner = schema.owner_name();
                        builder.add_schemata(&catalog_name, &schema_name, schema_owner);
                    }
                }
            }
        }
    }

    async fn make_views(
        &self,
        builder: &mut InformationSchemaViewBuilder,
    ) -> Result<(), DataFusionError> {
        for catalog_name in self.catalog_list.catalog_names() {
            let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

            for schema_name in catalog.schema_names() {
                if schema_name != INFORMATION_SCHEMA {
                    // schema name may not exist in the catalog, so we need to check
                    if let Some(schema) = catalog.schema(&schema_name) {
                        for table_name in schema.table_names() {
                            if let Some(table) = schema.table(&table_name).await? {
                                builder.add_view(
                                    &catalog_name,
                                    &schema_name,
                                    &table_name,
                                    table.get_table_definition(),
                                )
                            }
                        }
                    }
                }
            }
        }

        Ok(())
    }

    /// Construct the `information_schema.columns` virtual table
    async fn make_columns(
        &self,
        builder: &mut InformationSchemaColumnsBuilder,
    ) -> Result<(), DataFusionError> {
        for catalog_name in self.catalog_list.catalog_names() {
            let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

            for schema_name in catalog.schema_names() {
                if schema_name != INFORMATION_SCHEMA {
                    // schema name may not exist in the catalog, so we need to check
                    if let Some(schema) = catalog.schema(&schema_name) {
                        for table_name in schema.table_names() {
                            if let Some(table) = schema.table(&table_name).await? {
                                for (field_position, field) in
                                    table.schema().fields().iter().enumerate()
                                {
                                    builder.add_column(
                                        &catalog_name,
                                        &schema_name,
                                        &table_name,
                                        field_position,
                                        field,
                                    )
                                }
                            }
                        }
                    }
                }
            }
        }

        Ok(())
    }

    /// Construct the `information_schema.df_settings` virtual table
    fn make_df_settings(
        &self,
        config_options: &ConfigOptions,
        builder: &mut InformationSchemaDfSettingsBuilder,
    ) {
        for entry in config_options.entries() {
            builder.add_setting(entry);
        }
    }

    fn make_routines(
        &self,
        udfs: &HashMap<String, Arc<ScalarUDF>>,
        udafs: &HashMap<String, Arc<AggregateUDF>>,
        udwfs: &HashMap<String, Arc<WindowUDF>>,
        config_options: &ConfigOptions,
        builder: &mut InformationSchemaRoutinesBuilder,
    ) -> Result<()> {
        let catalog_name = &config_options.catalog.default_catalog;
        let schema_name = &config_options.catalog.default_schema;

        for (name, udf) in udfs {
            let return_types = get_udf_args_and_return_types(udf)?
                .into_iter()
                .map(|(_, return_type)| return_type)
                .collect::<HashSet<_>>();
            for return_type in return_types {
                builder.add_routine(
                    catalog_name,
                    schema_name,
                    name,
                    "FUNCTION",
                    Self::is_deterministic(udf.signature()),
                    return_type,
                    "SCALAR",
                    udf.documentation().map(|d| d.description.to_string()),
                    udf.documentation().map(|d| d.syntax_example.to_string()),
                )
            }
        }

        for (name, udaf) in udafs {
            let return_types = get_udaf_args_and_return_types(udaf)?
                .into_iter()
                .map(|(_, return_type)| return_type)
                .collect::<HashSet<_>>();
            for return_type in return_types {
                builder.add_routine(
                    catalog_name,
                    schema_name,
                    name,
                    "FUNCTION",
                    Self::is_deterministic(udaf.signature()),
                    return_type,
                    "AGGREGATE",
                    udaf.documentation().map(|d| d.description.to_string()),
                    udaf.documentation().map(|d| d.syntax_example.to_string()),
                )
            }
        }

        for (name, udwf) in udwfs {
            let return_types = get_udwf_args_and_return_types(udwf)?
                .into_iter()
                .map(|(_, return_type)| return_type)
                .collect::<HashSet<_>>();
            for return_type in return_types {
                builder.add_routine(
                    catalog_name,
                    schema_name,
                    name,
                    "FUNCTION",
                    Self::is_deterministic(udwf.signature()),
                    return_type,
                    "WINDOW",
                    udwf.documentation().map(|d| d.description.to_string()),
                    udwf.documentation().map(|d| d.syntax_example.to_string()),
                )
            }
        }
        Ok(())
    }

    fn is_deterministic(signature: &Signature) -> bool {
        signature.volatility == Volatility::Immutable
    }

    fn make_parameters(
        &self,
        udfs: &HashMap<String, Arc<ScalarUDF>>,
        udafs: &HashMap<String, Arc<AggregateUDF>>,
        udwfs: &HashMap<String, Arc<WindowUDF>>,
        config_options: &ConfigOptions,
        builder: &mut InformationSchemaParametersBuilder,
    ) -> Result<()> {
        let catalog_name = &config_options.catalog.default_catalog;
        let schema_name = &config_options.catalog.default_schema;
        let mut add_parameters = |func_name: &str,
                                  args: Option<&Vec<(String, String)>>,
                                  arg_types: Vec<String>,
                                  return_type: Option<String>,
                                  is_variadic: bool,
                                  rid: u8| {
            for (position, type_name) in arg_types.iter().enumerate() {
                let param_name =
                    args.and_then(|a| a.get(position).map(|arg| arg.0.as_str()));
                builder.add_parameter(
                    catalog_name,
                    schema_name,
                    func_name,
                    position as u64 + 1,
                    "IN",
                    param_name,
                    type_name,
                    None::<&str>,
                    is_variadic,
                    rid,
                );
            }
            if let Some(return_type) = return_type {
                builder.add_parameter(
                    catalog_name,
                    schema_name,
                    func_name,
                    1,
                    "OUT",
                    None::<&str>,
                    return_type.as_str(),
                    None::<&str>,
                    false,
                    rid,
                );
            }
        };

        for (func_name, udf) in udfs {
            let args = udf.documentation().and_then(|d| d.arguments.clone());
            let combinations = get_udf_args_and_return_types(udf)?;
            for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
                add_parameters(
                    func_name,
                    args.as_ref(),
                    arg_types,
                    return_type,
                    Self::is_variadic(udf.signature()),
                    rid as u8,
                );
            }
        }

        for (func_name, udaf) in udafs {
            let args = udaf.documentation().and_then(|d| d.arguments.clone());
            let combinations = get_udaf_args_and_return_types(udaf)?;
            for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
                add_parameters(
                    func_name,
                    args.as_ref(),
                    arg_types,
                    return_type,
                    Self::is_variadic(udaf.signature()),
                    rid as u8,
                );
            }
        }

        for (func_name, udwf) in udwfs {
            let args = udwf.documentation().and_then(|d| d.arguments.clone());
            let combinations = get_udwf_args_and_return_types(udwf)?;
            for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
                add_parameters(
                    func_name,
                    args.as_ref(),
                    arg_types,
                    return_type,
                    Self::is_variadic(udwf.signature()),
                    rid as u8,
                );
            }
        }

        Ok(())
    }

    fn is_variadic(signature: &Signature) -> bool {
        matches!(
            signature.type_signature,
            TypeSignature::Variadic(_) | TypeSignature::VariadicAny
        )
    }
}

/// Get the example argument types and corresponding return type of a UDF.
///
/// Returns a set of `(argument_types, return_type)` combinations.
fn get_udf_args_and_return_types(
    udf: &Arc<ScalarUDF>,
) -> Result<BTreeSet<(Vec<String>, Option<String>)>> {
    let signature = udf.signature();
    let arg_types = signature.type_signature.get_example_types();
    if arg_types.is_empty() {
        Ok(vec![(vec![], None)].into_iter().collect::<BTreeSet<_>>())
    } else {
        Ok(arg_types
            .into_iter()
            .map(|arg_types| {
                // the return type is only reported for functions that
                // implement the [`ScalarUDFImpl::return_type`] method
                let return_type = udf
                    .return_type(&arg_types)
                    .map(|t| remove_native_type_prefix(NativeType::from(t)))
                    .ok();
                let arg_types = arg_types
                    .into_iter()
                    .map(|t| remove_native_type_prefix(NativeType::from(t)))
                    .collect::<Vec<_>>();
                (arg_types, return_type)
            })
            .collect::<BTreeSet<_>>())
    }
}

/// Get the example argument types and corresponding return type of a UDAF.
///
/// Returns a set of `(argument_types, return_type)` combinations.
fn get_udaf_args_and_return_types(
    udaf: &Arc<AggregateUDF>,
) -> Result<BTreeSet<(Vec<String>, Option<String>)>> {
    let signature = udaf.signature();
    let arg_types = signature.type_signature.get_example_types();
    if arg_types.is_empty() {
        Ok(vec![(vec![], None)].into_iter().collect::<BTreeSet<_>>())
    } else {
        Ok(arg_types
            .into_iter()
            .map(|arg_types| {
                // the return type is only reported for functions that
                // implement the [`AggregateUDFImpl::return_type`] method
                let return_type = udaf
                    .return_type(&arg_types)
                    .ok()
                    .map(|t| remove_native_type_prefix(NativeType::from(t)));
                let arg_types = arg_types
                    .into_iter()
                    .map(|t| remove_native_type_prefix(NativeType::from(t)))
                    .collect::<Vec<_>>();
                (arg_types, return_type)
            })
            .collect::<BTreeSet<_>>())
    }
}

/// Get the example argument types of a UDWF.
///
/// Returns a set of `(argument_types, return_type)` combinations; window
/// functions do not report a return type through this path, so the return
/// type is always `None`.
fn get_udwf_args_and_return_types(
    udwf: &Arc<WindowUDF>,
) -> Result<BTreeSet<(Vec<String>, Option<String>)>> {
    let signature = udwf.signature();
    let arg_types = signature.type_signature.get_example_types();
    if arg_types.is_empty() {
        Ok(vec![(vec![], None)].into_iter().collect::<BTreeSet<_>>())
    } else {
        Ok(arg_types
            .into_iter()
            .map(|arg_types| {
                let arg_types = arg_types
                    .into_iter()
                    .map(|t| remove_native_type_prefix(NativeType::from(t)))
                    .collect::<Vec<_>>();
                (arg_types, None)
            })
            .collect::<BTreeSet<_>>())
    }
}

/// Format a [`NativeType`] as a plain type name via its `Display`
/// implementation
#[inline]
fn remove_native_type_prefix(native_type: NativeType) -> String {
    format!("{native_type}")
}

#[async_trait]
impl SchemaProvider for InformationSchemaProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn table_names(&self) -> Vec<String> {
        INFORMATION_SCHEMA_TABLES
            .iter()
            .map(|t| (*t).to_string())
            .collect()
    }

    async fn table(
        &self,
        name: &str,
    ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
        let config = self.config.clone();
        let table: Arc<dyn PartitionStream> = match name.to_ascii_lowercase().as_str() {
            TABLES => Arc::new(InformationSchemaTables::new(config)),
            COLUMNS => Arc::new(InformationSchemaColumns::new(config)),
            VIEWS => Arc::new(InformationSchemaViews::new(config)),
            DF_SETTINGS => Arc::new(InformationSchemaDfSettings::new(config)),
            SCHEMATA => Arc::new(InformationSchemata::new(config)),
            ROUTINES => Arc::new(InformationSchemaRoutines::new(config)),
            PARAMETERS => Arc::new(InformationSchemaParameters::new(config)),
            _ => return Ok(None),
        };

        Ok(Some(Arc::new(
            StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
        )))
    }

    fn table_exist(&self, name: &str) -> bool {
        INFORMATION_SCHEMA_TABLES.contains(&name.to_ascii_lowercase().as_str())
    }
}

#[derive(Debug)]
struct InformationSchemaTables {
    schema: SchemaRef,
    config: InformationSchemaConfig,
}

impl InformationSchemaTables {
    fn new(config: InformationSchemaConfig) -> Self {
        let schema = Arc::new(Schema::new(vec![
            Field::new("table_catalog", DataType::Utf8, false),
            Field::new("table_schema", DataType::Utf8, false),
            Field::new("table_name", DataType::Utf8, false),
            Field::new("table_type", DataType::Utf8, false),
        ]));

        Self { schema, config }
    }

    fn builder(&self) -> InformationSchemaTablesBuilder {
        InformationSchemaTablesBuilder {
            catalog_names: StringBuilder::new(),
            schema_names: StringBuilder::new(),
            table_names: StringBuilder::new(),
            table_types: StringBuilder::new(),
            schema: Arc::clone(&self.schema),
        }
    }
}

impl PartitionStream for InformationSchemaTables {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let mut builder = self.builder();
        let config = self.config.clone();
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            // TODO: Stream this
            futures::stream::once(async move {
                config.make_tables(&mut builder).await?;
                Ok(builder.finish())
            }),
        ))
    }
}

/// Builds the `information_schema.TABLES` table row by row
///
/// Columns are based on <https://www.postgresql.org/docs/current/infoschema-tables.html>
struct InformationSchemaTablesBuilder {
    schema: SchemaRef,
    catalog_names: StringBuilder,
    schema_names: StringBuilder,
    table_names: StringBuilder,
    table_types: StringBuilder,
}

impl InformationSchemaTablesBuilder {
    fn add_table(
        &mut self,
        catalog_name: impl AsRef<str>,
        schema_name: impl AsRef<str>,
        table_name: impl AsRef<str>,
        table_type: TableType,
    ) {
        // Note: append_value is actually infallible.
        self.catalog_names.append_value(catalog_name.as_ref());
        self.schema_names.append_value(schema_name.as_ref());
        self.table_names.append_value(table_name.as_ref());
        self.table_types.append_value(match table_type {
            TableType::Base => "BASE TABLE",
            TableType::View => "VIEW",
            TableType::Temporary => "LOCAL TEMPORARY",
        });
    }

    fn finish(&mut self) -> RecordBatch {
        RecordBatch::try_new(
            Arc::clone(&self.schema),
            vec![
                Arc::new(self.catalog_names.finish()),
                Arc::new(self.schema_names.finish()),
                Arc::new(self.table_names.finish()),
                Arc::new(self.table_types.finish()),
            ],
        )
        .unwrap()
    }
}

#[derive(Debug)]
struct InformationSchemaViews {
    schema: SchemaRef,
    config: InformationSchemaConfig,
}

impl InformationSchemaViews {
    fn new(config: InformationSchemaConfig) -> Self {
        let schema = Arc::new(Schema::new(vec![
            Field::new("table_catalog", DataType::Utf8, false),
            Field::new("table_schema", DataType::Utf8, false),
            Field::new("table_name", DataType::Utf8, false),
            Field::new("definition", DataType::Utf8, true),
        ]));

        Self { schema, config }
    }

    fn builder(&self) -> InformationSchemaViewBuilder {
        InformationSchemaViewBuilder {
            catalog_names: StringBuilder::new(),
            schema_names: StringBuilder::new(),
            table_names: StringBuilder::new(),
            definitions: StringBuilder::new(),
            schema: Arc::clone(&self.schema),
        }
    }
}

impl PartitionStream for InformationSchemaViews {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let mut builder = self.builder();
        let config = self.config.clone();
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            // TODO: Stream this
            futures::stream::once(async move {
                config.make_views(&mut builder).await?;
                Ok(builder.finish())
            }),
        ))
    }
}

/// Builds the `information_schema.VIEWS` table row by row
///
/// Columns are based on <https://www.postgresql.org/docs/current/infoschema-views.html>
struct InformationSchemaViewBuilder {
    schema: SchemaRef,
    catalog_names: StringBuilder,
    schema_names: StringBuilder,
    table_names: StringBuilder,
    definitions: StringBuilder,
}

impl InformationSchemaViewBuilder {
    fn add_view(
        &mut self,
        catalog_name: impl AsRef<str>,
        schema_name: impl AsRef<str>,
        table_name: impl AsRef<str>,
        definition: Option<impl AsRef<str>>,
    ) {
        // Note: append_value is actually infallible.
        self.catalog_names.append_value(catalog_name.as_ref());
        self.schema_names.append_value(schema_name.as_ref());
        self.table_names.append_value(table_name.as_ref());
        self.definitions.append_option(definition.as_ref());
    }

    fn finish(&mut self) -> RecordBatch {
        RecordBatch::try_new(
            Arc::clone(&self.schema),
            vec![
                Arc::new(self.catalog_names.finish()),
                Arc::new(self.schema_names.finish()),
                Arc::new(self.table_names.finish()),
                Arc::new(self.definitions.finish()),
            ],
        )
        .unwrap()
    }
}

#[derive(Debug)]
struct InformationSchemaColumns {
    schema: SchemaRef,
    config: InformationSchemaConfig,
}

impl InformationSchemaColumns {
    fn new(config: InformationSchemaConfig) -> Self {
        let schema = Arc::new(Schema::new(vec![
            Field::new("table_catalog", DataType::Utf8, false),
            Field::new("table_schema", DataType::Utf8, false),
            Field::new("table_name", DataType::Utf8, false),
            Field::new("column_name", DataType::Utf8, false),
            Field::new("ordinal_position", DataType::UInt64, false),
            Field::new("column_default", DataType::Utf8, true),
            Field::new("is_nullable", DataType::Utf8, false),
            Field::new("data_type", DataType::Utf8, false),
            Field::new("character_maximum_length", DataType::UInt64, true),
            Field::new("character_octet_length", DataType::UInt64, true),
            Field::new("numeric_precision", DataType::UInt64, true),
            Field::new("numeric_precision_radix", DataType::UInt64, true),
            Field::new("numeric_scale", DataType::UInt64, true),
            Field::new("datetime_precision", DataType::UInt64, true),
            Field::new("interval_type", DataType::Utf8, true),
        ]));

        Self { schema, config }
    }

    fn builder(&self) -> InformationSchemaColumnsBuilder {
        // Pick an arbitrary initial capacity for the primitive builders;
        // this is not performance critical code and the number of rows is
        // unknown here.
        let default_capacity = 10;

        InformationSchemaColumnsBuilder {
            catalog_names: StringBuilder::new(),
            schema_names: StringBuilder::new(),
            table_names: StringBuilder::new(),
            column_names: StringBuilder::new(),
            ordinal_positions: UInt64Builder::with_capacity(default_capacity),
            column_defaults: StringBuilder::new(),
            is_nullables: StringBuilder::new(),
            data_types: StringBuilder::new(),
            character_maximum_lengths: UInt64Builder::with_capacity(default_capacity),
            character_octet_lengths: UInt64Builder::with_capacity(default_capacity),
            numeric_precisions: UInt64Builder::with_capacity(default_capacity),
            numeric_precision_radixes: UInt64Builder::with_capacity(default_capacity),
            numeric_scales: UInt64Builder::with_capacity(default_capacity),
            datetime_precisions: UInt64Builder::with_capacity(default_capacity),
            interval_types: StringBuilder::new(),
            schema: Arc::clone(&self.schema),
        }
    }
}

impl PartitionStream for InformationSchemaColumns {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let mut builder = self.builder();
        let config = self.config.clone();
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            // TODO: Stream this
            futures::stream::once(async move {
                config.make_columns(&mut builder).await?;
                Ok(builder.finish())
            }),
        ))
    }
}

/// Builds the `information_schema.COLUMNS` table row by row
///
/// Columns are based on <https://www.postgresql.org/docs/current/infoschema-columns.html>
struct InformationSchemaColumnsBuilder {
    schema: SchemaRef,
    catalog_names: StringBuilder,
    schema_names: StringBuilder,
    table_names: StringBuilder,
    column_names: StringBuilder,
    ordinal_positions: UInt64Builder,
    column_defaults: StringBuilder,
    is_nullables: StringBuilder,
    data_types: StringBuilder,
    character_maximum_lengths: UInt64Builder,
    character_octet_lengths: UInt64Builder,
    numeric_precisions: UInt64Builder,
    numeric_precision_radixes: UInt64Builder,
    numeric_scales: UInt64Builder,
    datetime_precisions: UInt64Builder,
    interval_types: StringBuilder,
}

impl InformationSchemaColumnsBuilder {
    fn add_column(
        &mut self,
        catalog_name: &str,
        schema_name: &str,
        table_name: &str,
        field_position: usize,
        field: &Field,
    ) {
        use DataType::*;

        // Note: append_value is actually infallible.
        self.catalog_names.append_value(catalog_name);
        self.schema_names.append_value(schema_name);
        self.table_names.append_value(table_name);

        self.column_names.append_value(field.name());

        self.ordinal_positions.append_value(field_position as u64);

        // DataFusion does not support column default values, so null
        self.column_defaults.append_null();

        // "YES if the column is possibly nullable, NO if it is known not nullable."
        let nullable_str = if field.is_nullable() { "YES" } else { "NO" };
        self.is_nullables.append_value(nullable_str);

        // "System supplied type" --> Use display format of the datatype
        self.data_types.append_value(field.data_type().to_string());

        // "If data_type identifies a character or bit string type, the
        // declared maximum length; null for all other data types or
        // if no maximum length was declared."
        //
        // Arrow has no equivalent of VARCHAR(20), so we leave this as Null
        let max_chars = None;
        self.character_maximum_lengths.append_option(max_chars);

        // "Maximum length, in bytes, for binary data, character data,
        // or text and image data."
        let char_len: Option<u64> = match field.data_type() {
            Utf8 | Binary => Some(i32::MAX as u64),
            LargeBinary | LargeUtf8 => Some(i64::MAX as u64),
            _ => None,
        };
        self.character_octet_lengths.append_option(char_len);

        // numeric_precision: "If data_type identifies a numeric type, this column
        // contains the (declared or implicit) precision of the type
        // for this column. The precision indicates the number of
        // significant digits. It can be expressed in decimal (base
        // 10) or binary (base 2) terms, as specified in the column
        // numeric_precision_radix. For all other data types, this
        // column is null."
        //
        // numeric_radix: If data_type identifies a numeric type, this
        // column indicates in which base the values in the columns
        // numeric_precision and numeric_scale are expressed. The
        // value is either 2 or 10. For all other data types, this
        // column is null.
        //
        // numeric_scale: If data_type identifies an exact numeric
        // type, this column contains the (declared or implicit) scale
        // of the type for this column. The scale indicates the number
        // of significant digits to the right of the decimal point. It
        // can be expressed in decimal (base 10) or binary (base 2)
        // terms, as specified in the column
        // numeric_precision_radix. For all other data types, this
        // column is null.
        let (numeric_precision, numeric_radix, numeric_scale) = match field.data_type() {
            Int8 | UInt8 => (Some(8), Some(2), None),
            Int16 | UInt16 => (Some(16), Some(2), None),
            Int32 | UInt32 => (Some(32), Some(2), None),
            // From max value of 65504 as explained on
            // https://en.wikipedia.org/wiki/Half-precision_floating-point_format#Exponent_encoding
            Float16 => (Some(15), Some(2), None),
            // Numbers from postgres `real` type
            Float32 => (Some(24), Some(2), None),
            // Numbers from postgres `double precision` type
            Float64 => (Some(53), Some(2), None),
            Decimal128(precision, scale) => {
                (Some(*precision as u64), Some(10), Some(*scale as u64))
            }
            _ => (None, None, None),
        };

        self.numeric_precisions.append_option(numeric_precision);
        self.numeric_precision_radixes.append_option(numeric_radix);
        self.numeric_scales.append_option(numeric_scale);

        self.datetime_precisions.append_option(None);
        self.interval_types.append_null();
    }

    fn finish(&mut self) -> RecordBatch {
        RecordBatch::try_new(
            Arc::clone(&self.schema),
            vec![
                Arc::new(self.catalog_names.finish()),
                Arc::new(self.schema_names.finish()),
                Arc::new(self.table_names.finish()),
                Arc::new(self.column_names.finish()),
                Arc::new(self.ordinal_positions.finish()),
                Arc::new(self.column_defaults.finish()),
                Arc::new(self.is_nullables.finish()),
                Arc::new(self.data_types.finish()),
                Arc::new(self.character_maximum_lengths.finish()),
                Arc::new(self.character_octet_lengths.finish()),
                Arc::new(self.numeric_precisions.finish()),
                Arc::new(self.numeric_precision_radixes.finish()),
                Arc::new(self.numeric_scales.finish()),
                Arc::new(self.datetime_precisions.finish()),
                Arc::new(self.interval_types.finish()),
            ],
        )
        .unwrap()
    }
}

#[derive(Debug)]
struct InformationSchemata {
    schema: SchemaRef,
    config: InformationSchemaConfig,
}

impl InformationSchemata {
    fn new(config: InformationSchemaConfig) -> Self {
        let schema = Arc::new(Schema::new(vec![
            Field::new("catalog_name", DataType::Utf8, false),
            Field::new("schema_name", DataType::Utf8, false),
            Field::new("schema_owner", DataType::Utf8, true),
            Field::new("default_character_set_catalog", DataType::Utf8, true),
            Field::new("default_character_set_schema", DataType::Utf8, true),
            Field::new("default_character_set_name", DataType::Utf8, true),
            Field::new("sql_path", DataType::Utf8, true),
        ]));
        Self { schema, config }
    }

    fn builder(&self) -> InformationSchemataBuilder {
        InformationSchemataBuilder {
            schema: Arc::clone(&self.schema),
            catalog_name: StringBuilder::new(),
            schema_name: StringBuilder::new(),
            schema_owner: StringBuilder::new(),
            default_character_set_catalog: StringBuilder::new(),
            default_character_set_schema: StringBuilder::new(),
            default_character_set_name: StringBuilder::new(),
            sql_path: StringBuilder::new(),
        }
    }
}

struct InformationSchemataBuilder {
    schema: SchemaRef,
    catalog_name: StringBuilder,
    schema_name: StringBuilder,
    schema_owner: StringBuilder,
    default_character_set_catalog: StringBuilder,
    default_character_set_schema: StringBuilder,
    default_character_set_name: StringBuilder,
    sql_path: StringBuilder,
}

impl InformationSchemataBuilder {
    fn add_schemata(
        &mut self,
        catalog_name: &str,
        schema_name: &str,
        schema_owner: Option<&str>,
    ) {
        self.catalog_name.append_value(catalog_name);
        self.schema_name.append_value(schema_name);
        match schema_owner {
            Some(owner) => self.schema_owner.append_value(owner),
            None => self.schema_owner.append_null(),
        }
        // Per https://www.postgresql.org/docs/current/infoschema-schemata.html,
        // these columns describe features that are not implemented in
        // DataFusion, so they are always null
        self.default_character_set_catalog.append_null();
        self.default_character_set_schema.append_null();
        self.default_character_set_name.append_null();
        self.sql_path.append_null();
    }

    fn finish(&mut self) -> RecordBatch {
        RecordBatch::try_new(
            Arc::clone(&self.schema),
            vec![
                Arc::new(self.catalog_name.finish()),
                Arc::new(self.schema_name.finish()),
                Arc::new(self.schema_owner.finish()),
                Arc::new(self.default_character_set_catalog.finish()),
                Arc::new(self.default_character_set_schema.finish()),
                Arc::new(self.default_character_set_name.finish()),
                Arc::new(self.sql_path.finish()),
            ],
        )
        .unwrap()
    }
}

impl PartitionStream for InformationSchemata {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let mut builder = self.builder();
        let config = self.config.clone();
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            // TODO: Stream this
            futures::stream::once(async move {
                config.make_schemata(&mut builder).await;
                Ok(builder.finish())
            }),
        ))
    }
}

#[derive(Debug)]
struct InformationSchemaDfSettings {
    schema: SchemaRef,
    config: InformationSchemaConfig,
}

impl InformationSchemaDfSettings {
    fn new(config: InformationSchemaConfig) -> Self {
        let schema = Arc::new(Schema::new(vec![
            Field::new("name", DataType::Utf8, false),
            Field::new("value", DataType::Utf8, true),
            Field::new("description", DataType::Utf8, true),
        ]));

        Self { schema, config }
    }

    fn builder(&self) -> InformationSchemaDfSettingsBuilder {
        InformationSchemaDfSettingsBuilder {
            names: StringBuilder::new(),
            values: StringBuilder::new(),
            descriptions: StringBuilder::new(),
            schema: Arc::clone(&self.schema),
        }
    }
}

impl PartitionStream for InformationSchemaDfSettings {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let config = self.config.clone();
        let mut builder = self.builder();
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            // TODO: Stream this
            futures::stream::once(async move {
                // build one row per entry in the session's ConfigOptions
                config.make_df_settings(ctx.session_config().options(), &mut builder);
                Ok(builder.finish())
            }),
        ))
    }
}

struct InformationSchemaDfSettingsBuilder {
    schema: SchemaRef,
    names: StringBuilder,
    values: StringBuilder,
    descriptions: StringBuilder,
}

impl InformationSchemaDfSettingsBuilder {
    fn add_setting(&mut self, entry: ConfigEntry) {
        self.names.append_value(entry.key);
        self.values.append_option(entry.value);
        self.descriptions.append_value(entry.description);
    }

    fn finish(&mut self) -> RecordBatch {
        RecordBatch::try_new(
            Arc::clone(&self.schema),
            vec![
                Arc::new(self.names.finish()),
                Arc::new(self.values.finish()),
                Arc::new(self.descriptions.finish()),
            ],
        )
        .unwrap()
    }
}

#[derive(Debug)]
struct InformationSchemaRoutines {
    schema: SchemaRef,
    config: InformationSchemaConfig,
}

impl InformationSchemaRoutines {
    fn new(config: InformationSchemaConfig) -> Self {
        let schema = Arc::new(Schema::new(vec![
            Field::new("specific_catalog", DataType::Utf8, false),
            Field::new("specific_schema", DataType::Utf8, false),
            Field::new("specific_name", DataType::Utf8, false),
            Field::new("routine_catalog", DataType::Utf8, false),
            Field::new("routine_schema", DataType::Utf8, false),
            Field::new("routine_name", DataType::Utf8, false),
            Field::new("routine_type", DataType::Utf8, false),
            Field::new("is_deterministic", DataType::Boolean, true),
            Field::new("data_type", DataType::Utf8, true),
            Field::new("function_type", DataType::Utf8, true),
            Field::new("description", DataType::Utf8, true),
            Field::new("syntax_example", DataType::Utf8, true),
        ]));

        Self { schema, config }
    }

    fn builder(&self) -> InformationSchemaRoutinesBuilder {
        InformationSchemaRoutinesBuilder {
            schema: Arc::clone(&self.schema),
            specific_catalog: StringBuilder::new(),
            specific_schema: StringBuilder::new(),
            specific_name: StringBuilder::new(),
            routine_catalog: StringBuilder::new(),
            routine_schema: StringBuilder::new(),
            routine_name: StringBuilder::new(),
            routine_type: StringBuilder::new(),
            is_deterministic: BooleanBuilder::new(),
            data_type: StringBuilder::new(),
            function_type: StringBuilder::new(),
            description: StringBuilder::new(),
            syntax_example: StringBuilder::new(),
        }
    }
}

struct InformationSchemaRoutinesBuilder {
    schema: SchemaRef,
    specific_catalog: StringBuilder,
    specific_schema: StringBuilder,
    specific_name: StringBuilder,
    routine_catalog: StringBuilder,
    routine_schema: StringBuilder,
    routine_name: StringBuilder,
    routine_type: StringBuilder,
    is_deterministic: BooleanBuilder,
    data_type: StringBuilder,
    function_type: StringBuilder,
    description: StringBuilder,
    syntax_example: StringBuilder,
}

impl InformationSchemaRoutinesBuilder {
    #[allow(clippy::too_many_arguments)]
    fn add_routine(
        &mut self,
        catalog_name: impl AsRef<str>,
        schema_name: impl AsRef<str>,
        routine_name: impl AsRef<str>,
        routine_type: impl AsRef<str>,
        is_deterministic: bool,
        data_type: Option<impl AsRef<str>>,
        function_type: impl AsRef<str>,
        description: Option<impl AsRef<str>>,
        syntax_example: Option<impl AsRef<str>>,
    ) {
        self.specific_catalog.append_value(catalog_name.as_ref());
        self.specific_schema.append_value(schema_name.as_ref());
        self.specific_name.append_value(routine_name.as_ref());
        self.routine_catalog.append_value(catalog_name.as_ref());
        self.routine_schema.append_value(schema_name.as_ref());
        self.routine_name.append_value(routine_name.as_ref());
        self.routine_type.append_value(routine_type.as_ref());
        self.is_deterministic.append_value(is_deterministic);
        self.data_type.append_option(data_type.as_ref());
        self.function_type.append_value(function_type.as_ref());
        self.description.append_option(description);
        self.syntax_example.append_option(syntax_example);
    }

    fn finish(&mut self) -> RecordBatch {
        RecordBatch::try_new(
            Arc::clone(&self.schema),
            vec![
                Arc::new(self.specific_catalog.finish()),
                Arc::new(self.specific_schema.finish()),
                Arc::new(self.specific_name.finish()),
                Arc::new(self.routine_catalog.finish()),
                Arc::new(self.routine_schema.finish()),
                Arc::new(self.routine_name.finish()),
                Arc::new(self.routine_type.finish()),
                Arc::new(self.is_deterministic.finish()),
                Arc::new(self.data_type.finish()),
                Arc::new(self.function_type.finish()),
                Arc::new(self.description.finish()),
                Arc::new(self.syntax_example.finish()),
            ],
        )
        .unwrap()
    }
}

impl PartitionStream for InformationSchemaRoutines {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let config = self.config.clone();
        let mut builder = self.builder();
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            futures::stream::once(async move {
                config.make_routines(
                    ctx.scalar_functions(),
                    ctx.aggregate_functions(),
                    ctx.window_functions(),
                    ctx.session_config().options(),
                    &mut builder,
                )?;
                Ok(builder.finish())
            }),
        ))
    }
}

#[derive(Debug)]
struct InformationSchemaParameters {
    schema: SchemaRef,
    config: InformationSchemaConfig,
}

impl InformationSchemaParameters {
    fn new(config: InformationSchemaConfig) -> Self {
        let schema = Arc::new(Schema::new(vec![
            Field::new("specific_catalog", DataType::Utf8, false),
            Field::new("specific_schema", DataType::Utf8, false),
            Field::new("specific_name", DataType::Utf8, false),
            Field::new("ordinal_position", DataType::UInt64, false),
            Field::new("parameter_mode", DataType::Utf8, false),
            Field::new("parameter_name", DataType::Utf8, true),
            Field::new("data_type", DataType::Utf8, false),
            Field::new("parameter_default", DataType::Utf8, true),
            Field::new("is_variadic", DataType::Boolean, false),
            // `rid` (short for `routine id`) is used to differentiate parameters from different signatures
            // (It serves as the group-by key when generating the `SHOW FUNCTIONS` query).
            // For example, the following signatures have different `rid` values:
            //     - `datetrunc(Utf8, Timestamp(Microsecond, Some("+TZ"))) -> Timestamp(Microsecond, Some("+TZ"))`
            //     - `datetrunc(Utf8View, Timestamp(Nanosecond, None)) -> Timestamp(Nanosecond, None)`
            Field::new("rid", DataType::UInt8, false),
        ]));

        Self { schema, config }
    }

    fn builder(&self) -> InformationSchemaParametersBuilder {
        InformationSchemaParametersBuilder {
            schema: Arc::clone(&self.schema),
            specific_catalog: StringBuilder::new(),
            specific_schema: StringBuilder::new(),
            specific_name: StringBuilder::new(),
            ordinal_position: UInt64Builder::new(),
            parameter_mode: StringBuilder::new(),
            parameter_name: StringBuilder::new(),
            data_type: StringBuilder::new(),
            parameter_default: StringBuilder::new(),
            is_variadic: BooleanBuilder::new(),
            rid: UInt8Builder::new(),
        }
    }
}

struct InformationSchemaParametersBuilder {
    schema: SchemaRef,
    specific_catalog: StringBuilder,
    specific_schema: StringBuilder,
    specific_name: StringBuilder,
    ordinal_position: UInt64Builder,
    parameter_mode: StringBuilder,
    parameter_name: StringBuilder,
    data_type: StringBuilder,
    parameter_default: StringBuilder,
    is_variadic: BooleanBuilder,
    rid: UInt8Builder,
}

impl InformationSchemaParametersBuilder {
    #[allow(clippy::too_many_arguments)]
    fn add_parameter(
        &mut self,
        specific_catalog: impl AsRef<str>,
        specific_schema: impl AsRef<str>,
        specific_name: impl AsRef<str>,
        ordinal_position: u64,
        parameter_mode: impl AsRef<str>,
        parameter_name: Option<impl AsRef<str>>,
        data_type: impl AsRef<str>,
        parameter_default: Option<impl AsRef<str>>,
        is_variadic: bool,
        rid: u8,
    ) {
        self.specific_catalog
            .append_value(specific_catalog.as_ref());
        self.specific_schema.append_value(specific_schema.as_ref());
        self.specific_name.append_value(specific_name.as_ref());
        self.ordinal_position.append_value(ordinal_position);
        self.parameter_mode.append_value(parameter_mode.as_ref());
        self.parameter_name.append_option(parameter_name.as_ref());
        self.data_type.append_value(data_type.as_ref());
        self.parameter_default.append_option(parameter_default);
        self.is_variadic.append_value(is_variadic);
        self.rid.append_value(rid);
    }

    fn finish(&mut self) -> RecordBatch {
        RecordBatch::try_new(
            Arc::clone(&self.schema),
            vec![
                Arc::new(self.specific_catalog.finish()),
                Arc::new(self.specific_schema.finish()),
                Arc::new(self.specific_name.finish()),
                Arc::new(self.ordinal_position.finish()),
                Arc::new(self.parameter_mode.finish()),
                Arc::new(self.parameter_name.finish()),
                Arc::new(self.data_type.finish()),
                Arc::new(self.parameter_default.finish()),
                Arc::new(self.is_variadic.finish()),
                Arc::new(self.rid.finish()),
            ],
        )
        .unwrap()
    }
}

impl PartitionStream for InformationSchemaParameters {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let config = self.config.clone();
        let mut builder = self.builder();
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            futures::stream::once(async move {
                config.make_parameters(
                    ctx.scalar_functions(),
                    ctx.aggregate_functions(),
                    ctx.window_functions(),
                    ctx.session_config().options(),
                    &mut builder,
                )?;
                Ok(builder.finish())
            }),
        ))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::CatalogProvider;

    #[tokio::test]
    async fn make_tables_uses_table_type() {
        let config = InformationSchemaConfig {
            catalog_list: Arc::new(Fixture),
        };
        let mut builder = InformationSchemaTablesBuilder {
            catalog_names: StringBuilder::new(),
            schema_names: StringBuilder::new(),
            table_names: StringBuilder::new(),
            table_types: StringBuilder::new(),
            schema: Arc::new(Schema::empty()),
        };

        assert!(config.make_tables(&mut builder).await.is_ok());

        assert_eq!("BASE TABLE", builder.table_types.finish().value(0));
    }

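    // Added for illustration: `table_exist` lowercases the name before
    // matching, mirroring the lowercasing in `SchemaProvider::table` above,
    // so mixed-case names should also resolve.
    #[test]
    fn table_exist_is_case_insensitive() {
        let provider = InformationSchemaProvider::new(Arc::new(Fixture));
        assert!(provider.table_exist("TABLES"));
        assert!(provider.table_exist("df_settings"));
        assert!(!provider.table_exist("not_an_information_schema_table"));
    }
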
    #[derive(Debug)]
    struct Fixture;

    #[async_trait]
    impl SchemaProvider for Fixture {
        // InformationSchemaConfig::make_tables should use this.
        async fn table_type(&self, _: &str) -> Result<Option<TableType>> {
            Ok(Some(TableType::Base))
        }

        // InformationSchemaConfig::make_tables used this before `table_type`
        // existed but should not, as it may be expensive.
        async fn table(&self, _: &str) -> Result<Option<Arc<dyn TableProvider>>> {
            panic!("InformationSchemaConfig::make_tables called SchemaProvider::table instead of table_type")
        }

        fn as_any(&self) -> &dyn Any {
            unimplemented!("not required for these tests")
        }

        fn table_names(&self) -> Vec<String> {
            vec!["atable".to_string()]
        }

        fn table_exist(&self, _: &str) -> bool {
            unimplemented!("not required for these tests")
        }
    }

    impl CatalogProviderList for Fixture {
        fn as_any(&self) -> &dyn Any {
            unimplemented!("not required for these tests")
        }

        fn register_catalog(
            &self,
            _: String,
            _: Arc<dyn CatalogProvider>,
        ) -> Option<Arc<dyn CatalogProvider>> {
            unimplemented!("not required for these tests")
        }

        fn catalog_names(&self) -> Vec<String> {
            vec!["acatalog".to_string()]
        }

        fn catalog(&self, _: &str) -> Option<Arc<dyn CatalogProvider>> {
            Some(Arc::new(Self))
        }
    }

    impl CatalogProvider for Fixture {
        fn as_any(&self) -> &dyn Any {
            unimplemented!("not required for these tests")
        }

        fn schema_names(&self) -> Vec<String> {
            vec!["aschema".to_string()]
        }

        fn schema(&self, _: &str) -> Option<Arc<dyn SchemaProvider>> {
            Some(Arc::new(Self))
        }
    }
}