pub mod dynamic_file;
pub mod empty;
pub mod file_format;
pub mod listing;
pub mod listing_table_factory;
mod memory_test;
pub mod physical_plan;
pub mod provider;
mod view_test;

pub use self::default_table_source::{
    provider_as_source, source_as_provider, DefaultTableSource,
};
pub use self::memory::MemTable;
pub use self::view::ViewTable;
pub use crate::catalog::TableProvider;
pub use crate::logical_expr::TableType;
pub use datafusion_catalog::cte_worktable;
pub use datafusion_catalog::default_table_source;
pub use datafusion_catalog::memory;
pub use datafusion_catalog::stream;
pub use datafusion_catalog::view;
pub use datafusion_datasource::schema_adapter;
pub use datafusion_datasource::sink;
pub use datafusion_datasource::source;
pub use datafusion_datasource::table_schema;
pub use datafusion_execution::object_store;
pub use datafusion_physical_expr::create_ordering;

#[cfg(all(test, feature = "parquet"))]
mod tests {

    use crate::prelude::SessionContext;
    use ::object_store::{path::Path, ObjectMeta};
    use arrow::{
        array::{Int32Array, StringArray},
        datatypes::{DataType, Field, Schema, SchemaRef},
        record_batch::RecordBatch,
    };
    use datafusion_common::{record_batch, test_util::batches_to_sort_string};
    use datafusion_datasource::{
        file::FileSource,
        file_scan_config::FileScanConfigBuilder,
        schema_adapter::{
            DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory,
            SchemaMapper,
        },
        source::DataSourceExec,
        PartitionedFile,
    };
    use datafusion_datasource_parquet::source::ParquetSource;
    use datafusion_physical_plan::collect;
    use std::{fs, sync::Arc};
    use tempfile::TempDir;

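    /// Shows that the schema adapter used by a parquet scan can be replaced:
    /// `TestSchemaAdapterFactory` injects an `extra_column` that does not
    /// exist in the underlying parquet file.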
    #[tokio::test]
    async fn can_override_schema_adapter() {
        use datafusion_execution::object_store::ObjectStoreUrl;
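        // Write a single-row parquet file with one Int32 column, `id`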
        let tmp_dir = TempDir::new().unwrap();
        let table_dir = tmp_dir.path().join("parquet_test");
        fs::DirBuilder::new().create(table_dir.as_path()).unwrap();
        let f1 = Field::new("id", DataType::Int32, true);

        let file_schema = Arc::new(Schema::new(vec![f1.clone()]));
        let filename = "part.parquet".to_string();
        let path = table_dir.as_path().join(filename.clone());
        let file = fs::File::create(path.clone()).unwrap();
        let mut writer =
            parquet::arrow::ArrowWriter::try_new(file, file_schema.clone(), None)
                .unwrap();

        let ids = Arc::new(Int32Array::from(vec![1i32]));
        let rec_batch = RecordBatch::try_new(file_schema.clone(), vec![ids]).unwrap();

        writer.write(&rec_batch).unwrap();
        writer.close().unwrap();

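        // Describe the file just written so it can be scanned as a `PartitionedFile`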
        let location = Path::parse(path.to_str().unwrap()).unwrap();
        let metadata = fs::metadata(path.as_path()).expect("Local file metadata");
        let meta = ObjectMeta {
            location,
            last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(),
            size: metadata.len(),
            e_tag: None,
            version: None,
        };

        let partitioned_file = PartitionedFile {
            object_meta: meta,
            partition_values: vec![],
            range: None,
            statistics: None,
            extensions: None,
            metadata_size_hint: None,
        };

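        // The table schema declares `extra_column`, which is not present in the
        // file; the custom schema adapter is responsible for producing it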
        let f1 = Field::new("id", DataType::Int32, true);
        let f2 = Field::new("extra_column", DataType::Utf8, true);

        let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()]));
        let source = ParquetSource::default()
            .with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {}))
            .unwrap();
        let base_conf = FileScanConfigBuilder::new(
            ObjectStoreUrl::local_filesystem(),
            schema,
            source,
        )
        .with_file(partitioned_file)
        .build();

        let parquet_exec = DataSourceExec::from_data_source(base_conf);

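        // Scan the file and verify that `extra_column` was filled in by `TestSchemaMapping`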
        let session_ctx = SessionContext::new();
        let task_ctx = session_ctx.task_ctx();
        let read = collect(parquet_exec, task_ctx).await.unwrap();

        insta::assert_snapshot!(batches_to_sort_string(&read),@r###"
        +----+--------------+
        | id | extra_column |
        +----+--------------+
        | 1  | foo          |
        +----+--------------+
        "###);
    }

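    /// The default schema adapter maps file columns onto the table schema,
    /// casting types where they differ and filling columns that are missing
    /// from the file with nulls.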
    #[test]
    fn default_schema_adapter() {
        let table_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ]);

        let file_schema = Schema::new(vec![
            Field::new("c", DataType::Float64, true),
            Field::new("b", DataType::Float64, true),
        ]);

        let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::new(table_schema));
        let (mapper, indices) = adapter.map_schema(&file_schema).unwrap();
        assert_eq!(indices, vec![1]);

        let file_batch = record_batch!(("b", Float64, vec![1.0, 2.0])).unwrap();

        let mapped_batch = mapper.map_batch(file_batch).unwrap();

        let expected_batch = record_batch!(
            ("a", Int32, vec![None, None]),
            ("b", Utf8, vec!["1.0", "2.0"])
        )
        .unwrap();
        assert_eq!(mapped_batch, expected_batch);
    }

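    /// Mapping fails when a column that is missing from the file is declared
    /// non-nullable in the table schema, since it would have to be filled with nulls.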
    #[test]
    fn default_schema_adapter_non_nullable_columns() {
        let table_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false), // "a" is non-nullable
            Field::new("b", DataType::Utf8, true),
        ]);
        let file_schema = Schema::new(vec![
            // "a" is missing from the file, so it will be filled with nulls
            Field::new("b", DataType::Float64, true),
        ]);

        let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::new(table_schema));
        let (mapper, indices) = adapter.map_schema(&file_schema).unwrap();
        assert_eq!(indices, vec![0]);

        let file_batch = record_batch!(("b", Float64, vec![1.0, 2.0])).unwrap();

        // Mapping fails because it would fill the non-nullable column "a" with nulls
        let err = mapper.map_batch(file_batch).unwrap_err().to_string();
        assert!(
            err.contains("Invalid argument error: Column 'a' is declared as non-nullable but contains null values"),
            "{err}"
        );
    }

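    /// Factory for the custom schema adapter used in `can_override_schema_adapter`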
    #[derive(Debug)]
    struct TestSchemaAdapterFactory;

    impl SchemaAdapterFactory for TestSchemaAdapterFactory {
        fn create(
            &self,
            projected_table_schema: SchemaRef,
            _table_schema: SchemaRef,
        ) -> Box<dyn SchemaAdapter> {
            Box::new(TestSchemaAdapter {
                table_schema: projected_table_schema,
            })
        }
    }

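    /// Adapter that keeps only the file columns whose names also appear in
    /// the table schema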
    struct TestSchemaAdapter {
        /// Schema of the table the file columns are mapped to
        table_schema: SchemaRef,
    }

    impl SchemaAdapter for TestSchemaAdapter {
        fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
            let field = self.table_schema.field(index);
            Some(file_schema.fields.find(field.name())?.0)
        }

        fn map_schema(
            &self,
            file_schema: &Schema,
        ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
            let mut projection = Vec::with_capacity(file_schema.fields().len());

            // Project only the file columns that exist in the table schema
            for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
                if self.table_schema.fields().find(file_field.name()).is_some() {
                    projection.push(file_idx);
                }
            }

            Ok((Arc::new(TestSchemaMapping {}), projection))
        }
    }

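    /// Mapper that appends a constant Utf8 `extra_column` (value `"foo"`) to
    /// every batch read from the file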
    #[derive(Debug)]
    struct TestSchemaMapping {}

    impl SchemaMapper for TestSchemaMapping {
        fn map_batch(
            &self,
            batch: RecordBatch,
        ) -> datafusion_common::Result<RecordBatch> {
            let f1 = Field::new("id", DataType::Int32, true);
            let f2 = Field::new("extra_column", DataType::Utf8, true);

            let schema = Arc::new(Schema::new(vec![f1, f2]));

            let extra_column = Arc::new(StringArray::from(vec!["foo"]));
            let mut new_columns = batch.columns().to_vec();
            new_columns.push(extra_column);

            Ok(RecordBatch::try_new(schema, new_columns).unwrap())
        }

        fn map_column_statistics(
            &self,
            _file_col_statistics: &[datafusion_common::ColumnStatistics],
        ) -> datafusion_common::Result<Vec<datafusion_common::ColumnStatistics>> {
            unimplemented!()
        }
    }
}