1use crate::streaming::StreamingTable;
23use crate::{CatalogProviderList, SchemaProvider, TableProvider};
24use arrow::array::builder::{BooleanBuilder, UInt8Builder};
25use arrow::{
26 array::{StringBuilder, UInt64Builder},
27 datatypes::{DataType, Field, Schema, SchemaRef},
28 record_batch::RecordBatch,
29};
30use async_trait::async_trait;
31use datafusion_common::config::{ConfigEntry, ConfigOptions};
32use datafusion_common::error::Result;
33use datafusion_common::types::NativeType;
34use datafusion_common::DataFusionError;
35use datafusion_execution::TaskContext;
36use datafusion_expr::{AggregateUDF, ScalarUDF, Signature, TypeSignature, WindowUDF};
37use datafusion_expr::{TableType, Volatility};
38use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
39use datafusion_physical_plan::streaming::PartitionStream;
40use datafusion_physical_plan::SendableRecordBatchStream;
41use std::collections::{BTreeSet, HashMap, HashSet};
42use std::fmt::Debug;
43use std::{any::Any, sync::Arc};
44
/// Name of the virtual schema that exposes catalog metadata.
pub const INFORMATION_SCHEMA: &str = "information_schema";
/// Table name of `information_schema.tables`.
pub(crate) const TABLES: &str = "tables";
/// Table name of `information_schema.views`.
pub(crate) const VIEWS: &str = "views";
/// Table name of `information_schema.columns`.
pub(crate) const COLUMNS: &str = "columns";
/// Table name of `information_schema.df_settings`.
pub(crate) const DF_SETTINGS: &str = "df_settings";
/// Table name of `information_schema.schemata`.
pub(crate) const SCHEMATA: &str = "schemata";
/// Table name of `information_schema.routines`.
pub(crate) const ROUTINES: &str = "routines";
/// Table name of `information_schema.parameters`.
pub(crate) const PARAMETERS: &str = "parameters";

/// Every table served by the information schema. The order here is the
/// order reported by `InformationSchemaProvider::table_names`.
pub const INFORMATION_SCHEMA_TABLES: &[&str] = &[
    TABLES,
    VIEWS,
    COLUMNS,
    DF_SETTINGS,
    SCHEMATA,
    ROUTINES,
    PARAMETERS,
];
64
65#[derive(Debug)]
72pub struct InformationSchemaProvider {
73 config: InformationSchemaConfig,
74}
75
76impl InformationSchemaProvider {
77 pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> Self {
79 Self {
80 config: InformationSchemaConfig { catalog_list },
81 }
82 }
83}
84
85#[derive(Clone, Debug)]
86struct InformationSchemaConfig {
87 catalog_list: Arc<dyn CatalogProviderList>,
88}
89
90impl InformationSchemaConfig {
91 async fn make_tables(
93 &self,
94 builder: &mut InformationSchemaTablesBuilder,
95 ) -> Result<(), DataFusionError> {
96 for catalog_name in self.catalog_list.catalog_names() {
99 let catalog = self.catalog_list.catalog(&catalog_name).unwrap();
100
101 for schema_name in catalog.schema_names() {
102 if schema_name != INFORMATION_SCHEMA {
103 if let Some(schema) = catalog.schema(&schema_name) {
105 for table_name in schema.table_names() {
106 if let Some(table_type) =
107 schema.table_type(&table_name).await?
108 {
109 builder.add_table(
110 &catalog_name,
111 &schema_name,
112 &table_name,
113 table_type,
114 );
115 }
116 }
117 }
118 }
119 }
120
121 for table_name in INFORMATION_SCHEMA_TABLES {
123 builder.add_table(
124 &catalog_name,
125 INFORMATION_SCHEMA,
126 table_name,
127 TableType::View,
128 );
129 }
130 }
131
132 Ok(())
133 }
134
135 async fn make_schemata(&self, builder: &mut InformationSchemataBuilder) {
136 for catalog_name in self.catalog_list.catalog_names() {
137 let catalog = self.catalog_list.catalog(&catalog_name).unwrap();
138
139 for schema_name in catalog.schema_names() {
140 if schema_name != INFORMATION_SCHEMA {
141 if let Some(schema) = catalog.schema(&schema_name) {
142 let schema_owner = schema.owner_name();
143 builder.add_schemata(&catalog_name, &schema_name, schema_owner);
144 }
145 }
146 }
147 }
148 }
149
150 async fn make_views(
151 &self,
152 builder: &mut InformationSchemaViewBuilder,
153 ) -> Result<(), DataFusionError> {
154 for catalog_name in self.catalog_list.catalog_names() {
155 let catalog = self.catalog_list.catalog(&catalog_name).unwrap();
156
157 for schema_name in catalog.schema_names() {
158 if schema_name != INFORMATION_SCHEMA {
159 if let Some(schema) = catalog.schema(&schema_name) {
161 for table_name in schema.table_names() {
162 if let Some(table) = schema.table(&table_name).await? {
163 builder.add_view(
164 &catalog_name,
165 &schema_name,
166 &table_name,
167 table.get_table_definition(),
168 )
169 }
170 }
171 }
172 }
173 }
174 }
175
176 Ok(())
177 }
178
179 async fn make_columns(
181 &self,
182 builder: &mut InformationSchemaColumnsBuilder,
183 ) -> Result<(), DataFusionError> {
184 for catalog_name in self.catalog_list.catalog_names() {
185 let catalog = self.catalog_list.catalog(&catalog_name).unwrap();
186
187 for schema_name in catalog.schema_names() {
188 if schema_name != INFORMATION_SCHEMA {
189 if let Some(schema) = catalog.schema(&schema_name) {
191 for table_name in schema.table_names() {
192 if let Some(table) = schema.table(&table_name).await? {
193 for (field_position, field) in
194 table.schema().fields().iter().enumerate()
195 {
196 builder.add_column(
197 &catalog_name,
198 &schema_name,
199 &table_name,
200 field_position,
201 field,
202 )
203 }
204 }
205 }
206 }
207 }
208 }
209 }
210
211 Ok(())
212 }
213
214 fn make_df_settings(
216 &self,
217 config_options: &ConfigOptions,
218 builder: &mut InformationSchemaDfSettingsBuilder,
219 ) {
220 for entry in config_options.entries() {
221 builder.add_setting(entry);
222 }
223 }
224
225 fn make_routines(
226 &self,
227 udfs: &HashMap<String, Arc<ScalarUDF>>,
228 udafs: &HashMap<String, Arc<AggregateUDF>>,
229 udwfs: &HashMap<String, Arc<WindowUDF>>,
230 config_options: &ConfigOptions,
231 builder: &mut InformationSchemaRoutinesBuilder,
232 ) -> Result<()> {
233 let catalog_name = &config_options.catalog.default_catalog;
234 let schema_name = &config_options.catalog.default_schema;
235
236 for (name, udf) in udfs {
237 let return_types = get_udf_args_and_return_types(udf)?
238 .into_iter()
239 .map(|(_, return_type)| return_type)
240 .collect::<HashSet<_>>();
241 for return_type in return_types {
242 builder.add_routine(
243 catalog_name,
244 schema_name,
245 name,
246 "FUNCTION",
247 Self::is_deterministic(udf.signature()),
248 return_type,
249 "SCALAR",
250 udf.documentation().map(|d| d.description.to_string()),
251 udf.documentation().map(|d| d.syntax_example.to_string()),
252 )
253 }
254 }
255
256 for (name, udaf) in udafs {
257 let return_types = get_udaf_args_and_return_types(udaf)?
258 .into_iter()
259 .map(|(_, return_type)| return_type)
260 .collect::<HashSet<_>>();
261 for return_type in return_types {
262 builder.add_routine(
263 catalog_name,
264 schema_name,
265 name,
266 "FUNCTION",
267 Self::is_deterministic(udaf.signature()),
268 return_type,
269 "AGGREGATE",
270 udaf.documentation().map(|d| d.description.to_string()),
271 udaf.documentation().map(|d| d.syntax_example.to_string()),
272 )
273 }
274 }
275
276 for (name, udwf) in udwfs {
277 let return_types = get_udwf_args_and_return_types(udwf)?
278 .into_iter()
279 .map(|(_, return_type)| return_type)
280 .collect::<HashSet<_>>();
281 for return_type in return_types {
282 builder.add_routine(
283 catalog_name,
284 schema_name,
285 name,
286 "FUNCTION",
287 Self::is_deterministic(udwf.signature()),
288 return_type,
289 "WINDOW",
290 udwf.documentation().map(|d| d.description.to_string()),
291 udwf.documentation().map(|d| d.syntax_example.to_string()),
292 )
293 }
294 }
295 Ok(())
296 }
297
298 fn is_deterministic(signature: &Signature) -> bool {
299 signature.volatility == Volatility::Immutable
300 }
301 fn make_parameters(
302 &self,
303 udfs: &HashMap<String, Arc<ScalarUDF>>,
304 udafs: &HashMap<String, Arc<AggregateUDF>>,
305 udwfs: &HashMap<String, Arc<WindowUDF>>,
306 config_options: &ConfigOptions,
307 builder: &mut InformationSchemaParametersBuilder,
308 ) -> Result<()> {
309 let catalog_name = &config_options.catalog.default_catalog;
310 let schema_name = &config_options.catalog.default_schema;
311 let mut add_parameters = |func_name: &str,
312 args: Option<&Vec<(String, String)>>,
313 arg_types: Vec<String>,
314 return_type: Option<String>,
315 is_variadic: bool,
316 rid: u8| {
317 for (position, type_name) in arg_types.iter().enumerate() {
318 let param_name =
319 args.and_then(|a| a.get(position).map(|arg| arg.0.as_str()));
320 builder.add_parameter(
321 catalog_name,
322 schema_name,
323 func_name,
324 position as u64 + 1,
325 "IN",
326 param_name,
327 type_name,
328 None::<&str>,
329 is_variadic,
330 rid,
331 );
332 }
333 if let Some(return_type) = return_type {
334 builder.add_parameter(
335 catalog_name,
336 schema_name,
337 func_name,
338 1,
339 "OUT",
340 None::<&str>,
341 return_type.as_str(),
342 None::<&str>,
343 false,
344 rid,
345 );
346 }
347 };
348
349 for (func_name, udf) in udfs {
350 let args = udf.documentation().and_then(|d| d.arguments.clone());
351 let combinations = get_udf_args_and_return_types(udf)?;
352 for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
353 add_parameters(
354 func_name,
355 args.as_ref(),
356 arg_types,
357 return_type,
358 Self::is_variadic(udf.signature()),
359 rid as u8,
360 );
361 }
362 }
363
364 for (func_name, udaf) in udafs {
365 let args = udaf.documentation().and_then(|d| d.arguments.clone());
366 let combinations = get_udaf_args_and_return_types(udaf)?;
367 for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
368 add_parameters(
369 func_name,
370 args.as_ref(),
371 arg_types,
372 return_type,
373 Self::is_variadic(udaf.signature()),
374 rid as u8,
375 );
376 }
377 }
378
379 for (func_name, udwf) in udwfs {
380 let args = udwf.documentation().and_then(|d| d.arguments.clone());
381 let combinations = get_udwf_args_and_return_types(udwf)?;
382 for (rid, (arg_types, return_type)) in combinations.into_iter().enumerate() {
383 add_parameters(
384 func_name,
385 args.as_ref(),
386 arg_types,
387 return_type,
388 Self::is_variadic(udwf.signature()),
389 rid as u8,
390 );
391 }
392 }
393
394 Ok(())
395 }
396
397 fn is_variadic(signature: &Signature) -> bool {
398 matches!(
399 signature.type_signature,
400 TypeSignature::Variadic(_) | TypeSignature::VariadicAny
401 )
402 }
403}
404
405fn get_udf_args_and_return_types(
408 udf: &Arc<ScalarUDF>,
409) -> Result<BTreeSet<(Vec<String>, Option<String>)>> {
410 let signature = udf.signature();
411 let arg_types = signature.type_signature.get_example_types();
412 if arg_types.is_empty() {
413 Ok(vec![(vec![], None)].into_iter().collect::<BTreeSet<_>>())
414 } else {
415 Ok(arg_types
416 .into_iter()
417 .map(|arg_types| {
418 let return_type = udf
420 .return_type(&arg_types)
421 .map(|t| remove_native_type_prefix(NativeType::from(t)))
422 .ok();
423 let arg_types = arg_types
424 .into_iter()
425 .map(|t| remove_native_type_prefix(NativeType::from(t)))
426 .collect::<Vec<_>>();
427 (arg_types, return_type)
428 })
429 .collect::<BTreeSet<_>>())
430 }
431}
432
433fn get_udaf_args_and_return_types(
434 udaf: &Arc<AggregateUDF>,
435) -> Result<BTreeSet<(Vec<String>, Option<String>)>> {
436 let signature = udaf.signature();
437 let arg_types = signature.type_signature.get_example_types();
438 if arg_types.is_empty() {
439 Ok(vec![(vec![], None)].into_iter().collect::<BTreeSet<_>>())
440 } else {
441 Ok(arg_types
442 .into_iter()
443 .map(|arg_types| {
444 let return_type = udaf
446 .return_type(&arg_types)
447 .ok()
448 .map(|t| remove_native_type_prefix(NativeType::from(t)));
449 let arg_types = arg_types
450 .into_iter()
451 .map(|t| remove_native_type_prefix(NativeType::from(t)))
452 .collect::<Vec<_>>();
453 (arg_types, return_type)
454 })
455 .collect::<BTreeSet<_>>())
456 }
457}
458
459fn get_udwf_args_and_return_types(
460 udwf: &Arc<WindowUDF>,
461) -> Result<BTreeSet<(Vec<String>, Option<String>)>> {
462 let signature = udwf.signature();
463 let arg_types = signature.type_signature.get_example_types();
464 if arg_types.is_empty() {
465 Ok(vec![(vec![], None)].into_iter().collect::<BTreeSet<_>>())
466 } else {
467 Ok(arg_types
468 .into_iter()
469 .map(|arg_types| {
470 let arg_types = arg_types
472 .into_iter()
473 .map(|t| remove_native_type_prefix(NativeType::from(t)))
474 .collect::<Vec<_>>();
475 (arg_types, None)
476 })
477 .collect::<BTreeSet<_>>())
478 }
479}
480
481#[inline]
482fn remove_native_type_prefix(native_type: NativeType) -> String {
483 format!("{native_type}")
484}
485
486#[async_trait]
487impl SchemaProvider for InformationSchemaProvider {
488 fn as_any(&self) -> &dyn Any {
489 self
490 }
491
492 fn table_names(&self) -> Vec<String> {
493 INFORMATION_SCHEMA_TABLES
494 .iter()
495 .map(|t| (*t).to_string())
496 .collect()
497 }
498
499 async fn table(
500 &self,
501 name: &str,
502 ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
503 let config = self.config.clone();
504 let table: Arc<dyn PartitionStream> = match name.to_ascii_lowercase().as_str() {
505 TABLES => Arc::new(InformationSchemaTables::new(config)),
506 COLUMNS => Arc::new(InformationSchemaColumns::new(config)),
507 VIEWS => Arc::new(InformationSchemaViews::new(config)),
508 DF_SETTINGS => Arc::new(InformationSchemaDfSettings::new(config)),
509 SCHEMATA => Arc::new(InformationSchemata::new(config)),
510 ROUTINES => Arc::new(InformationSchemaRoutines::new(config)),
511 PARAMETERS => Arc::new(InformationSchemaParameters::new(config)),
512 _ => return Ok(None),
513 };
514
515 Ok(Some(Arc::new(
516 StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
517 )))
518 }
519
520 fn table_exist(&self, name: &str) -> bool {
521 INFORMATION_SCHEMA_TABLES.contains(&name.to_ascii_lowercase().as_str())
522 }
523}
524
525#[derive(Debug)]
526struct InformationSchemaTables {
527 schema: SchemaRef,
528 config: InformationSchemaConfig,
529}
530
531impl InformationSchemaTables {
532 fn new(config: InformationSchemaConfig) -> Self {
533 let schema = Arc::new(Schema::new(vec![
534 Field::new("table_catalog", DataType::Utf8, false),
535 Field::new("table_schema", DataType::Utf8, false),
536 Field::new("table_name", DataType::Utf8, false),
537 Field::new("table_type", DataType::Utf8, false),
538 ]));
539
540 Self { schema, config }
541 }
542
543 fn builder(&self) -> InformationSchemaTablesBuilder {
544 InformationSchemaTablesBuilder {
545 catalog_names: StringBuilder::new(),
546 schema_names: StringBuilder::new(),
547 table_names: StringBuilder::new(),
548 table_types: StringBuilder::new(),
549 schema: Arc::clone(&self.schema),
550 }
551 }
552}
553
554impl PartitionStream for InformationSchemaTables {
555 fn schema(&self) -> &SchemaRef {
556 &self.schema
557 }
558
559 fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
560 let mut builder = self.builder();
561 let config = self.config.clone();
562 Box::pin(RecordBatchStreamAdapter::new(
563 Arc::clone(&self.schema),
564 futures::stream::once(async move {
566 config.make_tables(&mut builder).await?;
567 Ok(builder.finish())
568 }),
569 ))
570 }
571}
572
573struct InformationSchemaTablesBuilder {
577 schema: SchemaRef,
578 catalog_names: StringBuilder,
579 schema_names: StringBuilder,
580 table_names: StringBuilder,
581 table_types: StringBuilder,
582}
583
584impl InformationSchemaTablesBuilder {
585 fn add_table(
586 &mut self,
587 catalog_name: impl AsRef<str>,
588 schema_name: impl AsRef<str>,
589 table_name: impl AsRef<str>,
590 table_type: TableType,
591 ) {
592 self.catalog_names.append_value(catalog_name.as_ref());
594 self.schema_names.append_value(schema_name.as_ref());
595 self.table_names.append_value(table_name.as_ref());
596 self.table_types.append_value(match table_type {
597 TableType::Base => "BASE TABLE",
598 TableType::View => "VIEW",
599 TableType::Temporary => "LOCAL TEMPORARY",
600 });
601 }
602
603 fn finish(&mut self) -> RecordBatch {
604 RecordBatch::try_new(
605 Arc::clone(&self.schema),
606 vec![
607 Arc::new(self.catalog_names.finish()),
608 Arc::new(self.schema_names.finish()),
609 Arc::new(self.table_names.finish()),
610 Arc::new(self.table_types.finish()),
611 ],
612 )
613 .unwrap()
614 }
615}
616
617#[derive(Debug)]
618struct InformationSchemaViews {
619 schema: SchemaRef,
620 config: InformationSchemaConfig,
621}
622
623impl InformationSchemaViews {
624 fn new(config: InformationSchemaConfig) -> Self {
625 let schema = Arc::new(Schema::new(vec![
626 Field::new("table_catalog", DataType::Utf8, false),
627 Field::new("table_schema", DataType::Utf8, false),
628 Field::new("table_name", DataType::Utf8, false),
629 Field::new("definition", DataType::Utf8, true),
630 ]));
631
632 Self { schema, config }
633 }
634
635 fn builder(&self) -> InformationSchemaViewBuilder {
636 InformationSchemaViewBuilder {
637 catalog_names: StringBuilder::new(),
638 schema_names: StringBuilder::new(),
639 table_names: StringBuilder::new(),
640 definitions: StringBuilder::new(),
641 schema: Arc::clone(&self.schema),
642 }
643 }
644}
645
646impl PartitionStream for InformationSchemaViews {
647 fn schema(&self) -> &SchemaRef {
648 &self.schema
649 }
650
651 fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
652 let mut builder = self.builder();
653 let config = self.config.clone();
654 Box::pin(RecordBatchStreamAdapter::new(
655 Arc::clone(&self.schema),
656 futures::stream::once(async move {
658 config.make_views(&mut builder).await?;
659 Ok(builder.finish())
660 }),
661 ))
662 }
663}
664
665struct InformationSchemaViewBuilder {
669 schema: SchemaRef,
670 catalog_names: StringBuilder,
671 schema_names: StringBuilder,
672 table_names: StringBuilder,
673 definitions: StringBuilder,
674}
675
676impl InformationSchemaViewBuilder {
677 fn add_view(
678 &mut self,
679 catalog_name: impl AsRef<str>,
680 schema_name: impl AsRef<str>,
681 table_name: impl AsRef<str>,
682 definition: Option<impl AsRef<str>>,
683 ) {
684 self.catalog_names.append_value(catalog_name.as_ref());
686 self.schema_names.append_value(schema_name.as_ref());
687 self.table_names.append_value(table_name.as_ref());
688 self.definitions.append_option(definition.as_ref());
689 }
690
691 fn finish(&mut self) -> RecordBatch {
692 RecordBatch::try_new(
693 Arc::clone(&self.schema),
694 vec![
695 Arc::new(self.catalog_names.finish()),
696 Arc::new(self.schema_names.finish()),
697 Arc::new(self.table_names.finish()),
698 Arc::new(self.definitions.finish()),
699 ],
700 )
701 .unwrap()
702 }
703}
704
705#[derive(Debug)]
706struct InformationSchemaColumns {
707 schema: SchemaRef,
708 config: InformationSchemaConfig,
709}
710
711impl InformationSchemaColumns {
712 fn new(config: InformationSchemaConfig) -> Self {
713 let schema = Arc::new(Schema::new(vec![
714 Field::new("table_catalog", DataType::Utf8, false),
715 Field::new("table_schema", DataType::Utf8, false),
716 Field::new("table_name", DataType::Utf8, false),
717 Field::new("column_name", DataType::Utf8, false),
718 Field::new("ordinal_position", DataType::UInt64, false),
719 Field::new("column_default", DataType::Utf8, true),
720 Field::new("is_nullable", DataType::Utf8, false),
721 Field::new("data_type", DataType::Utf8, false),
722 Field::new("character_maximum_length", DataType::UInt64, true),
723 Field::new("character_octet_length", DataType::UInt64, true),
724 Field::new("numeric_precision", DataType::UInt64, true),
725 Field::new("numeric_precision_radix", DataType::UInt64, true),
726 Field::new("numeric_scale", DataType::UInt64, true),
727 Field::new("datetime_precision", DataType::UInt64, true),
728 Field::new("interval_type", DataType::Utf8, true),
729 ]));
730
731 Self { schema, config }
732 }
733
734 fn builder(&self) -> InformationSchemaColumnsBuilder {
735 let default_capacity = 10;
739
740 InformationSchemaColumnsBuilder {
741 catalog_names: StringBuilder::new(),
742 schema_names: StringBuilder::new(),
743 table_names: StringBuilder::new(),
744 column_names: StringBuilder::new(),
745 ordinal_positions: UInt64Builder::with_capacity(default_capacity),
746 column_defaults: StringBuilder::new(),
747 is_nullables: StringBuilder::new(),
748 data_types: StringBuilder::new(),
749 character_maximum_lengths: UInt64Builder::with_capacity(default_capacity),
750 character_octet_lengths: UInt64Builder::with_capacity(default_capacity),
751 numeric_precisions: UInt64Builder::with_capacity(default_capacity),
752 numeric_precision_radixes: UInt64Builder::with_capacity(default_capacity),
753 numeric_scales: UInt64Builder::with_capacity(default_capacity),
754 datetime_precisions: UInt64Builder::with_capacity(default_capacity),
755 interval_types: StringBuilder::new(),
756 schema: Arc::clone(&self.schema),
757 }
758 }
759}
760
761impl PartitionStream for InformationSchemaColumns {
762 fn schema(&self) -> &SchemaRef {
763 &self.schema
764 }
765
766 fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
767 let mut builder = self.builder();
768 let config = self.config.clone();
769 Box::pin(RecordBatchStreamAdapter::new(
770 Arc::clone(&self.schema),
771 futures::stream::once(async move {
773 config.make_columns(&mut builder).await?;
774 Ok(builder.finish())
775 }),
776 ))
777 }
778}
779
780struct InformationSchemaColumnsBuilder {
784 schema: SchemaRef,
785 catalog_names: StringBuilder,
786 schema_names: StringBuilder,
787 table_names: StringBuilder,
788 column_names: StringBuilder,
789 ordinal_positions: UInt64Builder,
790 column_defaults: StringBuilder,
791 is_nullables: StringBuilder,
792 data_types: StringBuilder,
793 character_maximum_lengths: UInt64Builder,
794 character_octet_lengths: UInt64Builder,
795 numeric_precisions: UInt64Builder,
796 numeric_precision_radixes: UInt64Builder,
797 numeric_scales: UInt64Builder,
798 datetime_precisions: UInt64Builder,
799 interval_types: StringBuilder,
800}
801
802impl InformationSchemaColumnsBuilder {
803 fn add_column(
804 &mut self,
805 catalog_name: &str,
806 schema_name: &str,
807 table_name: &str,
808 field_position: usize,
809 field: &Field,
810 ) {
811 use DataType::*;
812
813 self.catalog_names.append_value(catalog_name);
815 self.schema_names.append_value(schema_name);
816 self.table_names.append_value(table_name);
817
818 self.column_names.append_value(field.name());
819
820 self.ordinal_positions.append_value(field_position as u64);
821
822 self.column_defaults.append_null();
824
825 let nullable_str = if field.is_nullable() { "YES" } else { "NO" };
827 self.is_nullables.append_value(nullable_str);
828
829 self.data_types.append_value(field.data_type().to_string());
831
832 let max_chars = None;
838 self.character_maximum_lengths.append_option(max_chars);
839
840 let char_len: Option<u64> = match field.data_type() {
843 Utf8 | Binary => Some(i32::MAX as u64),
844 LargeBinary | LargeUtf8 => Some(i64::MAX as u64),
845 _ => None,
846 };
847 self.character_octet_lengths.append_option(char_len);
848
849 let (numeric_precision, numeric_radix, numeric_scale) = match field.data_type() {
872 Int8 | UInt8 => (Some(8), Some(2), None),
873 Int16 | UInt16 => (Some(16), Some(2), None),
874 Int32 | UInt32 => (Some(32), Some(2), None),
875 Float16 => (Some(15), Some(2), None),
878 Float32 => (Some(24), Some(2), None),
880 Float64 => (Some(24), Some(2), None),
882 Decimal128(precision, scale) => {
883 (Some(*precision as u64), Some(10), Some(*scale as u64))
884 }
885 _ => (None, None, None),
886 };
887
888 self.numeric_precisions.append_option(numeric_precision);
889 self.numeric_precision_radixes.append_option(numeric_radix);
890 self.numeric_scales.append_option(numeric_scale);
891
892 self.datetime_precisions.append_option(None);
893 self.interval_types.append_null();
894 }
895
896 fn finish(&mut self) -> RecordBatch {
897 RecordBatch::try_new(
898 Arc::clone(&self.schema),
899 vec![
900 Arc::new(self.catalog_names.finish()),
901 Arc::new(self.schema_names.finish()),
902 Arc::new(self.table_names.finish()),
903 Arc::new(self.column_names.finish()),
904 Arc::new(self.ordinal_positions.finish()),
905 Arc::new(self.column_defaults.finish()),
906 Arc::new(self.is_nullables.finish()),
907 Arc::new(self.data_types.finish()),
908 Arc::new(self.character_maximum_lengths.finish()),
909 Arc::new(self.character_octet_lengths.finish()),
910 Arc::new(self.numeric_precisions.finish()),
911 Arc::new(self.numeric_precision_radixes.finish()),
912 Arc::new(self.numeric_scales.finish()),
913 Arc::new(self.datetime_precisions.finish()),
914 Arc::new(self.interval_types.finish()),
915 ],
916 )
917 .unwrap()
918 }
919}
920
921#[derive(Debug)]
922struct InformationSchemata {
923 schema: SchemaRef,
924 config: InformationSchemaConfig,
925}
926
927impl InformationSchemata {
928 fn new(config: InformationSchemaConfig) -> Self {
929 let schema = Arc::new(Schema::new(vec![
930 Field::new("catalog_name", DataType::Utf8, false),
931 Field::new("schema_name", DataType::Utf8, false),
932 Field::new("schema_owner", DataType::Utf8, true),
933 Field::new("default_character_set_catalog", DataType::Utf8, true),
934 Field::new("default_character_set_schema", DataType::Utf8, true),
935 Field::new("default_character_set_name", DataType::Utf8, true),
936 Field::new("sql_path", DataType::Utf8, true),
937 ]));
938 Self { schema, config }
939 }
940
941 fn builder(&self) -> InformationSchemataBuilder {
942 InformationSchemataBuilder {
943 schema: Arc::clone(&self.schema),
944 catalog_name: StringBuilder::new(),
945 schema_name: StringBuilder::new(),
946 schema_owner: StringBuilder::new(),
947 default_character_set_catalog: StringBuilder::new(),
948 default_character_set_schema: StringBuilder::new(),
949 default_character_set_name: StringBuilder::new(),
950 sql_path: StringBuilder::new(),
951 }
952 }
953}
954
955struct InformationSchemataBuilder {
956 schema: SchemaRef,
957 catalog_name: StringBuilder,
958 schema_name: StringBuilder,
959 schema_owner: StringBuilder,
960 default_character_set_catalog: StringBuilder,
961 default_character_set_schema: StringBuilder,
962 default_character_set_name: StringBuilder,
963 sql_path: StringBuilder,
964}
965
966impl InformationSchemataBuilder {
967 fn add_schemata(
968 &mut self,
969 catalog_name: &str,
970 schema_name: &str,
971 schema_owner: Option<&str>,
972 ) {
973 self.catalog_name.append_value(catalog_name);
974 self.schema_name.append_value(schema_name);
975 match schema_owner {
976 Some(owner) => self.schema_owner.append_value(owner),
977 None => self.schema_owner.append_null(),
978 }
979 self.default_character_set_catalog.append_null();
982 self.default_character_set_schema.append_null();
983 self.default_character_set_name.append_null();
984 self.sql_path.append_null();
985 }
986
987 fn finish(&mut self) -> RecordBatch {
988 RecordBatch::try_new(
989 Arc::clone(&self.schema),
990 vec![
991 Arc::new(self.catalog_name.finish()),
992 Arc::new(self.schema_name.finish()),
993 Arc::new(self.schema_owner.finish()),
994 Arc::new(self.default_character_set_catalog.finish()),
995 Arc::new(self.default_character_set_schema.finish()),
996 Arc::new(self.default_character_set_name.finish()),
997 Arc::new(self.sql_path.finish()),
998 ],
999 )
1000 .unwrap()
1001 }
1002}
1003
1004impl PartitionStream for InformationSchemata {
1005 fn schema(&self) -> &SchemaRef {
1006 &self.schema
1007 }
1008
1009 fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1010 let mut builder = self.builder();
1011 let config = self.config.clone();
1012 Box::pin(RecordBatchStreamAdapter::new(
1013 Arc::clone(&self.schema),
1014 futures::stream::once(async move {
1016 config.make_schemata(&mut builder).await;
1017 Ok(builder.finish())
1018 }),
1019 ))
1020 }
1021}
1022
1023#[derive(Debug)]
1024struct InformationSchemaDfSettings {
1025 schema: SchemaRef,
1026 config: InformationSchemaConfig,
1027}
1028
1029impl InformationSchemaDfSettings {
1030 fn new(config: InformationSchemaConfig) -> Self {
1031 let schema = Arc::new(Schema::new(vec![
1032 Field::new("name", DataType::Utf8, false),
1033 Field::new("value", DataType::Utf8, true),
1034 Field::new("description", DataType::Utf8, true),
1035 ]));
1036
1037 Self { schema, config }
1038 }
1039
1040 fn builder(&self) -> InformationSchemaDfSettingsBuilder {
1041 InformationSchemaDfSettingsBuilder {
1042 names: StringBuilder::new(),
1043 values: StringBuilder::new(),
1044 descriptions: StringBuilder::new(),
1045 schema: Arc::clone(&self.schema),
1046 }
1047 }
1048}
1049
1050impl PartitionStream for InformationSchemaDfSettings {
1051 fn schema(&self) -> &SchemaRef {
1052 &self.schema
1053 }
1054
1055 fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1056 let config = self.config.clone();
1057 let mut builder = self.builder();
1058 Box::pin(RecordBatchStreamAdapter::new(
1059 Arc::clone(&self.schema),
1060 futures::stream::once(async move {
1062 config.make_df_settings(ctx.session_config().options(), &mut builder);
1064 Ok(builder.finish())
1065 }),
1066 ))
1067 }
1068}
1069
1070struct InformationSchemaDfSettingsBuilder {
1071 schema: SchemaRef,
1072 names: StringBuilder,
1073 values: StringBuilder,
1074 descriptions: StringBuilder,
1075}
1076
1077impl InformationSchemaDfSettingsBuilder {
1078 fn add_setting(&mut self, entry: ConfigEntry) {
1079 self.names.append_value(entry.key);
1080 self.values.append_option(entry.value);
1081 self.descriptions.append_value(entry.description);
1082 }
1083
1084 fn finish(&mut self) -> RecordBatch {
1085 RecordBatch::try_new(
1086 Arc::clone(&self.schema),
1087 vec![
1088 Arc::new(self.names.finish()),
1089 Arc::new(self.values.finish()),
1090 Arc::new(self.descriptions.finish()),
1091 ],
1092 )
1093 .unwrap()
1094 }
1095}
1096
1097#[derive(Debug)]
1098struct InformationSchemaRoutines {
1099 schema: SchemaRef,
1100 config: InformationSchemaConfig,
1101}
1102
1103impl InformationSchemaRoutines {
1104 fn new(config: InformationSchemaConfig) -> Self {
1105 let schema = Arc::new(Schema::new(vec![
1106 Field::new("specific_catalog", DataType::Utf8, false),
1107 Field::new("specific_schema", DataType::Utf8, false),
1108 Field::new("specific_name", DataType::Utf8, false),
1109 Field::new("routine_catalog", DataType::Utf8, false),
1110 Field::new("routine_schema", DataType::Utf8, false),
1111 Field::new("routine_name", DataType::Utf8, false),
1112 Field::new("routine_type", DataType::Utf8, false),
1113 Field::new("is_deterministic", DataType::Boolean, true),
1114 Field::new("data_type", DataType::Utf8, true),
1115 Field::new("function_type", DataType::Utf8, true),
1116 Field::new("description", DataType::Utf8, true),
1117 Field::new("syntax_example", DataType::Utf8, true),
1118 ]));
1119
1120 Self { schema, config }
1121 }
1122
1123 fn builder(&self) -> InformationSchemaRoutinesBuilder {
1124 InformationSchemaRoutinesBuilder {
1125 schema: Arc::clone(&self.schema),
1126 specific_catalog: StringBuilder::new(),
1127 specific_schema: StringBuilder::new(),
1128 specific_name: StringBuilder::new(),
1129 routine_catalog: StringBuilder::new(),
1130 routine_schema: StringBuilder::new(),
1131 routine_name: StringBuilder::new(),
1132 routine_type: StringBuilder::new(),
1133 is_deterministic: BooleanBuilder::new(),
1134 data_type: StringBuilder::new(),
1135 function_type: StringBuilder::new(),
1136 description: StringBuilder::new(),
1137 syntax_example: StringBuilder::new(),
1138 }
1139 }
1140}
1141
1142struct InformationSchemaRoutinesBuilder {
1143 schema: SchemaRef,
1144 specific_catalog: StringBuilder,
1145 specific_schema: StringBuilder,
1146 specific_name: StringBuilder,
1147 routine_catalog: StringBuilder,
1148 routine_schema: StringBuilder,
1149 routine_name: StringBuilder,
1150 routine_type: StringBuilder,
1151 is_deterministic: BooleanBuilder,
1152 data_type: StringBuilder,
1153 function_type: StringBuilder,
1154 description: StringBuilder,
1155 syntax_example: StringBuilder,
1156}
1157
1158impl InformationSchemaRoutinesBuilder {
1159 #[allow(clippy::too_many_arguments)]
1160 fn add_routine(
1161 &mut self,
1162 catalog_name: impl AsRef<str>,
1163 schema_name: impl AsRef<str>,
1164 routine_name: impl AsRef<str>,
1165 routine_type: impl AsRef<str>,
1166 is_deterministic: bool,
1167 data_type: Option<impl AsRef<str>>,
1168 function_type: impl AsRef<str>,
1169 description: Option<impl AsRef<str>>,
1170 syntax_example: Option<impl AsRef<str>>,
1171 ) {
1172 self.specific_catalog.append_value(catalog_name.as_ref());
1173 self.specific_schema.append_value(schema_name.as_ref());
1174 self.specific_name.append_value(routine_name.as_ref());
1175 self.routine_catalog.append_value(catalog_name.as_ref());
1176 self.routine_schema.append_value(schema_name.as_ref());
1177 self.routine_name.append_value(routine_name.as_ref());
1178 self.routine_type.append_value(routine_type.as_ref());
1179 self.is_deterministic.append_value(is_deterministic);
1180 self.data_type.append_option(data_type.as_ref());
1181 self.function_type.append_value(function_type.as_ref());
1182 self.description.append_option(description);
1183 self.syntax_example.append_option(syntax_example);
1184 }
1185
1186 fn finish(&mut self) -> RecordBatch {
1187 RecordBatch::try_new(
1188 Arc::clone(&self.schema),
1189 vec![
1190 Arc::new(self.specific_catalog.finish()),
1191 Arc::new(self.specific_schema.finish()),
1192 Arc::new(self.specific_name.finish()),
1193 Arc::new(self.routine_catalog.finish()),
1194 Arc::new(self.routine_schema.finish()),
1195 Arc::new(self.routine_name.finish()),
1196 Arc::new(self.routine_type.finish()),
1197 Arc::new(self.is_deterministic.finish()),
1198 Arc::new(self.data_type.finish()),
1199 Arc::new(self.function_type.finish()),
1200 Arc::new(self.description.finish()),
1201 Arc::new(self.syntax_example.finish()),
1202 ],
1203 )
1204 .unwrap()
1205 }
1206}
1207
1208impl PartitionStream for InformationSchemaRoutines {
1209 fn schema(&self) -> &SchemaRef {
1210 &self.schema
1211 }
1212
1213 fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1214 let config = self.config.clone();
1215 let mut builder = self.builder();
1216 Box::pin(RecordBatchStreamAdapter::new(
1217 Arc::clone(&self.schema),
1218 futures::stream::once(async move {
1219 config.make_routines(
1220 ctx.scalar_functions(),
1221 ctx.aggregate_functions(),
1222 ctx.window_functions(),
1223 ctx.session_config().options(),
1224 &mut builder,
1225 )?;
1226 Ok(builder.finish())
1227 }),
1228 ))
1229 }
1230}
1231
1232#[derive(Debug)]
1233struct InformationSchemaParameters {
1234 schema: SchemaRef,
1235 config: InformationSchemaConfig,
1236}
1237
1238impl InformationSchemaParameters {
1239 fn new(config: InformationSchemaConfig) -> Self {
1240 let schema = Arc::new(Schema::new(vec![
1241 Field::new("specific_catalog", DataType::Utf8, false),
1242 Field::new("specific_schema", DataType::Utf8, false),
1243 Field::new("specific_name", DataType::Utf8, false),
1244 Field::new("ordinal_position", DataType::UInt64, false),
1245 Field::new("parameter_mode", DataType::Utf8, false),
1246 Field::new("parameter_name", DataType::Utf8, true),
1247 Field::new("data_type", DataType::Utf8, false),
1248 Field::new("parameter_default", DataType::Utf8, true),
1249 Field::new("is_variadic", DataType::Boolean, false),
1250 Field::new("rid", DataType::UInt8, false),
1256 ]));
1257
1258 Self { schema, config }
1259 }
1260
1261 fn builder(&self) -> InformationSchemaParametersBuilder {
1262 InformationSchemaParametersBuilder {
1263 schema: Arc::clone(&self.schema),
1264 specific_catalog: StringBuilder::new(),
1265 specific_schema: StringBuilder::new(),
1266 specific_name: StringBuilder::new(),
1267 ordinal_position: UInt64Builder::new(),
1268 parameter_mode: StringBuilder::new(),
1269 parameter_name: StringBuilder::new(),
1270 data_type: StringBuilder::new(),
1271 parameter_default: StringBuilder::new(),
1272 is_variadic: BooleanBuilder::new(),
1273 rid: UInt8Builder::new(),
1274 }
1275 }
1276}
1277
1278struct InformationSchemaParametersBuilder {
1279 schema: SchemaRef,
1280 specific_catalog: StringBuilder,
1281 specific_schema: StringBuilder,
1282 specific_name: StringBuilder,
1283 ordinal_position: UInt64Builder,
1284 parameter_mode: StringBuilder,
1285 parameter_name: StringBuilder,
1286 data_type: StringBuilder,
1287 parameter_default: StringBuilder,
1288 is_variadic: BooleanBuilder,
1289 rid: UInt8Builder,
1290}
1291
1292impl InformationSchemaParametersBuilder {
1293 #[allow(clippy::too_many_arguments)]
1294 fn add_parameter(
1295 &mut self,
1296 specific_catalog: impl AsRef<str>,
1297 specific_schema: impl AsRef<str>,
1298 specific_name: impl AsRef<str>,
1299 ordinal_position: u64,
1300 parameter_mode: impl AsRef<str>,
1301 parameter_name: Option<impl AsRef<str>>,
1302 data_type: impl AsRef<str>,
1303 parameter_default: Option<impl AsRef<str>>,
1304 is_variadic: bool,
1305 rid: u8,
1306 ) {
1307 self.specific_catalog
1308 .append_value(specific_catalog.as_ref());
1309 self.specific_schema.append_value(specific_schema.as_ref());
1310 self.specific_name.append_value(specific_name.as_ref());
1311 self.ordinal_position.append_value(ordinal_position);
1312 self.parameter_mode.append_value(parameter_mode.as_ref());
1313 self.parameter_name.append_option(parameter_name.as_ref());
1314 self.data_type.append_value(data_type.as_ref());
1315 self.parameter_default.append_option(parameter_default);
1316 self.is_variadic.append_value(is_variadic);
1317 self.rid.append_value(rid);
1318 }
1319
1320 fn finish(&mut self) -> RecordBatch {
1321 RecordBatch::try_new(
1322 Arc::clone(&self.schema),
1323 vec![
1324 Arc::new(self.specific_catalog.finish()),
1325 Arc::new(self.specific_schema.finish()),
1326 Arc::new(self.specific_name.finish()),
1327 Arc::new(self.ordinal_position.finish()),
1328 Arc::new(self.parameter_mode.finish()),
1329 Arc::new(self.parameter_name.finish()),
1330 Arc::new(self.data_type.finish()),
1331 Arc::new(self.parameter_default.finish()),
1332 Arc::new(self.is_variadic.finish()),
1333 Arc::new(self.rid.finish()),
1334 ],
1335 )
1336 .unwrap()
1337 }
1338}
1339
1340impl PartitionStream for InformationSchemaParameters {
1341 fn schema(&self) -> &SchemaRef {
1342 &self.schema
1343 }
1344
1345 fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
1346 let config = self.config.clone();
1347 let mut builder = self.builder();
1348 Box::pin(RecordBatchStreamAdapter::new(
1349 Arc::clone(&self.schema),
1350 futures::stream::once(async move {
1351 config.make_parameters(
1352 ctx.scalar_functions(),
1353 ctx.aggregate_functions(),
1354 ctx.window_functions(),
1355 ctx.session_config().options(),
1356 &mut builder,
1357 )?;
1358 Ok(builder.finish())
1359 }),
1360 ))
1361 }
1362}
1363
#[cfg(test)]
mod tests {
    use super::*;
    use crate::CatalogProvider;

    /// `make_tables` must obtain each table's type through
    /// `SchemaProvider::table_type`. The `Fixture` panics if
    /// `SchemaProvider::table` is called instead.
    #[tokio::test]
    async fn make_tables_uses_table_type() -> Result<()> {
        let config = InformationSchemaConfig {
            catalog_list: Arc::new(Fixture),
        };
        let mut builder = InformationSchemaTablesBuilder {
            catalog_names: StringBuilder::new(),
            schema_names: StringBuilder::new(),
            table_names: StringBuilder::new(),
            table_types: StringBuilder::new(),
            schema: Arc::new(Schema::empty()),
        };

        // Propagate the error so a failure reports the underlying cause rather
        // than only a failed `is_ok()` check.
        config.make_tables(&mut builder).await?;

        assert_eq!("BASE TABLE", builder.table_types.finish().value(0));
        Ok(())
    }

    /// A single test double that serves as the catalog list, the catalog and
    /// the schema. It exposes catalog `acatalog`, schema `aschema` and one base
    /// table `atable`. Only the methods the test needs are implemented.
    #[derive(Debug)]
    struct Fixture;

    #[async_trait]
    impl SchemaProvider for Fixture {
        async fn table_type(&self, _: &str) -> Result<Option<TableType>> {
            Ok(Some(TableType::Base))
        }

        async fn table(&self, _: &str) -> Result<Option<Arc<dyn TableProvider>>> {
            panic!("InformationSchemaConfig::make_tables called SchemaProvider::table instead of table_type")
        }

        fn as_any(&self) -> &dyn Any {
            unimplemented!("not required for these tests")
        }

        fn table_names(&self) -> Vec<String> {
            vec!["atable".to_string()]
        }

        fn table_exist(&self, _: &str) -> bool {
            unimplemented!("not required for these tests")
        }
    }

    impl CatalogProviderList for Fixture {
        fn as_any(&self) -> &dyn Any {
            unimplemented!("not required for these tests")
        }

        fn register_catalog(
            &self,
            _: String,
            _: Arc<dyn CatalogProvider>,
        ) -> Option<Arc<dyn CatalogProvider>> {
            unimplemented!("not required for these tests")
        }

        fn catalog_names(&self) -> Vec<String> {
            vec!["acatalog".to_string()]
        }

        fn catalog(&self, _: &str) -> Option<Arc<dyn CatalogProvider>> {
            Some(Arc::new(Self))
        }
    }

    impl CatalogProvider for Fixture {
        fn as_any(&self) -> &dyn Any {
            unimplemented!("not required for these tests")
        }

        fn schema_names(&self) -> Vec<String> {
            vec!["aschema".to_string()]
        }

        fn schema(&self, _: &str) -> Option<Arc<dyn SchemaProvider>> {
            Some(Arc::new(Self))
        }
    }
}