datafusion/datasource/file_format/
json.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Re-exports the [`datafusion_datasource_json::file_format`] module, and contains tests for it.
19pub use datafusion_datasource_json::file_format::*;
20
21#[cfg(test)]
22mod tests {
23    use std::sync::Arc;
24
25    use super::*;
26
27    use crate::datasource::file_format::test_util::scan_format;
28    use crate::prelude::{NdJsonReadOptions, SessionConfig, SessionContext};
29    use crate::test::object_store::local_unpartitioned_file;
30    use arrow::array::RecordBatch;
31    use arrow_schema::Schema;
32    use bytes::Bytes;
33    use datafusion_catalog::Session;
34    use datafusion_common::test_util::batches_to_string;
35    use datafusion_datasource::decoder::{
36        BatchDeserializer, DecoderDeserializer, DeserializerOutput,
37    };
38    use datafusion_datasource::file_format::FileFormat;
39    use datafusion_physical_plan::{collect, ExecutionPlan};
40
41    use arrow::compute::concat_batches;
42    use arrow::datatypes::{DataType, Field};
43    use arrow::json::ReaderBuilder;
44    use arrow::util::pretty;
45    use datafusion_common::cast::as_int64_array;
46    use datafusion_common::internal_err;
47    use datafusion_common::stats::Precision;
48
49    use datafusion_common::Result;
50    use futures::StreamExt;
51    use insta::assert_snapshot;
52    use object_store::local::LocalFileSystem;
53    use regex::Regex;
54    use rstest::rstest;
55
56    #[tokio::test]
57    async fn read_small_batches() -> Result<()> {
58        let config = SessionConfig::new().with_batch_size(2);
59        let session_ctx = SessionContext::new_with_config(config);
60        let state = session_ctx.state();
61        let task_ctx = state.task_ctx();
62        let projection = None;
63        let exec = get_exec(&state, projection, None).await?;
64        let stream = exec.execute(0, task_ctx)?;
65
66        let tt_batches: i32 = stream
67            .map(|batch| {
68                let batch = batch.unwrap();
69                assert_eq!(4, batch.num_columns());
70                assert_eq!(2, batch.num_rows());
71            })
72            .fold(0, |acc, _| async move { acc + 1i32 })
73            .await;
74
75        assert_eq!(tt_batches, 6 /* 12/2 */);
76
77        // test metadata
78        assert_eq!(exec.partition_statistics(None)?.num_rows, Precision::Absent);
79        assert_eq!(
80            exec.partition_statistics(None)?.total_byte_size,
81            Precision::Absent
82        );
83
84        Ok(())
85    }
86
87    #[tokio::test]
88    async fn read_limit() -> Result<()> {
89        let session_ctx = SessionContext::new();
90        let state = session_ctx.state();
91        let task_ctx = state.task_ctx();
92        let projection = None;
93        let exec = get_exec(&state, projection, Some(1)).await?;
94        let batches = collect(exec, task_ctx).await?;
95        assert_eq!(1, batches.len());
96        assert_eq!(4, batches[0].num_columns());
97        assert_eq!(1, batches[0].num_rows());
98
99        Ok(())
100    }
101
102    #[tokio::test]
103    async fn infer_schema() -> Result<()> {
104        let projection = None;
105        let session_ctx = SessionContext::new();
106        let state = session_ctx.state();
107        let exec = get_exec(&state, projection, None).await?;
108
109        let x: Vec<String> = exec
110            .schema()
111            .fields()
112            .iter()
113            .map(|f| format!("{}: {:?}", f.name(), f.data_type()))
114            .collect();
115        assert_eq!(vec!["a: Int64", "b: Float64", "c: Boolean", "d: Utf8",], x);
116
117        Ok(())
118    }
119
120    #[tokio::test]
121    async fn read_int_column() -> Result<()> {
122        let session_ctx = SessionContext::new();
123        let state = session_ctx.state();
124        let task_ctx = state.task_ctx();
125        let projection = Some(vec![0]);
126        let exec = get_exec(&state, projection, None).await?;
127
128        let batches = collect(exec, task_ctx).await.expect("Collect batches");
129
130        assert_eq!(1, batches.len());
131        assert_eq!(1, batches[0].num_columns());
132        assert_eq!(12, batches[0].num_rows());
133
134        let array = as_int64_array(batches[0].column(0))?;
135        let mut values: Vec<i64> = vec![];
136        for i in 0..batches[0].num_rows() {
137            values.push(array.value(i));
138        }
139
140        assert_eq!(
141            vec![1, -10, 2, 1, 7, 1, 1, 5, 1, 1, 1, 100000000000000],
142            values
143        );
144
145        Ok(())
146    }
147
148    async fn get_exec(
149        state: &dyn Session,
150        projection: Option<Vec<usize>>,
151        limit: Option<usize>,
152    ) -> Result<Arc<dyn ExecutionPlan>> {
153        let filename = "tests/data/2.json";
154        let format = JsonFormat::default();
155        scan_format(state, &format, None, ".", filename, projection, limit).await
156    }
157
158    #[tokio::test]
159    async fn infer_schema_with_limit() {
160        let session = SessionContext::new();
161        let ctx = session.state();
162        let store = Arc::new(LocalFileSystem::new()) as _;
163        let filename = "tests/data/schema_infer_limit.json";
164        let format = JsonFormat::default().with_schema_infer_max_rec(3);
165
166        let file_schema = format
167            .infer_schema(&ctx, &store, &[local_unpartitioned_file(filename)])
168            .await
169            .expect("Schema inference");
170
171        let fields = file_schema
172            .fields()
173            .iter()
174            .map(|f| format!("{}: {:?}", f.name(), f.data_type()))
175            .collect::<Vec<_>>();
176        assert_eq!(vec!["a: Int64", "b: Float64", "c: Boolean"], fields);
177    }
178
179    async fn count_num_partitions(ctx: &SessionContext, query: &str) -> Result<usize> {
180        let result = ctx
181            .sql(&format!("EXPLAIN {query}"))
182            .await?
183            .collect()
184            .await?;
185
186        let plan = format!("{}", &pretty::pretty_format_batches(&result)?);
187
188        let re = Regex::new(r"file_groups=\{(\d+) group").unwrap();
189
190        if let Some(captures) = re.captures(&plan) {
191            if let Some(match_) = captures.get(1) {
192                let count = match_.as_str().parse::<usize>().unwrap();
193                return Ok(count);
194            }
195        }
196
197        internal_err!("Query contains no Exec: file_groups")
198    }
199
200    #[rstest(n_partitions, case(1), case(2), case(3), case(4))]
201    #[tokio::test]
202    async fn it_can_read_ndjson_in_parallel(n_partitions: usize) -> Result<()> {
203        let config = SessionConfig::new()
204            .with_repartition_file_scans(true)
205            .with_repartition_file_min_size(0)
206            .with_target_partitions(n_partitions);
207
208        let ctx = SessionContext::new_with_config(config);
209
210        let table_path = "tests/data/1.json";
211        let options = NdJsonReadOptions::default();
212
213        ctx.register_json("json_parallel", table_path, options)
214            .await?;
215
216        let query = "SELECT sum(a) FROM json_parallel;";
217
218        let result = ctx.sql(query).await?.collect().await?;
219        let actual_partitions = count_num_partitions(&ctx, query).await?;
220
221        insta::allow_duplicates! {assert_snapshot!(batches_to_string(&result),@r###"
222            +----------------------+
223            | sum(json_parallel.a) |
224            +----------------------+
225            | -7                   |
226            +----------------------+
227        "###);}
228
229        assert_eq!(n_partitions, actual_partitions);
230
231        Ok(())
232    }
233
234    #[tokio::test]
235    async fn it_can_read_empty_ndjson() -> Result<()> {
236        let config = SessionConfig::new()
237            .with_repartition_file_scans(true)
238            .with_repartition_file_min_size(0);
239
240        let ctx = SessionContext::new_with_config(config);
241
242        let table_path = "tests/data/empty.json";
243        let options = NdJsonReadOptions::default();
244
245        ctx.register_json("json_parallel_empty", table_path, options)
246            .await?;
247
248        let query = "SELECT * FROM json_parallel_empty WHERE random() > 0.5;";
249
250        let result = ctx.sql(query).await?.collect().await?;
251
252        assert_snapshot!(batches_to_string(&result),@r###"
253            ++
254            ++
255        "###);
256
257        Ok(())
258    }
259
260    #[test]
261    fn test_json_deserializer_finish() -> Result<()> {
262        let schema = Arc::new(Schema::new(vec![
263            Field::new("c1", DataType::Int64, true),
264            Field::new("c2", DataType::Int64, true),
265            Field::new("c3", DataType::Int64, true),
266            Field::new("c4", DataType::Int64, true),
267            Field::new("c5", DataType::Int64, true),
268        ]));
269        let mut deserializer = json_deserializer(1, &schema)?;
270
271        deserializer.digest(r#"{ "c1": 1, "c2": 2, "c3": 3, "c4": 4, "c5": 5 }"#.into());
272        deserializer.digest(r#"{ "c1": 6, "c2": 7, "c3": 8, "c4": 9, "c5": 10 }"#.into());
273        deserializer
274            .digest(r#"{ "c1": 11, "c2": 12, "c3": 13, "c4": 14, "c5": 15 }"#.into());
275        deserializer.finish();
276
277        let mut all_batches = RecordBatch::new_empty(schema.clone());
278        for _ in 0..3 {
279            let output = deserializer.next()?;
280            let DeserializerOutput::RecordBatch(batch) = output else {
281                panic!("Expected RecordBatch, got {output:?}");
282            };
283            all_batches = concat_batches(&schema, &[all_batches, batch])?
284        }
285        assert_eq!(deserializer.next()?, DeserializerOutput::InputExhausted);
286
287        assert_snapshot!(batches_to_string(&[all_batches]),@r###"
288            +----+----+----+----+----+
289            | c1 | c2 | c3 | c4 | c5 |
290            +----+----+----+----+----+
291            | 1  | 2  | 3  | 4  | 5  |
292            | 6  | 7  | 8  | 9  | 10 |
293            | 11 | 12 | 13 | 14 | 15 |
294            +----+----+----+----+----+
295        "###);
296
297        Ok(())
298    }
299
300    #[test]
301    fn test_json_deserializer_no_finish() -> Result<()> {
302        let schema = Arc::new(Schema::new(vec![
303            Field::new("c1", DataType::Int64, true),
304            Field::new("c2", DataType::Int64, true),
305            Field::new("c3", DataType::Int64, true),
306            Field::new("c4", DataType::Int64, true),
307            Field::new("c5", DataType::Int64, true),
308        ]));
309        let mut deserializer = json_deserializer(1, &schema)?;
310
311        deserializer.digest(r#"{ "c1": 1, "c2": 2, "c3": 3, "c4": 4, "c5": 5 }"#.into());
312        deserializer.digest(r#"{ "c1": 6, "c2": 7, "c3": 8, "c4": 9, "c5": 10 }"#.into());
313        deserializer
314            .digest(r#"{ "c1": 11, "c2": 12, "c3": 13, "c4": 14, "c5": 15 }"#.into());
315
316        let mut all_batches = RecordBatch::new_empty(schema.clone());
317        // We get RequiresMoreData after 2 batches because of how json::Decoder works
318        for _ in 0..2 {
319            let output = deserializer.next()?;
320            let DeserializerOutput::RecordBatch(batch) = output else {
321                panic!("Expected RecordBatch, got {output:?}");
322            };
323            all_batches = concat_batches(&schema, &[all_batches, batch])?
324        }
325        assert_eq!(deserializer.next()?, DeserializerOutput::RequiresMoreData);
326
327        insta::assert_snapshot!(fmt_batches(&[all_batches]),@r###"
328            +----+----+----+----+----+
329            | c1 | c2 | c3 | c4 | c5 |
330            +----+----+----+----+----+
331            | 1  | 2  | 3  | 4  | 5  |
332            | 6  | 7  | 8  | 9  | 10 |
333            +----+----+----+----+----+
334        "###);
335
336        Ok(())
337    }
338
339    fn json_deserializer(
340        batch_size: usize,
341        schema: &Arc<Schema>,
342    ) -> Result<impl BatchDeserializer<Bytes>> {
343        let decoder = ReaderBuilder::new(schema.clone())
344            .with_batch_size(batch_size)
345            .build_decoder()?;
346        Ok(DecoderDeserializer::new(JsonDecoder::new(decoder)))
347    }
348
349    fn fmt_batches(batches: &[RecordBatch]) -> String {
350        pretty::pretty_format_batches(batches).unwrap().to_string()
351    }
352}