datafusion/datasource/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! DataFusion data sources: [`TableProvider`] and [`ListingTable`]
//!
//! [`ListingTable`]: crate::datasource::listing::ListingTable
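//!
//! A minimal sketch of wiring an in-memory table (a `MemTable`, which implements
//! [`TableProvider`]) into a `SessionContext` and querying it with SQL. The example
//! assumes the `datafusion` facade crate and is marked `no_run`; the exact API
//! surface may differ between versions.
//!
//! ```no_run
//! use std::sync::Arc;
//! use datafusion::arrow::array::Int32Array;
//! use datafusion::arrow::datatypes::{DataType, Field, Schema};
//! use datafusion::arrow::record_batch::RecordBatch;
//! use datafusion::datasource::MemTable;
//! use datafusion::prelude::SessionContext;
//!
//! # async fn example() -> datafusion::error::Result<()> {
//! // Build a single-column RecordBatch to back the table
//! let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
//! let batch = RecordBatch::try_new(
//!     schema.clone(),
//!     vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
//! )?;
//!
//! // A MemTable is a TableProvider backed by in-memory record batches
//! let table = MemTable::try_new(schema, vec![vec![batch]])?;
//!
//! // Register the provider under a table name and query it with SQL
//! let ctx = SessionContext::new();
//! ctx.register_table("t", Arc::new(table))?;
//! ctx.sql("SELECT a FROM t").await?.show().await?;
//! # Ok(())
//! # }
//! ```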

pub mod dynamic_file;
pub mod empty;
pub mod file_format;
pub mod listing;
pub mod listing_table_factory;
mod memory_test;
pub mod physical_plan;
pub mod provider;
mod view_test;

// Re-exported for backwards compatibility
pub use self::default_table_source::{
    provider_as_source, source_as_provider, DefaultTableSource,
};
pub use self::memory::MemTable;
pub use self::view::ViewTable;
pub use crate::catalog::TableProvider;
pub use crate::logical_expr::TableType;
pub use datafusion_catalog::cte_worktable;
pub use datafusion_catalog::default_table_source;
pub use datafusion_catalog::memory;
pub use datafusion_catalog::stream;
pub use datafusion_catalog::view;
pub use datafusion_datasource::schema_adapter;
pub use datafusion_datasource::sink;
pub use datafusion_datasource::source;
pub use datafusion_datasource::table_schema;
pub use datafusion_execution::object_store;
pub use datafusion_physical_expr::create_ordering;

#[cfg(all(test, feature = "parquet"))]
mod tests {

    use crate::prelude::SessionContext;
    use ::object_store::{path::Path, ObjectMeta};
    use arrow::{
        array::{Int32Array, StringArray},
        datatypes::{DataType, Field, Schema, SchemaRef},
        record_batch::RecordBatch,
    };
    use datafusion_common::{record_batch, test_util::batches_to_sort_string};
    use datafusion_datasource::{
        file::FileSource,
        file_scan_config::FileScanConfigBuilder,
        schema_adapter::{
            DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory,
            SchemaMapper,
        },
        source::DataSourceExec,
        PartitionedFile,
    };
    use datafusion_datasource_parquet::source::ParquetSource;
    use datafusion_physical_plan::collect;
    use std::{fs, sync::Arc};
    use tempfile::TempDir;

    #[tokio::test]
    async fn can_override_schema_adapter() {
        // This test shows that a SchemaAdapter can add a column that doesn't exist in
        // the record batches returned from Parquet. This is useful for schema evolution,
        // where older files may not have all of the table's columns.

        use datafusion_execution::object_store::ObjectStoreUrl;
        // Write a single-column ("id") Parquet file into a temporary directory
        let tmp_dir = TempDir::new().unwrap();
        let table_dir = tmp_dir.path().join("parquet_test");
        fs::DirBuilder::new().create(table_dir.as_path()).unwrap();
        let f1 = Field::new("id", DataType::Int32, true);

        let file_schema = Arc::new(Schema::new(vec![f1.clone()]));
        let filename = "part.parquet".to_string();
        let path = table_dir.as_path().join(filename.clone());
        let file = fs::File::create(path.clone()).unwrap();
        let mut writer =
            parquet::arrow::ArrowWriter::try_new(file, file_schema.clone(), None)
                .unwrap();

        let ids = Arc::new(Int32Array::from(vec![1i32]));
        let rec_batch = RecordBatch::try_new(file_schema.clone(), vec![ids]).unwrap();

        writer.write(&rec_batch).unwrap();
        writer.close().unwrap();

        let location = Path::parse(path.to_str().unwrap()).unwrap();
        let metadata = fs::metadata(path.as_path()).expect("Local file metadata");
        let meta = ObjectMeta {
            location,
            last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(),
            size: metadata.len(),
            e_tag: None,
            version: None,
        };

        let partitioned_file = PartitionedFile {
            object_meta: meta,
            partition_values: vec![],
            range: None,
            statistics: None,
            extensions: None,
            metadata_size_hint: None,
        };

        // The table schema has an extra column that is not present in the file
        let f1 = Field::new("id", DataType::Int32, true);
        let f2 = Field::new("extra_column", DataType::Utf8, true);

        let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()]));
        // Scan with a custom schema adapter factory that fills in the extra column
        let source = ParquetSource::default()
            .with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {}))
            .unwrap();
        let base_conf = FileScanConfigBuilder::new(
            ObjectStoreUrl::local_filesystem(),
            schema,
            source,
        )
        .with_file(partitioned_file)
        .build();

        let parquet_exec = DataSourceExec::from_data_source(base_conf);

        let session_ctx = SessionContext::new();
        let task_ctx = session_ctx.task_ctx();
        let read = collect(parquet_exec, task_ctx).await.unwrap();

        insta::assert_snapshot!(batches_to_sort_string(&read),@r###"
        +----+--------------+
        | id | extra_column |
        +----+--------------+
        | 1  | foo          |
        +----+--------------+
        "###);
    }

    #[test]
    fn default_schema_adapter() {
        let table_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ]);

        // the file has a subset of the table schema's fields, and "b" has a different type
        let file_schema = Schema::new(vec![
            Field::new("c", DataType::Float64, true), // not in table schema
            Field::new("b", DataType::Float64, true),
        ]);

        let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::new(table_schema));
        let (mapper, indices) = adapter.map_schema(&file_schema).unwrap();
        assert_eq!(indices, vec![1]);

        let file_batch = record_batch!(("b", Float64, vec![1.0, 2.0])).unwrap();

        let mapped_batch = mapper.map_batch(file_batch).unwrap();

        // the mapped batch has the correct schema and the "b" column has been cast to Utf8
        let expected_batch = record_batch!(
            ("a", Int32, vec![None, None]), // missing column filled with nulls
            ("b", Utf8, vec!["1.0", "2.0"]) // b was cast to string and order was changed
        )
        .unwrap();
        assert_eq!(mapped_batch, expected_batch);
    }

    #[test]
    fn default_schema_adapter_non_nullable_columns() {
        let table_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false), // "a" is declared non-nullable
            Field::new("b", DataType::Utf8, true),
        ]);
        let file_schema = Schema::new(vec![
            // since the file doesn't have "a", it will be filled with nulls
            Field::new("b", DataType::Float64, true),
        ]);

        let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::new(table_schema));
        let (mapper, indices) = adapter.map_schema(&file_schema).unwrap();
        assert_eq!(indices, vec![0]);

        let file_batch = record_batch!(("b", Float64, vec![1.0, 2.0])).unwrap();

        // Mapping fails because it tries to fill in a non-nullable column with nulls
        let err = mapper.map_batch(file_batch).unwrap_err().to_string();
        assert!(err.contains("Invalid argument error: Column 'a' is declared as non-nullable but contains null values"), "{err}");
    }

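    /// Test-only [`SchemaAdapterFactory`] that hands out a [`TestSchemaAdapter`]
    /// for the projected table schema.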
    #[derive(Debug)]
    struct TestSchemaAdapterFactory;

    impl SchemaAdapterFactory for TestSchemaAdapterFactory {
        fn create(
            &self,
            projected_table_schema: SchemaRef,
            _table_schema: SchemaRef,
        ) -> Box<dyn SchemaAdapter> {
            Box::new(TestSchemaAdapter {
                table_schema: projected_table_schema,
            })
        }
    }

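    /// Test-only [`SchemaAdapter`] that matches file columns to table columns by
    /// name and delegates batch rewriting to [`TestSchemaMapping`].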
    struct TestSchemaAdapter {
        /// Schema for the table
        table_schema: SchemaRef,
    }

    impl SchemaAdapter for TestSchemaAdapter {
        fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
            let field = self.table_schema.field(index);
            Some(file_schema.fields.find(field.name())?.0)
        }

        fn map_schema(
            &self,
            file_schema: &Schema,
        ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
            let mut projection = Vec::with_capacity(file_schema.fields().len());

            for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
                if self.table_schema.fields().find(file_field.name()).is_some() {
                    projection.push(file_idx);
                }
            }

            Ok((Arc::new(TestSchemaMapping {}), projection))
        }
    }

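    /// Test-only [`SchemaMapper`] that appends a constant `extra_column` ("foo")
    /// to every batch, emulating a column present in the table schema but missing
    /// from the file.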
    #[derive(Debug)]
    struct TestSchemaMapping {}

    impl SchemaMapper for TestSchemaMapping {
        fn map_batch(
            &self,
            batch: RecordBatch,
        ) -> datafusion_common::Result<RecordBatch> {
            let f1 = Field::new("id", DataType::Int32, true);
            let f2 = Field::new("extra_column", DataType::Utf8, true);

            let schema = Arc::new(Schema::new(vec![f1, f2]));

            let extra_column = Arc::new(StringArray::from(vec!["foo"]));
            let mut new_columns = batch.columns().to_vec();
            new_columns.push(extra_column);

            Ok(RecordBatch::try_new(schema, new_columns).unwrap())
        }

        fn map_column_statistics(
            &self,
            _file_col_statistics: &[datafusion_common::ColumnStatistics],
        ) -> datafusion_common::Result<Vec<datafusion_common::ColumnStatistics>> {
            // Column statistics are not exercised by these tests
            unimplemented!()
        }
    }
}