datafusion/datasource/physical_plan/
avro.rs1pub use datafusion_datasource_avro::source::*;
24
#[cfg(test)]
mod tests {
    //! Execution tests for the Avro data source.
    //!
    //! Each test scans `avro/alltypes_plain.avro` from the Arrow test data
    //! directory through a `DataSourceExec` and compares the first record
    //! batch against an inline snapshot.

    use std::sync::Arc;

    use crate::prelude::SessionContext;
    use crate::test::object_store::local_unpartitioned_file;
    use arrow::datatypes::{DataType, Field, SchemaBuilder};
    use datafusion_common::test_util::batches_to_string;
    use datafusion_common::{test_util, Result, ScalarValue};
    use datafusion_datasource::file_format::FileFormat;
    use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
    use datafusion_datasource::PartitionedFile;
    use datafusion_datasource_avro::source::AvroSource;
    use datafusion_datasource_avro::AvroFormat;
    use datafusion_execution::object_store::ObjectStoreUrl;
    use datafusion_physical_plan::ExecutionPlan;

    use datafusion_datasource::source::DataSourceExec;
    use futures::StreamExt;
    use insta::assert_snapshot;
    use object_store::chunked::ChunkedStore;
    use object_store::local::LocalFileSystem;
    use object_store::ObjectStore;
    use rstest::*;
    use url::Url;

    /// Runs the shared scan check against a plain local file system store.
    #[tokio::test]
    async fn avro_exec_without_partition() -> Result<()> {
        test_with_stores(Arc::new(LocalFileSystem::new())).await
    }

    /// Runs the shared scan check against a `ChunkedStore` wrapping the local
    /// file system, once for each listed `chunk_size`.
    #[rstest]
    #[tokio::test]
    async fn test_chunked_avro(
        #[values(10, 20, 30, 40)] chunk_size: usize,
    ) -> Result<()> {
        let chunked = ChunkedStore::new(Arc::new(LocalFileSystem::new()), chunk_size);
        test_with_stores(Arc::new(chunked)).await
    }

    /// Scans `alltypes_plain.avro` through `store`, projecting the first three
    /// file columns, and checks that exactly one batch with the expected
    /// contents is produced before the stream ends.
    async fn test_with_stores(store: Arc<dyn ObjectStore>) -> Result<()> {
        let session_ctx = SessionContext::new();
        let state = session_ctx.state();

        // Route `file://` URLs through the store under test.
        let url = Url::parse("file://").unwrap();
        session_ctx.register_object_store(&url, Arc::clone(&store));

        let testdata = test_util::arrow_test_data();
        let filename = format!("{testdata}/avro/alltypes_plain.avro");
        let meta = local_unpartitioned_file(filename);

        let file_schema = AvroFormat {}
            .infer_schema(&state, &store, std::slice::from_ref(&meta))
            .await?;

        let source = Arc::new(AvroSource::new());
        let conf = FileScanConfigBuilder::new(
            ObjectStoreUrl::local_filesystem(),
            file_schema,
            source,
        )
        .with_file(meta.into())
        .with_projection_indices(Some(vec![0, 1, 2]))
        .build();

        let source_exec = DataSourceExec::from_data_source(conf);
        assert_eq!(
            source_exec
                .properties()
                .output_partitioning()
                .partition_count(),
            1
        );
        let mut results = source_exec
            .execute(0, state.task_ctx())
            .expect("plan execution failed");

        let batch = results
            .next()
            .await
            .expect("plan iterator empty")
            .expect("plan iterator returned an error");

        // This helper is invoked by several tests, so the same inline snapshot
        // is asserted more than once; `allow_duplicates!` permits that.
        insta::allow_duplicates! {assert_snapshot!(batches_to_string(&[batch]), @r###"
            +----+----------+-------------+
            | id | bool_col | tinyint_col |
            +----+----------+-------------+
            | 4  | true     | 0           |
            | 5  | false    | 1           |
            | 6  | true     | 0           |
            | 7  | false    | 1           |
            | 2  | true     | 0           |
            | 3  | false    | 1           |
            | 0  | true     | 0           |
            | 1  | false    | 1           |
            +----+----------+-------------+
        "###);}

        // Once exhausted, the stream must keep reporting the end on every
        // further poll.
        for _ in 0..3 {
            let batch = results.next().await;
            assert!(batch.is_none());
        }

        Ok(())
    }

    /// Scans the file with a schema that declares one extra nullable column
    /// (`missing_col`) not present in the inferred file schema, and checks
    /// that the column is rendered as empty cells in the output.
    #[tokio::test]
    async fn avro_exec_missing_column() -> Result<()> {
        let session_ctx = SessionContext::new();
        let state = session_ctx.state();

        let testdata = test_util::arrow_test_data();
        let filename = format!("{testdata}/avro/alltypes_plain.avro");
        let object_store = Arc::new(LocalFileSystem::new()) as _;
        let object_store_url = ObjectStoreUrl::local_filesystem();
        let meta = local_unpartitioned_file(filename);
        let actual_schema = AvroFormat {}
            .infer_schema(&state, &object_store, std::slice::from_ref(&meta))
            .await?;

        // Extend the inferred schema with a column the file does not contain.
        let mut builder = SchemaBuilder::from(actual_schema.fields());
        builder.push(Field::new("missing_col", DataType::Int32, true));

        let file_schema = Arc::new(builder.finish());
        // `actual_schema.fields().len()` is the index of the column pushed
        // above, so the projection includes `missing_col`.
        let projection = Some(vec![0, 1, 2, actual_schema.fields().len()]);

        let source = Arc::new(AvroSource::new());
        let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source)
            .with_file(meta.into())
            .with_projection_indices(projection)
            .build();

        let source_exec = DataSourceExec::from_data_source(conf);
        assert_eq!(
            source_exec
                .properties()
                .output_partitioning()
                .partition_count(),
            1
        );

        let mut results = source_exec
            .execute(0, state.task_ctx())
            .expect("plan execution failed");

        let batch = results
            .next()
            .await
            .expect("plan iterator empty")
            .expect("plan iterator returned an error");

        insta::allow_duplicates! {assert_snapshot!(batches_to_string(&[batch]), @r###"
            +----+----------+-------------+-------------+
            | id | bool_col | tinyint_col | missing_col |
            +----+----------+-------------+-------------+
            | 4  | true     | 0           |             |
            | 5  | false    | 1           |             |
            | 6  | true     | 0           |             |
            | 7  | false    | 1           |             |
            | 2  | true     | 0           |             |
            | 3  | false    | 1           |             |
            | 0  | true     | 0           |             |
            | 1  | false    | 1           |             |
            +----+----------+-------------+-------------+
        "###);}

        // Once exhausted, the stream must keep reporting the end on every
        // further poll.
        for _ in 0..3 {
            let batch = results.next().await;
            assert!(batch.is_none());
        }

        Ok(())
    }

    /// Scans the file as a partitioned file carrying the partition value
    /// `"2021-10-26"` for a `date` partition column, and checks that the value
    /// appears in every output row at the projected position.
    #[tokio::test]
    async fn avro_exec_with_partition() -> Result<()> {
        let session_ctx = SessionContext::new();
        let state = session_ctx.state();

        let testdata = test_util::arrow_test_data();
        let filename = format!("{testdata}/avro/alltypes_plain.avro");
        let object_store = Arc::new(LocalFileSystem::new()) as _;
        let object_store_url = ObjectStoreUrl::local_filesystem();
        let meta = local_unpartitioned_file(filename);
        let file_schema = AvroFormat {}
            .infer_schema(&state, &object_store, std::slice::from_ref(&meta))
            .await?;

        let mut partitioned_file = PartitionedFile::from(meta);
        partitioned_file.partition_values = vec![ScalarValue::from("2021-10-26")];

        // Index `file_schema.fields().len()` is one past the last file column;
        // the expected output below shows it resolving to the `date`
        // partition column, placed third by this projection.
        let projection = Some(vec![0, 1, file_schema.fields().len(), 2]);
        let source = Arc::new(AvroSource::new());
        let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source)
            .with_projection_indices(projection)
            .with_file(partitioned_file)
            .with_table_partition_cols(vec![Field::new("date", DataType::Utf8, false)])
            .build();

        let source_exec = DataSourceExec::from_data_source(conf);

        assert_eq!(
            source_exec
                .properties()
                .output_partitioning()
                .partition_count(),
            1
        );

        let mut results = source_exec
            .execute(0, state.task_ctx())
            .expect("plan execution failed");

        let batch = results
            .next()
            .await
            .expect("plan iterator empty")
            .expect("plan iterator returned an error");

        insta::allow_duplicates! {assert_snapshot!(batches_to_string(&[batch]), @r###"
            +----+----------+------------+-------------+
            | id | bool_col | date       | tinyint_col |
            +----+----------+------------+-------------+
            | 4  | true     | 2021-10-26 | 0           |
            | 5  | false    | 2021-10-26 | 1           |
            | 6  | true     | 2021-10-26 | 0           |
            | 7  | false    | 2021-10-26 | 1           |
            | 2  | true     | 2021-10-26 | 0           |
            | 3  | false    | 2021-10-26 | 1           |
            | 0  | true     | 2021-10-26 | 0           |
            | 1  | false    | 2021-10-26 | 1           |
            +----+----------+------------+-------------+
        "###);}

        let batch = results.next().await;
        assert!(batch.is_none());

        Ok(())
    }
}