1pub use datafusion_datasource_avro::file_format::*;
21
22#[cfg(test)]
23mod tests {
24 use std::sync::Arc;
25
26 use crate::{
27 datasource::file_format::test_util::scan_format, prelude::SessionContext,
28 };
29 use arrow::array::{as_string_array, Array};
30 use datafusion_catalog::Session;
31 use datafusion_common::test_util::batches_to_string;
32 use datafusion_common::{
33 cast::{
34 as_binary_array, as_boolean_array, as_float32_array, as_float64_array,
35 as_int32_array, as_timestamp_microsecond_array,
36 },
37 test_util, Result,
38 };
39
40 use datafusion_datasource_avro::AvroFormat;
41 use datafusion_execution::config::SessionConfig;
42 use datafusion_physical_plan::{collect, ExecutionPlan};
43 use futures::StreamExt;
44 use insta::assert_snapshot;
45
46 #[tokio::test]
47 async fn read_small_batches() -> Result<()> {
48 let config = SessionConfig::new().with_batch_size(2);
49 let session_ctx = SessionContext::new_with_config(config);
50 let state = session_ctx.state();
51 let task_ctx = state.task_ctx();
52 let projection = None;
53 let exec = get_exec(&state, "alltypes_plain.avro", projection, None).await?;
54 let stream = exec.execute(0, task_ctx)?;
55
56 let tt_batches = stream
57 .map(|batch| {
58 let batch = batch.unwrap();
59 assert_eq!(11, batch.num_columns());
60 assert_eq!(2, batch.num_rows());
61 })
62 .fold(0, |acc, _| async move { acc + 1i32 })
63 .await;
64
65 assert_eq!(tt_batches, 4 );
66
67 Ok(())
68 }
69
70 #[tokio::test]
71 async fn read_limit() -> Result<()> {
72 let session_ctx = SessionContext::new();
73 let state = session_ctx.state();
74 let task_ctx = state.task_ctx();
75 let projection = None;
76 let exec = get_exec(&state, "alltypes_plain.avro", projection, Some(1)).await?;
77 let batches = collect(exec, task_ctx).await?;
78 assert_eq!(1, batches.len());
79 assert_eq!(11, batches[0].num_columns());
80 assert_eq!(1, batches[0].num_rows());
81
82 Ok(())
83 }
84
85 #[tokio::test]
86 async fn read_alltypes_plain_avro() -> Result<()> {
87 let session_ctx = SessionContext::new();
88 let state = session_ctx.state();
89 let task_ctx = state.task_ctx();
90 let projection = None;
91 let exec = get_exec(&state, "alltypes_plain.avro", projection, None).await?;
92
93 let x: Vec<String> = exec
94 .schema()
95 .fields()
96 .iter()
97 .map(|f| format!("{}: {:?}", f.name(), f.data_type()))
98 .collect();
99 assert_eq!(
100 vec![
101 "id: Int32",
102 "bool_col: Boolean",
103 "tinyint_col: Int32",
104 "smallint_col: Int32",
105 "int_col: Int32",
106 "bigint_col: Int64",
107 "float_col: Float32",
108 "double_col: Float64",
109 "date_string_col: Binary",
110 "string_col: Binary",
111 "timestamp_col: Timestamp(Microsecond, None)",
112 ],
113 x
114 );
115
116 let batches = collect(exec, task_ctx).await?;
117 assert_eq!(batches.len(), 1);
118
119 assert_snapshot!(batches_to_string(&batches),@r###"
120 +----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+
121 | id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col | string_col | timestamp_col |
122 +----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+
123 | 4 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30332f30312f3039 | 30 | 2009-03-01T00:00:00 |
124 | 5 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30332f30312f3039 | 31 | 2009-03-01T00:01:00 |
125 | 6 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30342f30312f3039 | 30 | 2009-04-01T00:00:00 |
126 | 7 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30342f30312f3039 | 31 | 2009-04-01T00:01:00 |
127 | 2 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30322f30312f3039 | 30 | 2009-02-01T00:00:00 |
128 | 3 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30322f30312f3039 | 31 | 2009-02-01T00:01:00 |
129 | 0 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30312f30312f3039 | 30 | 2009-01-01T00:00:00 |
130 | 1 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30312f30312f3039 | 31 | 2009-01-01T00:01:00 |
131 +----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+
132 "###);
133 Ok(())
134 }
135
136 #[tokio::test]
137 async fn read_bool_alltypes_plain_avro() -> Result<()> {
138 let session_ctx = SessionContext::new();
139 let state = session_ctx.state();
140 let task_ctx = state.task_ctx();
141 let projection = Some(vec![1]);
142 let exec = get_exec(&state, "alltypes_plain.avro", projection, None).await?;
143
144 let batches = collect(exec, task_ctx).await?;
145 assert_eq!(batches.len(), 1);
146 assert_eq!(1, batches[0].num_columns());
147 assert_eq!(8, batches[0].num_rows());
148
149 let array = as_boolean_array(batches[0].column(0))?;
150 let mut values: Vec<bool> = vec![];
151 for i in 0..batches[0].num_rows() {
152 values.push(array.value(i));
153 }
154
155 assert_eq!(
156 "[true, false, true, false, true, false, true, false]",
157 format!("{values:?}")
158 );
159
160 Ok(())
161 }
162
163 #[tokio::test]
164 async fn read_null_bool_alltypes_plain_avro() -> Result<()> {
165 let session_ctx = SessionContext::new();
166 let state = session_ctx.state();
167 let task_ctx = state.task_ctx();
168 let projection = Some(vec![2]);
169 let exec =
170 get_exec(&state, "alltypes_nulls_plain.avro", projection, None).await?;
171
172 let batches = collect(exec, task_ctx).await?;
173 assert_eq!(batches.len(), 1);
174 assert_eq!(1, batches[0].num_columns());
175 assert_eq!(1, batches[0].num_rows());
176
177 let array = as_boolean_array(batches[0].column(0))?;
178
179 assert!(array.is_null(0));
180
181 Ok(())
182 }
183
184 #[tokio::test]
185 async fn read_i32_alltypes_plain_avro() -> Result<()> {
186 let session_ctx = SessionContext::new();
187 let state = session_ctx.state();
188 let task_ctx = state.task_ctx();
189 let projection = Some(vec![0]);
190 let exec = get_exec(&state, "alltypes_plain.avro", projection, None).await?;
191
192 let batches = collect(exec, task_ctx).await?;
193 assert_eq!(batches.len(), 1);
194 assert_eq!(1, batches[0].num_columns());
195 assert_eq!(8, batches[0].num_rows());
196
197 let array = as_int32_array(batches[0].column(0))?;
198 let mut values: Vec<i32> = vec![];
199 for i in 0..batches[0].num_rows() {
200 values.push(array.value(i));
201 }
202
203 assert_eq!("[4, 5, 6, 7, 2, 3, 0, 1]", format!("{values:?}"));
204
205 Ok(())
206 }
207
208 #[tokio::test]
209 async fn read_null_i32_alltypes_plain_avro() -> Result<()> {
210 let session_ctx = SessionContext::new();
211 let state = session_ctx.state();
212 let task_ctx = state.task_ctx();
213 let projection = Some(vec![1]);
214 let exec =
215 get_exec(&state, "alltypes_nulls_plain.avro", projection, None).await?;
216
217 let batches = collect(exec, task_ctx).await?;
218 assert_eq!(batches.len(), 1);
219 assert_eq!(1, batches[0].num_columns());
220 assert_eq!(1, batches[0].num_rows());
221
222 let array = as_int32_array(batches[0].column(0))?;
223
224 assert!(array.is_null(0));
225
226 Ok(())
227 }
228
229 #[tokio::test]
230 async fn read_i96_alltypes_plain_avro() -> Result<()> {
231 let session_ctx = SessionContext::new();
232 let state = session_ctx.state();
233 let task_ctx = state.task_ctx();
234 let projection = Some(vec![10]);
235 let exec = get_exec(&state, "alltypes_plain.avro", projection, None).await?;
236
237 let batches = collect(exec, task_ctx).await?;
238 assert_eq!(batches.len(), 1);
239 assert_eq!(1, batches[0].num_columns());
240 assert_eq!(8, batches[0].num_rows());
241
242 let array = as_timestamp_microsecond_array(batches[0].column(0))?;
243 let mut values: Vec<i64> = vec![];
244 for i in 0..batches[0].num_rows() {
245 values.push(array.value(i));
246 }
247
248 assert_eq!("[1235865600000000, 1235865660000000, 1238544000000000, 1238544060000000, 1233446400000000, 1233446460000000, 1230768000000000, 1230768060000000]", format!("{values:?}"));
249
250 Ok(())
251 }
252
253 #[tokio::test]
254 async fn read_f32_alltypes_plain_avro() -> Result<()> {
255 let session_ctx = SessionContext::new();
256 let state = session_ctx.state();
257 let task_ctx = state.task_ctx();
258 let projection = Some(vec![6]);
259 let exec = get_exec(&state, "alltypes_plain.avro", projection, None).await?;
260
261 let batches = collect(exec, task_ctx).await?;
262 assert_eq!(batches.len(), 1);
263 assert_eq!(1, batches[0].num_columns());
264 assert_eq!(8, batches[0].num_rows());
265
266 let array = as_float32_array(batches[0].column(0))?;
267 let mut values: Vec<f32> = vec![];
268 for i in 0..batches[0].num_rows() {
269 values.push(array.value(i));
270 }
271
272 assert_eq!(
273 "[0.0, 1.1, 0.0, 1.1, 0.0, 1.1, 0.0, 1.1]",
274 format!("{values:?}")
275 );
276
277 Ok(())
278 }
279
280 #[tokio::test]
281 async fn read_f64_alltypes_plain_avro() -> Result<()> {
282 let session_ctx = SessionContext::new();
283 let state = session_ctx.state();
284 let task_ctx = state.task_ctx();
285 let projection = Some(vec![7]);
286 let exec = get_exec(&state, "alltypes_plain.avro", projection, None).await?;
287
288 let batches = collect(exec, task_ctx).await?;
289 assert_eq!(batches.len(), 1);
290 assert_eq!(1, batches[0].num_columns());
291 assert_eq!(8, batches[0].num_rows());
292
293 let array = as_float64_array(batches[0].column(0))?;
294 let mut values: Vec<f64> = vec![];
295 for i in 0..batches[0].num_rows() {
296 values.push(array.value(i));
297 }
298
299 assert_eq!(
300 "[0.0, 10.1, 0.0, 10.1, 0.0, 10.1, 0.0, 10.1]",
301 format!("{values:?}")
302 );
303
304 Ok(())
305 }
306
307 #[tokio::test]
308 async fn read_binary_alltypes_plain_avro() -> Result<()> {
309 let session_ctx = SessionContext::new();
310 let state = session_ctx.state();
311 let task_ctx = state.task_ctx();
312 let projection = Some(vec![9]);
313 let exec = get_exec(&state, "alltypes_plain.avro", projection, None).await?;
314
315 let batches = collect(exec, task_ctx).await?;
316 assert_eq!(batches.len(), 1);
317 assert_eq!(1, batches[0].num_columns());
318 assert_eq!(8, batches[0].num_rows());
319
320 let array = as_binary_array(batches[0].column(0))?;
321 let mut values: Vec<&str> = vec![];
322 for i in 0..batches[0].num_rows() {
323 values.push(std::str::from_utf8(array.value(i)).unwrap());
324 }
325
326 assert_eq!(
327 "[\"0\", \"1\", \"0\", \"1\", \"0\", \"1\", \"0\", \"1\"]",
328 format!("{values:?}")
329 );
330
331 Ok(())
332 }
333
334 #[tokio::test]
335 async fn read_null_binary_alltypes_plain_avro() -> Result<()> {
336 let session_ctx = SessionContext::new();
337 let state = session_ctx.state();
338 let task_ctx = state.task_ctx();
339 let projection = Some(vec![6]);
340 let exec =
341 get_exec(&state, "alltypes_nulls_plain.avro", projection, None).await?;
342
343 let batches = collect(exec, task_ctx).await?;
344 assert_eq!(batches.len(), 1);
345 assert_eq!(1, batches[0].num_columns());
346 assert_eq!(1, batches[0].num_rows());
347
348 let array = as_binary_array(batches[0].column(0))?;
349
350 assert!(array.is_null(0));
351
352 Ok(())
353 }
354
355 #[tokio::test]
356 async fn read_null_string_alltypes_plain_avro() -> Result<()> {
357 let session_ctx = SessionContext::new();
358 let state = session_ctx.state();
359 let task_ctx = state.task_ctx();
360 let projection = Some(vec![0]);
361 let exec =
362 get_exec(&state, "alltypes_nulls_plain.avro", projection, None).await?;
363
364 let batches = collect(exec, task_ctx).await?;
365 assert_eq!(batches.len(), 1);
366 assert_eq!(1, batches[0].num_columns());
367 assert_eq!(1, batches[0].num_rows());
368
369 let array = as_string_array(batches[0].column(0));
370
371 assert!(array.is_null(0));
372
373 Ok(())
374 }
375
376 async fn get_exec(
377 state: &dyn Session,
378 file_name: &str,
379 projection: Option<Vec<usize>>,
380 limit: Option<usize>,
381 ) -> Result<Arc<dyn ExecutionPlan>> {
382 let testdata = test_util::arrow_test_data();
383 let store_root = format!("{testdata}/avro");
384 let format = AvroFormat {};
385 scan_format(
386 state,
387 &format,
388 None,
389 &store_root,
390 file_name,
391 projection,
392 limit,
393 )
394 .await
395 }
396}