// datafusion/datasource/file_format/json.rs
pub use datafusion_datasource_json::file_format::*;

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;

    use crate::datasource::file_format::test_util::scan_format;
    use crate::prelude::{NdJsonReadOptions, SessionConfig, SessionContext};
    use crate::test::object_store::local_unpartitioned_file;
    use arrow::array::RecordBatch;
    use arrow_schema::Schema;
    use bytes::Bytes;
    use datafusion_catalog::Session;
    use datafusion_common::test_util::batches_to_string;
    use datafusion_datasource::decoder::{
        BatchDeserializer, DecoderDeserializer, DeserializerOutput,
    };
    use datafusion_datasource::file_format::FileFormat;
    use datafusion_physical_plan::{collect, ExecutionPlan};

    use arrow::compute::concat_batches;
    use arrow::datatypes::{DataType, Field};
    use arrow::json::ReaderBuilder;
    use arrow::util::pretty;
    use datafusion_common::cast::as_int64_array;
    use datafusion_common::internal_err;
    use datafusion_common::stats::Precision;

    use datafusion_common::Result;
    use futures::StreamExt;
    use insta::assert_snapshot;
    use object_store::local::LocalFileSystem;
    use regex::Regex;
    use rstest::rstest;

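    /// Scans `tests/data/2.json` (12 rows) with a batch size of 2 and
    /// verifies the stream yields six batches of 2 rows each.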
    #[tokio::test]
    async fn read_small_batches() -> Result<()> {
        let config = SessionConfig::new().with_batch_size(2);
        let session_ctx = SessionContext::new_with_config(config);
        let state = session_ctx.state();
        let task_ctx = state.task_ctx();
        let projection = None;
        let exec = get_exec(&state, projection, None).await?;
        let stream = exec.execute(0, task_ctx)?;

        let tt_batches: i32 = stream
            .map(|batch| {
                let batch = batch.unwrap();
                assert_eq!(4, batch.num_columns());
                assert_eq!(2, batch.num_rows());
            })
            .fold(0, |acc, _| async move { acc + 1i32 })
            .await;

        assert_eq!(tt_batches, 6); // 12 rows / 2 rows per batch

        assert_eq!(exec.partition_statistics(None)?.num_rows, Precision::Absent);
        assert_eq!(
            exec.partition_statistics(None)?.total_byte_size,
            Precision::Absent
        );

        Ok(())
    }

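    /// A limit of 1 should produce a single batch containing a single row.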
    #[tokio::test]
    async fn read_limit() -> Result<()> {
        let session_ctx = SessionContext::new();
        let state = session_ctx.state();
        let task_ctx = state.task_ctx();
        let projection = None;
        let exec = get_exec(&state, projection, Some(1)).await?;
        let batches = collect(exec, task_ctx).await?;
        assert_eq!(1, batches.len());
        assert_eq!(4, batches[0].num_columns());
        assert_eq!(1, batches[0].num_rows());

        Ok(())
    }

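    /// Schema inference should map the JSON fields to Int64, Float64,
    /// Boolean, and Utf8.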
    #[tokio::test]
    async fn infer_schema() -> Result<()> {
        let projection = None;
        let session_ctx = SessionContext::new();
        let state = session_ctx.state();
        let exec = get_exec(&state, projection, None).await?;

        let x: Vec<String> = exec
            .schema()
            .fields()
            .iter()
            .map(|f| format!("{}: {:?}", f.name(), f.data_type()))
            .collect();
        assert_eq!(vec!["a: Int64", "b: Float64", "c: Boolean", "d: Utf8"], x);

        Ok(())
    }

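    /// Projects only column `a` and checks the decoded Int64 values.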
    #[tokio::test]
    async fn read_int_column() -> Result<()> {
        let session_ctx = SessionContext::new();
        let state = session_ctx.state();
        let task_ctx = state.task_ctx();
        let projection = Some(vec![0]);
        let exec = get_exec(&state, projection, None).await?;

        let batches = collect(exec, task_ctx).await.expect("Collect batches");

        assert_eq!(1, batches.len());
        assert_eq!(1, batches[0].num_columns());
        assert_eq!(12, batches[0].num_rows());

        let array = as_int64_array(batches[0].column(0))?;
        let mut values: Vec<i64> = vec![];
        for i in 0..batches[0].num_rows() {
            values.push(array.value(i));
        }

        assert_eq!(
            vec![1, -10, 2, 1, 7, 1, 1, 5, 1, 1, 1, 100000000000000],
            values
        );

        Ok(())
    }

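    /// Builds an [`ExecutionPlan`] that scans `tests/data/2.json` with the
    /// default [`JsonFormat`], applying the given projection and limit.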
    async fn get_exec(
        state: &dyn Session,
        projection: Option<Vec<usize>>,
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let filename = "tests/data/2.json";
        let format = JsonFormat::default();
        scan_format(state, &format, None, ".", filename, projection, limit).await
    }

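    /// With `schema_infer_max_rec` set to 3, inference only examines the
    /// first three records and discovers fields `a`, `b`, and `c`.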
    #[tokio::test]
    async fn infer_schema_with_limit() {
        let session = SessionContext::new();
        let ctx = session.state();
        let store = Arc::new(LocalFileSystem::new()) as _;
        let filename = "tests/data/schema_infer_limit.json";
        let format = JsonFormat::default().with_schema_infer_max_rec(3);

        let file_schema = format
            .infer_schema(&ctx, &store, &[local_unpartitioned_file(filename)])
            .await
            .expect("Schema inference");

        let fields = file_schema
            .fields()
            .iter()
            .map(|f| format!("{}: {:?}", f.name(), f.data_type()))
            .collect::<Vec<_>>();
        assert_eq!(vec!["a: Int64", "b: Float64", "c: Boolean"], fields);
    }

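    /// Runs `EXPLAIN` on `query` and parses the number of file groups out of
    /// the plan text (e.g. `file_groups={4 groups: ...}`).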
    async fn count_num_partitions(ctx: &SessionContext, query: &str) -> Result<usize> {
        let result = ctx
            .sql(&format!("EXPLAIN {query}"))
            .await?
            .collect()
            .await?;

        let plan = format!("{}", &pretty::pretty_format_batches(&result)?);

        let re = Regex::new(r"file_groups=\{(\d+) group").unwrap();

        if let Some(captures) = re.captures(&plan) {
            if let Some(match_) = captures.get(1) {
                let count = match_.as_str().parse::<usize>().unwrap();
                return Ok(count);
            }
        }

        internal_err!("EXPLAIN output contains no file_groups")
    }

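    /// With file-scan repartitioning enabled, a single NDJSON file should be
    /// split into `n_partitions` file groups and still produce the same sum.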
    #[rstest(n_partitions, case(1), case(2), case(3), case(4))]
    #[tokio::test]
    async fn it_can_read_ndjson_in_parallel(n_partitions: usize) -> Result<()> {
        let config = SessionConfig::new()
            .with_repartition_file_scans(true)
            .with_repartition_file_min_size(0)
            .with_target_partitions(n_partitions);

        let ctx = SessionContext::new_with_config(config);

        let table_path = "tests/data/1.json";
        let options = NdJsonReadOptions::default();

        ctx.register_json("json_parallel", table_path, options)
            .await?;

        let query = "SELECT sum(a) FROM json_parallel;";

        let result = ctx.sql(query).await?.collect().await?;
        let actual_partitions = count_num_partitions(&ctx, query).await?;

        insta::allow_duplicates! {
            assert_snapshot!(batches_to_string(&result), @r###"
            +----------------------+
            | sum(json_parallel.a) |
            +----------------------+
            | -7                   |
            +----------------------+
            "###);
        }

        assert_eq!(n_partitions, actual_partitions);

        Ok(())
    }

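    /// Scanning an empty NDJSON file should succeed and return no rows.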
    #[tokio::test]
    async fn it_can_read_empty_ndjson() -> Result<()> {
        let config = SessionConfig::new()
            .with_repartition_file_scans(true)
            .with_repartition_file_min_size(0);

        let ctx = SessionContext::new_with_config(config);

        let table_path = "tests/data/empty.json";
        let options = NdJsonReadOptions::default();

        ctx.register_json("json_parallel_empty", table_path, options)
            .await?;

        let query = "SELECT * FROM json_parallel_empty WHERE random() > 0.5;";

        let result = ctx.sql(query).await?.collect().await?;

        assert_snapshot!(batches_to_string(&result), @r###"
        ++
        ++
        "###);

        Ok(())
    }

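    /// After `finish()`, the deserializer drains one batch per digested row
    /// (batch size 1) and then reports `InputExhausted`.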
    #[test]
    fn test_json_deserializer_finish() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("c1", DataType::Int64, true),
            Field::new("c2", DataType::Int64, true),
            Field::new("c3", DataType::Int64, true),
            Field::new("c4", DataType::Int64, true),
            Field::new("c5", DataType::Int64, true),
        ]));
        let mut deserializer = json_deserializer(1, &schema)?;

        deserializer.digest(r#"{ "c1": 1, "c2": 2, "c3": 3, "c4": 4, "c5": 5 }"#.into());
        deserializer.digest(r#"{ "c1": 6, "c2": 7, "c3": 8, "c4": 9, "c5": 10 }"#.into());
        deserializer
            .digest(r#"{ "c1": 11, "c2": 12, "c3": 13, "c4": 14, "c5": 15 }"#.into());
        deserializer.finish();

        let mut all_batches = RecordBatch::new_empty(schema.clone());
        for _ in 0..3 {
            let output = deserializer.next()?;
            let DeserializerOutput::RecordBatch(batch) = output else {
                panic!("Expected RecordBatch, got {output:?}");
            };
            all_batches = concat_batches(&schema, &[all_batches, batch])?;
        }
        assert_eq!(deserializer.next()?, DeserializerOutput::InputExhausted);

        assert_snapshot!(batches_to_string(&[all_batches]), @r###"
        +----+----+----+----+----+
        | c1 | c2 | c3 | c4 | c5 |
        +----+----+----+----+----+
        | 1  | 2  | 3  | 4  | 5  |
        | 6  | 7  | 8  | 9  | 10 |
        | 11 | 12 | 13 | 14 | 15 |
        +----+----+----+----+----+
        "###);

        Ok(())
    }

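    /// Without `finish()`, only two batches are produced; the remaining input
    /// stays buffered and `next()` reports `RequiresMoreData`.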
    #[test]
    fn test_json_deserializer_no_finish() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("c1", DataType::Int64, true),
            Field::new("c2", DataType::Int64, true),
            Field::new("c3", DataType::Int64, true),
            Field::new("c4", DataType::Int64, true),
            Field::new("c5", DataType::Int64, true),
        ]));
        let mut deserializer = json_deserializer(1, &schema)?;

        deserializer.digest(r#"{ "c1": 1, "c2": 2, "c3": 3, "c4": 4, "c5": 5 }"#.into());
        deserializer.digest(r#"{ "c1": 6, "c2": 7, "c3": 8, "c4": 9, "c5": 10 }"#.into());
        deserializer
            .digest(r#"{ "c1": 11, "c2": 12, "c3": 13, "c4": 14, "c5": 15 }"#.into());

        let mut all_batches = RecordBatch::new_empty(schema.clone());
        for _ in 0..2 {
            let output = deserializer.next()?;
            let DeserializerOutput::RecordBatch(batch) = output else {
                panic!("Expected RecordBatch, got {output:?}");
            };
            all_batches = concat_batches(&schema, &[all_batches, batch])?;
        }
        assert_eq!(deserializer.next()?, DeserializerOutput::RequiresMoreData);

        insta::assert_snapshot!(fmt_batches(&[all_batches]), @r###"
        +----+----+----+----+----+
        | c1 | c2 | c3 | c4 | c5 |
        +----+----+----+----+----+
        | 1  | 2  | 3  | 4  | 5  |
        | 6  | 7  | 8  | 9  | 10 |
        +----+----+----+----+----+
        "###);

        Ok(())
    }

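    /// Wraps an Arrow JSON decoder with the given batch size in a
    /// [`DecoderDeserializer`] for the deserializer tests above.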
    fn json_deserializer(
        batch_size: usize,
        schema: &Arc<Schema>,
    ) -> Result<impl BatchDeserializer<Bytes>> {
        let decoder = ReaderBuilder::new(schema.clone())
            .with_batch_size(batch_size)
            .build_decoder()?;
        Ok(DecoderDeserializer::new(JsonDecoder::new(decoder)))
    }

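    /// Renders record batches as a pretty-printed table string.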
    fn fmt_batches(batches: &[RecordBatch]) -> String {
        pretty::pretty_format_batches(batches).unwrap().to_string()
    }
}