datafusion/datasource/physical_plan/
avro.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Reexports the [`datafusion_datasource_avro::source`] module, containing [Avro] based [`FileSource`].
19//!
20//! [Avro]: https://avro.apache.org/
21//! [`FileSource`]: datafusion_datasource::file::FileSource
22
23pub use datafusion_datasource_avro::source::*;
24
25#[cfg(test)]
26mod tests {
27
28    use std::sync::Arc;
29
30    use crate::prelude::SessionContext;
31    use crate::test::object_store::local_unpartitioned_file;
32    use arrow::datatypes::{DataType, Field, SchemaBuilder};
33    use datafusion_common::test_util::batches_to_string;
34    use datafusion_common::{test_util, Result, ScalarValue};
35    use datafusion_datasource::file_format::FileFormat;
36    use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
37    use datafusion_datasource::PartitionedFile;
38    use datafusion_datasource_avro::source::AvroSource;
39    use datafusion_datasource_avro::AvroFormat;
40    use datafusion_execution::object_store::ObjectStoreUrl;
41    use datafusion_physical_plan::ExecutionPlan;
42
43    use datafusion_datasource::source::DataSourceExec;
44    use futures::StreamExt;
45    use insta::assert_snapshot;
46    use object_store::chunked::ChunkedStore;
47    use object_store::local::LocalFileSystem;
48    use object_store::ObjectStore;
49    use rstest::*;
50    use url::Url;
51
52    #[tokio::test]
53    async fn avro_exec_without_partition() -> Result<()> {
54        test_with_stores(Arc::new(LocalFileSystem::new())).await
55    }
56
57    #[rstest]
58    #[tokio::test]
59    async fn test_chunked_avro(
60        #[values(10, 20, 30, 40)] chunk_size: usize,
61    ) -> Result<()> {
62        test_with_stores(Arc::new(ChunkedStore::new(
63            Arc::new(LocalFileSystem::new()),
64            chunk_size,
65        )))
66        .await
67    }
68
69    async fn test_with_stores(store: Arc<dyn ObjectStore>) -> Result<()> {
70        let session_ctx = SessionContext::new();
71        let state = session_ctx.state();
72
73        let url = Url::parse("file://").unwrap();
74        session_ctx.register_object_store(&url, store.clone());
75
76        let testdata = test_util::arrow_test_data();
77        let filename = format!("{testdata}/avro/alltypes_plain.avro");
78        let meta = local_unpartitioned_file(filename);
79
80        let file_schema = AvroFormat {}
81            .infer_schema(&state, &store, std::slice::from_ref(&meta))
82            .await?;
83
84        let source = Arc::new(AvroSource::new());
85        let conf = FileScanConfigBuilder::new(
86            ObjectStoreUrl::local_filesystem(),
87            file_schema,
88            source,
89        )
90        .with_file(meta.into())
91        .with_projection_indices(Some(vec![0, 1, 2]))
92        .build();
93
94        let source_exec = DataSourceExec::from_data_source(conf);
95        assert_eq!(
96            source_exec
97                .properties()
98                .output_partitioning()
99                .partition_count(),
100            1
101        );
102        let mut results = source_exec
103            .execute(0, state.task_ctx())
104            .expect("plan execution failed");
105
106        let batch = results
107            .next()
108            .await
109            .expect("plan iterator empty")
110            .expect("plan iterator returned an error");
111
112        insta::allow_duplicates! {assert_snapshot!(batches_to_string(&[batch]), @r###"
113            +----+----------+-------------+
114            | id | bool_col | tinyint_col |
115            +----+----------+-------------+
116            | 4  | true     | 0           |
117            | 5  | false    | 1           |
118            | 6  | true     | 0           |
119            | 7  | false    | 1           |
120            | 2  | true     | 0           |
121            | 3  | false    | 1           |
122            | 0  | true     | 0           |
123            | 1  | false    | 1           |
124            +----+----------+-------------+
125        "###);}
126
127        let batch = results.next().await;
128        assert!(batch.is_none());
129
130        let batch = results.next().await;
131        assert!(batch.is_none());
132
133        let batch = results.next().await;
134        assert!(batch.is_none());
135
136        Ok(())
137    }
138
139    #[tokio::test]
140    async fn avro_exec_missing_column() -> Result<()> {
141        let session_ctx = SessionContext::new();
142        let state = session_ctx.state();
143
144        let testdata = test_util::arrow_test_data();
145        let filename = format!("{testdata}/avro/alltypes_plain.avro");
146        let object_store = Arc::new(LocalFileSystem::new()) as _;
147        let object_store_url = ObjectStoreUrl::local_filesystem();
148        let meta = local_unpartitioned_file(filename);
149        let actual_schema = AvroFormat {}
150            .infer_schema(&state, &object_store, std::slice::from_ref(&meta))
151            .await?;
152
153        let mut builder = SchemaBuilder::from(actual_schema.fields());
154        builder.push(Field::new("missing_col", DataType::Int32, true));
155
156        let file_schema = Arc::new(builder.finish());
157        // Include the missing column in the projection
158        let projection = Some(vec![0, 1, 2, actual_schema.fields().len()]);
159
160        let source = Arc::new(AvroSource::new());
161        let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source)
162            .with_file(meta.into())
163            .with_projection_indices(projection)
164            .build();
165
166        let source_exec = DataSourceExec::from_data_source(conf);
167        assert_eq!(
168            source_exec
169                .properties()
170                .output_partitioning()
171                .partition_count(),
172            1
173        );
174
175        let mut results = source_exec
176            .execute(0, state.task_ctx())
177            .expect("plan execution failed");
178
179        let batch = results
180            .next()
181            .await
182            .expect("plan iterator empty")
183            .expect("plan iterator returned an error");
184
185        insta::allow_duplicates! {assert_snapshot!(batches_to_string(&[batch]), @r###"
186            +----+----------+-------------+-------------+
187            | id | bool_col | tinyint_col | missing_col |
188            +----+----------+-------------+-------------+
189            | 4  | true     | 0           |             |
190            | 5  | false    | 1           |             |
191            | 6  | true     | 0           |             |
192            | 7  | false    | 1           |             |
193            | 2  | true     | 0           |             |
194            | 3  | false    | 1           |             |
195            | 0  | true     | 0           |             |
196            | 1  | false    | 1           |             |
197            +----+----------+-------------+-------------+
198        "###);}
199
200        let batch = results.next().await;
201        assert!(batch.is_none());
202
203        let batch = results.next().await;
204        assert!(batch.is_none());
205
206        let batch = results.next().await;
207        assert!(batch.is_none());
208
209        Ok(())
210    }
211
212    #[tokio::test]
213    async fn avro_exec_with_partition() -> Result<()> {
214        let session_ctx = SessionContext::new();
215        let state = session_ctx.state();
216
217        let testdata = test_util::arrow_test_data();
218        let filename = format!("{testdata}/avro/alltypes_plain.avro");
219        let object_store = Arc::new(LocalFileSystem::new()) as _;
220        let object_store_url = ObjectStoreUrl::local_filesystem();
221        let meta = local_unpartitioned_file(filename);
222        let file_schema = AvroFormat {}
223            .infer_schema(&state, &object_store, std::slice::from_ref(&meta))
224            .await?;
225
226        let mut partitioned_file = PartitionedFile::from(meta);
227        partitioned_file.partition_values = vec![ScalarValue::from("2021-10-26")];
228
229        let projection = Some(vec![0, 1, file_schema.fields().len(), 2]);
230        let source = Arc::new(AvroSource::new());
231        let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source)
232            // select specific columns of the files as well as the partitioning
233            // column which is supposed to be the last column in the table schema.
234            .with_projection_indices(projection)
235            .with_file(partitioned_file)
236            .with_table_partition_cols(vec![Field::new("date", DataType::Utf8, false)])
237            .build();
238
239        let source_exec = DataSourceExec::from_data_source(conf);
240
241        assert_eq!(
242            source_exec
243                .properties()
244                .output_partitioning()
245                .partition_count(),
246            1
247        );
248
249        let mut results = source_exec
250            .execute(0, state.task_ctx())
251            .expect("plan execution failed");
252
253        let batch = results
254            .next()
255            .await
256            .expect("plan iterator empty")
257            .expect("plan iterator returned an error");
258
259        insta::allow_duplicates! {assert_snapshot!(batches_to_string(&[batch]), @r###"
260            +----+----------+------------+-------------+
261            | id | bool_col | date       | tinyint_col |
262            +----+----------+------------+-------------+
263            | 4  | true     | 2021-10-26 | 0           |
264            | 5  | false    | 2021-10-26 | 1           |
265            | 6  | true     | 2021-10-26 | 0           |
266            | 7  | false    | 2021-10-26 | 1           |
267            | 2  | true     | 2021-10-26 | 0           |
268            | 3  | false    | 2021-10-26 | 1           |
269            | 0  | true     | 2021-10-26 | 0           |
270            | 1  | false    | 2021-10-26 | 1           |
271            +----+----------+------------+-------------+
272        "###);}
273
274        let batch = results.next().await;
275        assert!(batch.is_none());
276
277        Ok(())
278    }
279}