datafusion/datasource/file_format/
avro.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Re-exports the [`datafusion_datasource_avro::file_format`] module and contains tests for it.

20pub use datafusion_datasource_avro::file_format::*;
21
#[cfg(test)]
mod tests {
    //! Tests for reading the arrow-testing Avro fixtures `alltypes_plain.avro`
    //! and `alltypes_nulls_plain.avro` through `AvroFormat`.

    use std::sync::Arc;

    use crate::{
        datasource::file_format::test_util::scan_format, prelude::SessionContext,
    };
    use arrow::array::{Array, ArrayRef};
    use arrow::record_batch::RecordBatch;
    use datafusion_catalog::Session;
    use datafusion_common::test_util::batches_to_string;
    use datafusion_common::{
        cast::{
            as_binary_array, as_boolean_array, as_float32_array, as_float64_array,
            as_int32_array, as_string_array, as_timestamp_microsecond_array,
        },
        test_util, Result,
    };

    use datafusion_datasource_avro::AvroFormat;
    use datafusion_execution::config::SessionConfig;
    use datafusion_physical_plan::{collect, ExecutionPlan};
    use futures::TryStreamExt;
    use insta::assert_snapshot;

    /// With `batch_size = 2`, partition 0 of the 8-row `alltypes_plain.avro`
    /// scan must be emitted as 4 batches of 2 rows and 11 columns each.
    #[tokio::test]
    async fn read_small_batches() -> Result<()> {
        let config = SessionConfig::new().with_batch_size(2);
        let session_ctx = SessionContext::new_with_config(config);
        let state = session_ctx.state();
        let task_ctx = state.task_ctx();
        let exec = get_exec(&state, "alltypes_plain.avro", None, None).await?;

        // Drain partition 0's stream directly so the test observes the batches
        // exactly as the scan produces them. `try_collect` surfaces a stream
        // error through `?` instead of panicking inside a closure.
        let stream = exec.execute(0, task_ctx)?;
        let batches: Vec<RecordBatch> = stream.try_collect().await?;

        for batch in &batches {
            assert_eq!(11, batch.num_columns());
            assert_eq!(2, batch.num_rows());
        }
        assert_eq!(batches.len(), 4 /* 8 rows / batch size 2 */);

        Ok(())
    }

    /// A scan limit of 1 yields a single one-row batch with all 11 columns.
    #[tokio::test]
    async fn read_limit() -> Result<()> {
        let session_ctx = SessionContext::new();
        let state = session_ctx.state();
        let task_ctx = state.task_ctx();
        let exec = get_exec(&state, "alltypes_plain.avro", None, Some(1)).await?;

        let batches = collect(exec, task_ctx).await?;
        assert_eq!(1, batches.len());
        assert_eq!(11, batches[0].num_columns());
        assert_eq!(1, batches[0].num_rows());

        Ok(())
    }

    /// Checks the schema inferred for `alltypes_plain.avro` (`tinyint_col` and
    /// `smallint_col` surface as `Int32`, the string columns as `Binary`) and
    /// snapshots the full file contents.
    #[tokio::test]
    async fn read_alltypes_plain_avro() -> Result<()> {
        let session_ctx = SessionContext::new();
        let state = session_ctx.state();
        let task_ctx = state.task_ctx();
        let exec = get_exec(&state, "alltypes_plain.avro", None, None).await?;

        let fields: Vec<String> = exec
            .schema()
            .fields()
            .iter()
            .map(|f| format!("{}: {:?}", f.name(), f.data_type()))
            .collect();
        assert_eq!(
            vec![
                "id: Int32",
                "bool_col: Boolean",
                "tinyint_col: Int32",
                "smallint_col: Int32",
                "int_col: Int32",
                "bigint_col: Int64",
                "float_col: Float32",
                "double_col: Float64",
                "date_string_col: Binary",
                "string_col: Binary",
                "timestamp_col: Timestamp(Microsecond, None)",
            ],
            fields
        );

        let batches = collect(exec, task_ctx).await?;
        assert_eq!(batches.len(), 1);

        assert_snapshot!(batches_to_string(&batches),@r###"
            +----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+
            | id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col  | string_col | timestamp_col       |
            +----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+
            | 4  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30332f30312f3039 | 30         | 2009-03-01T00:00:00 |
            | 5  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30332f30312f3039 | 31         | 2009-03-01T00:01:00 |
            | 6  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30342f30312f3039 | 30         | 2009-04-01T00:00:00 |
            | 7  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30342f30312f3039 | 31         | 2009-04-01T00:01:00 |
            | 2  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30322f30312f3039 | 30         | 2009-02-01T00:00:00 |
            | 3  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30322f30312f3039 | 31         | 2009-02-01T00:01:00 |
            | 0  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30312f30312f3039 | 30         | 2009-01-01T00:00:00 |
            | 1  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30312f30312f3039 | 31         | 2009-01-01T00:01:00 |
            +----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+
        "###);
        Ok(())
    }

    /// Column 1 (`bool_col`) of `alltypes_plain.avro` alternates true/false.
    #[tokio::test]
    async fn read_bool_alltypes_plain_avro() -> Result<()> {
        let column = scan_single_column("alltypes_plain.avro", 1, 8).await?;
        let array = as_boolean_array(&column)?;
        let values: Vec<bool> = (0..array.len()).map(|i| array.value(i)).collect();

        assert_eq!(
            "[true, false, true, false, true, false, true, false]",
            format!("{values:?}")
        );

        Ok(())
    }

    /// Column 2 of `alltypes_nulls_plain.avro` is read as Boolean and its only
    /// row is null.
    #[tokio::test]
    async fn read_null_bool_alltypes_plain_avro() -> Result<()> {
        let column = scan_single_column("alltypes_nulls_plain.avro", 2, 1).await?;
        let array = as_boolean_array(&column)?;

        assert!(array.is_null(0));

        Ok(())
    }

    /// Column 0 (`id`) of `alltypes_plain.avro` holds the ids in file order.
    #[tokio::test]
    async fn read_i32_alltypes_plain_avro() -> Result<()> {
        let column = scan_single_column("alltypes_plain.avro", 0, 8).await?;
        let array = as_int32_array(&column)?;
        let values: Vec<i32> = (0..array.len()).map(|i| array.value(i)).collect();

        assert_eq!("[4, 5, 6, 7, 2, 3, 0, 1]", format!("{values:?}"));

        Ok(())
    }

    /// Column 1 of `alltypes_nulls_plain.avro` is read as Int32 and its only
    /// row is null.
    #[tokio::test]
    async fn read_null_i32_alltypes_plain_avro() -> Result<()> {
        let column = scan_single_column("alltypes_nulls_plain.avro", 1, 1).await?;
        let array = as_int32_array(&column)?;

        assert!(array.is_null(0));

        Ok(())
    }

    /// Despite the `i96` in its name, this reads column 10 (`timestamp_col`) of
    /// `alltypes_plain.avro`, which is exposed as `Timestamp(Microsecond, None)`,
    /// and checks the raw microsecond values.
    #[tokio::test]
    async fn read_i96_alltypes_plain_avro() -> Result<()> {
        let column = scan_single_column("alltypes_plain.avro", 10, 8).await?;
        let array = as_timestamp_microsecond_array(&column)?;
        let values: Vec<i64> = (0..array.len()).map(|i| array.value(i)).collect();

        assert_eq!("[1235865600000000, 1235865660000000, 1238544000000000, 1238544060000000, 1233446400000000, 1233446460000000, 1230768000000000, 1230768060000000]", format!("{values:?}"));

        Ok(())
    }

    /// Column 6 (`float_col`) of `alltypes_plain.avro` alternates 0.0 / 1.1.
    #[tokio::test]
    async fn read_f32_alltypes_plain_avro() -> Result<()> {
        let column = scan_single_column("alltypes_plain.avro", 6, 8).await?;
        let array = as_float32_array(&column)?;
        let values: Vec<f32> = (0..array.len()).map(|i| array.value(i)).collect();

        assert_eq!(
            "[0.0, 1.1, 0.0, 1.1, 0.0, 1.1, 0.0, 1.1]",
            format!("{values:?}")
        );

        Ok(())
    }

    /// Column 7 (`double_col`) of `alltypes_plain.avro` alternates 0.0 / 10.1.
    #[tokio::test]
    async fn read_f64_alltypes_plain_avro() -> Result<()> {
        let column = scan_single_column("alltypes_plain.avro", 7, 8).await?;
        let array = as_float64_array(&column)?;
        let values: Vec<f64> = (0..array.len()).map(|i| array.value(i)).collect();

        assert_eq!(
            "[0.0, 10.1, 0.0, 10.1, 0.0, 10.1, 0.0, 10.1]",
            format!("{values:?}")
        );

        Ok(())
    }

    /// Column 9 (`string_col`, exposed as `Binary`) of `alltypes_plain.avro`
    /// decodes to the UTF-8 strings "0" / "1" in alternation.
    #[tokio::test]
    async fn read_binary_alltypes_plain_avro() -> Result<()> {
        let column = scan_single_column("alltypes_plain.avro", 9, 8).await?;
        let array = as_binary_array(&column)?;
        let values: Vec<&str> = (0..array.len())
            .map(|i| std::str::from_utf8(array.value(i)).unwrap())
            .collect();

        assert_eq!(
            "[\"0\", \"1\", \"0\", \"1\", \"0\", \"1\", \"0\", \"1\"]",
            format!("{values:?}")
        );

        Ok(())
    }

    /// Column 6 of `alltypes_nulls_plain.avro` is read as Binary and its only
    /// row is null.
    #[tokio::test]
    async fn read_null_binary_alltypes_plain_avro() -> Result<()> {
        let column = scan_single_column("alltypes_nulls_plain.avro", 6, 1).await?;
        let array = as_binary_array(&column)?;

        assert!(array.is_null(0));

        Ok(())
    }

    /// Column 0 of `alltypes_nulls_plain.avro` is read as a string array and
    /// its only row is null.
    #[tokio::test]
    async fn read_null_string_alltypes_plain_avro() -> Result<()> {
        let column = scan_single_column("alltypes_nulls_plain.avro", 0, 1).await?;
        // Use the fallible `datafusion_common::cast` helper like every other
        // test, so a type mismatch is reported as an error rather than a panic.
        let array = as_string_array(&column)?;

        assert!(array.is_null(0));

        Ok(())
    }

    /// Scans `file_name` projected down to the single column at `column_index`
    /// with a default `SessionContext`, asserts that the scan produced exactly
    /// one batch with one column and `expected_rows` rows, and returns that
    /// column.
    async fn scan_single_column(
        file_name: &str,
        column_index: usize,
        expected_rows: usize,
    ) -> Result<ArrayRef> {
        let session_ctx = SessionContext::new();
        let state = session_ctx.state();
        let task_ctx = state.task_ctx();
        let exec =
            get_exec(&state, file_name, Some(vec![column_index]), None).await?;

        let batches = collect(exec, task_ctx).await?;
        assert_eq!(batches.len(), 1);
        assert_eq!(1, batches[0].num_columns());
        assert_eq!(expected_rows, batches[0].num_rows());

        Ok(Arc::clone(batches[0].column(0)))
    }

    /// Builds a scan plan for `{arrow_test_data}/avro/{file_name}` with
    /// `AvroFormat`, forwarding the optional `projection` and `limit`.
    async fn get_exec(
        state: &dyn Session,
        file_name: &str,
        projection: Option<Vec<usize>>,
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let testdata = test_util::arrow_test_data();
        let store_root = format!("{testdata}/avro");
        let format = AvroFormat {};
        scan_format(
            state,
            &format,
            None,
            &store_root,
            file_name,
            projection,
            limit,
        )
        .await
    }
}