datafusion/datasource/physical_plan/json.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Reexports the [`datafusion_datasource_json::source`] module, containing JSON based [`FileSource`].
//!
//! [`FileSource`]: datafusion_datasource::file::FileSource
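//!
//! A typical scan pairs a `JsonSource` with a `FileScanConfigBuilder`, as the
//! tests in this module do. The sketch below is illustrative only (fenced as
//! `ignore`, so it is not compiled as a doctest); `object_store_url`,
//! `file_schema` and `file_groups` are assumed to be prepared by the caller:
//!
//! ```ignore
//! use std::sync::Arc;
//! use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
//! use datafusion_datasource::source::DataSourceExec;
//! use datafusion_datasource_json::source::JsonSource;
//!
//! let source = Arc::new(JsonSource::new());
//! let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source)
//!     .with_file_groups(file_groups)
//!     .build();
//! let exec = DataSourceExec::from_data_source(conf);
//! ```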

pub use datafusion_datasource_json::source::*;

#[cfg(test)]
mod tests {

    use super::*;

    use std::fs;
    use std::path::Path;
    use std::sync::Arc;

    use crate::dataframe::DataFrameWriteOptions;
    use crate::execution::SessionState;
    use crate::prelude::{CsvReadOptions, NdJsonReadOptions, SessionContext};
    use crate::test::partitioned_file_groups;
    use datafusion_common::cast::{as_int32_array, as_int64_array, as_string_array};
    use datafusion_common::test_util::batches_to_string;
    use datafusion_common::Result;
    use datafusion_datasource::file_compression_type::FileCompressionType;
    use datafusion_datasource::file_format::FileFormat;
    use datafusion_datasource_json::JsonFormat;
    use datafusion_execution::config::SessionConfig;
    use datafusion_execution::object_store::ObjectStoreUrl;
    use datafusion_physical_plan::ExecutionPlan;

    use arrow::array::Array;
    use arrow::datatypes::SchemaRef;
    use arrow::datatypes::{Field, SchemaBuilder};
    use datafusion_datasource::file_groups::FileGroup;
    use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
    use datafusion_datasource::source::DataSourceExec;
    use insta::assert_snapshot;
    use object_store::chunked::ChunkedStore;
    use object_store::local::LocalFileSystem;
    use object_store::ObjectStore;
    use rstest::*;
    use tempfile::TempDir;
    use url::Url;

    const TEST_DATA_BASE: &str = "tests/data";

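    /// Partitions `tests/data/1.json` into a single file group under
    /// `work_dir` with the requested compression and infers its schema via
    /// [`JsonFormat`].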
    async fn prepare_store(
        state: &SessionState,
        file_compression_type: FileCompressionType,
        work_dir: &Path,
    ) -> (ObjectStoreUrl, Vec<FileGroup>, SchemaRef) {
        let store_url = ObjectStoreUrl::local_filesystem();
        let store = state.runtime_env().object_store(&store_url).unwrap();

        let filename = "1.json";
        let file_groups = partitioned_file_groups(
            TEST_DATA_BASE,
            filename,
            1,
            Arc::new(JsonFormat::default()),
            file_compression_type.to_owned(),
            work_dir,
        )
        .unwrap();
        let meta = file_groups
            .first()
            .unwrap()
            .files()
            .first()
            .unwrap()
            .clone()
            .object_meta;
        let schema = JsonFormat::default()
            .with_file_compression_type(file_compression_type.to_owned())
            .infer_schema(state, &store, std::slice::from_ref(&meta))
            .await
            .unwrap();

        (store_url, file_groups, schema)
    }

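    /// Reads the partitioned JSON test file through the provided
    /// `ObjectStore` (e.g. a [`ChunkedStore`]) and snapshots the decoded
    /// batches.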
    async fn test_additional_stores(
        file_compression_type: FileCompressionType,
        store: Arc<dyn ObjectStore>,
    ) -> Result<()> {
        let ctx = SessionContext::new();
        let url = Url::parse("file://").unwrap();
        ctx.register_object_store(&url, store.clone());
        let filename = "1.json";
        let tmp_dir = TempDir::new()?;
        let file_groups = partitioned_file_groups(
            TEST_DATA_BASE,
            filename,
            1,
            Arc::new(JsonFormat::default()),
            file_compression_type.to_owned(),
            tmp_dir.path(),
        )
        .unwrap();
        let path = file_groups
            .first()
            .unwrap()
            .files()
            .first()
            .unwrap()
            .object_meta
            .location
            .as_ref();

        let store_url = ObjectStoreUrl::local_filesystem();
        let url: &Url = store_url.as_ref();
        let path_buf = Path::new(url.path()).join(path);
        let path = path_buf.to_str().unwrap();

        let ext = JsonFormat::default()
            .get_ext_with_compression(&file_compression_type)
            .unwrap();

        let read_options = NdJsonReadOptions::default()
            .file_extension(ext.as_str())
            .file_compression_type(file_compression_type.to_owned());
        let frame = ctx.read_json(path, read_options).await.unwrap();
        let results = frame.collect().await.unwrap();

        insta::allow_duplicates! {assert_snapshot!(batches_to_string(&results), @r###"
            +-----+------------------+---------------+------+
            | a   | b                | c             | d    |
            +-----+------------------+---------------+------+
            | 1   | [2.0, 1.3, -6.1] | [false, true] | 4    |
            | -10 | [2.0, 1.3, -6.1] | [true, true]  | 4    |
            | 2   | [2.0, , -6.1]    | [false, ]     | text |
            |     |                  |               |      |
            +-----+------------------+---------------+------+
        "###);}

        Ok(())
    }

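    // Scans all four columns of the (optionally compressed) test file with a
    // row limit of 3 and checks the inferred schema and the first batch.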
    #[rstest(
        file_compression_type,
        case(FileCompressionType::UNCOMPRESSED),
        case(FileCompressionType::GZIP),
        case(FileCompressionType::BZIP2),
        case(FileCompressionType::XZ),
        case(FileCompressionType::ZSTD)
    )]
    #[cfg(feature = "compression")]
    #[tokio::test]
    async fn nd_json_exec_file_without_projection(
        file_compression_type: FileCompressionType,
    ) -> Result<()> {
        let session_ctx = SessionContext::new();
        let state = session_ctx.state();
        let task_ctx = session_ctx.task_ctx();
        use arrow::datatypes::DataType;

        use futures::StreamExt;

        let tmp_dir = TempDir::new()?;
        let (object_store_url, file_groups, file_schema) =
            prepare_store(&state, file_compression_type.to_owned(), tmp_dir.path()).await;

        let source = Arc::new(JsonSource::new());
        let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source)
            .with_file_groups(file_groups)
            .with_limit(Some(3))
            .with_file_compression_type(file_compression_type.to_owned())
            .build();
        let exec = DataSourceExec::from_data_source(conf);

        // TODO: this is not where schema inference should be tested

        let inferred_schema = exec.schema();
        assert_eq!(inferred_schema.fields().len(), 4);

        // a,b,c,d should be inferred
        inferred_schema.field_with_name("a").unwrap();
        inferred_schema.field_with_name("b").unwrap();
        inferred_schema.field_with_name("c").unwrap();
        inferred_schema.field_with_name("d").unwrap();

        assert_eq!(
            inferred_schema.field_with_name("a").unwrap().data_type(),
            &DataType::Int64
        );
        assert!(matches!(
            inferred_schema.field_with_name("b").unwrap().data_type(),
            DataType::List(_)
        ));
        assert_eq!(
            inferred_schema.field_with_name("d").unwrap().data_type(),
            &DataType::Utf8
        );

        let mut it = exec.execute(0, task_ctx)?;
        let batch = it.next().await.unwrap()?;

        assert_eq!(batch.num_rows(), 3);
        let values = as_int64_array(batch.column(0))?;
        assert_eq!(values.value(0), 1);
        assert_eq!(values.value(1), -10);
        assert_eq!(values.value(2), 2);

        Ok(())
    }

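    // A column that is present in the provided schema but absent from the
    // file should come back as all-null values.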
    #[rstest(
        file_compression_type,
        case(FileCompressionType::UNCOMPRESSED),
        case(FileCompressionType::GZIP),
        case(FileCompressionType::BZIP2),
        case(FileCompressionType::XZ),
        case(FileCompressionType::ZSTD)
    )]
    #[cfg(feature = "compression")]
    #[tokio::test]
    async fn nd_json_exec_file_with_missing_column(
        file_compression_type: FileCompressionType,
    ) -> Result<()> {
        use arrow::datatypes::DataType;

        use futures::StreamExt;

        let session_ctx = SessionContext::new();
        let state = session_ctx.state();
        let task_ctx = session_ctx.task_ctx();

        let tmp_dir = TempDir::new()?;
        let (object_store_url, file_groups, actual_schema) =
            prepare_store(&state, file_compression_type.to_owned(), tmp_dir.path()).await;

        let mut builder = SchemaBuilder::from(actual_schema.fields());
        builder.push(Field::new("missing_col", DataType::Int32, true));

        let file_schema = Arc::new(builder.finish());
        let missing_field_idx = file_schema.fields.len() - 1;

        let source = Arc::new(JsonSource::new());
        let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source)
            .with_file_groups(file_groups)
            .with_limit(Some(3))
            .with_file_compression_type(file_compression_type.to_owned())
            .build();
        let exec = DataSourceExec::from_data_source(conf);

        let mut it = exec.execute(0, task_ctx)?;
        let batch = it.next().await.unwrap()?;

        assert_eq!(batch.num_rows(), 3);
        let values = as_int32_array(batch.column(missing_field_idx))?;
        assert_eq!(values.len(), 3);
        assert!(values.is_null(0));
        assert!(values.is_null(1));
        assert!(values.is_null(2));

        Ok(())
    }

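    // A projection on columns 0 and 2 should expose only "a" and "c" in the
    // output schema.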
    #[rstest(
        file_compression_type,
        case(FileCompressionType::UNCOMPRESSED),
        case(FileCompressionType::GZIP),
        case(FileCompressionType::BZIP2),
        case(FileCompressionType::XZ),
        case(FileCompressionType::ZSTD)
    )]
    #[cfg(feature = "compression")]
    #[tokio::test]
    async fn nd_json_exec_file_projection(
        file_compression_type: FileCompressionType,
    ) -> Result<()> {
        use futures::StreamExt;

        let session_ctx = SessionContext::new();
        let state = session_ctx.state();
        let task_ctx = session_ctx.task_ctx();
        let tmp_dir = TempDir::new()?;
        let (object_store_url, file_groups, file_schema) =
            prepare_store(&state, file_compression_type.to_owned(), tmp_dir.path()).await;

        let source = Arc::new(JsonSource::new());
        let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source)
            .with_file_groups(file_groups)
            .with_projection_indices(Some(vec![0, 2]))
            .with_file_compression_type(file_compression_type.to_owned())
            .build();
        let exec = DataSourceExec::from_data_source(conf);
        let inferred_schema = exec.schema();
        assert_eq!(inferred_schema.fields().len(), 2);

        inferred_schema.field_with_name("a").unwrap();
        inferred_schema.field_with_name("b").unwrap_err();
        inferred_schema.field_with_name("c").unwrap();
        inferred_schema.field_with_name("d").unwrap_err();

        let mut it = exec.execute(0, task_ctx)?;
        let batch = it.next().await.unwrap()?;

        assert_eq!(batch.num_rows(), 4);
        let values = as_int64_array(batch.column(0))?;
        assert_eq!(values.value(0), 1);
        assert_eq!(values.value(1), -10);
        assert_eq!(values.value(2), 2);
        Ok(())
    }

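    // A projection in mixed order ([3, 0, 2]) should reorder the output
    // columns to "d", "a", "c".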
    #[rstest(
        file_compression_type,
        case(FileCompressionType::UNCOMPRESSED),
        case(FileCompressionType::GZIP),
        case(FileCompressionType::BZIP2),
        case(FileCompressionType::XZ),
        case(FileCompressionType::ZSTD)
    )]
    #[cfg(feature = "compression")]
    #[tokio::test]
    async fn nd_json_exec_file_mixed_order_projection(
        file_compression_type: FileCompressionType,
    ) -> Result<()> {
        use futures::StreamExt;

        let session_ctx = SessionContext::new();
        let state = session_ctx.state();
        let task_ctx = session_ctx.task_ctx();
        let tmp_dir = TempDir::new()?;
        let (object_store_url, file_groups, file_schema) =
            prepare_store(&state, file_compression_type.to_owned(), tmp_dir.path()).await;

        let source = Arc::new(JsonSource::new());
        let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source)
            .with_file_groups(file_groups)
            .with_projection_indices(Some(vec![3, 0, 2]))
            .with_file_compression_type(file_compression_type.to_owned())
            .build();
        let exec = DataSourceExec::from_data_source(conf);
        let inferred_schema = exec.schema();
        assert_eq!(inferred_schema.fields().len(), 3);

        inferred_schema.field_with_name("a").unwrap();
        inferred_schema.field_with_name("b").unwrap_err();
        inferred_schema.field_with_name("c").unwrap();
        inferred_schema.field_with_name("d").unwrap();

        let mut it = exec.execute(0, task_ctx)?;
        let batch = it.next().await.unwrap()?;

        assert_eq!(batch.num_rows(), 4);

        let values = as_string_array(batch.column(0))?;
        assert_eq!(values.value(0), "4");
        assert_eq!(values.value(1), "4");
        assert_eq!(values.value(2), "text");

        let values = as_int64_array(batch.column(1))?;
        assert_eq!(values.value(0), 1);
        assert_eq!(values.value(1), -10);
        assert_eq!(values.value(2), 2);
        Ok(())
    }

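    // Round-trips a query result through `DataFrame::write_json` and reads
    // the written partitions back, both individually and via the output dir.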
    #[tokio::test]
    async fn write_json_results() -> Result<()> {
        // create partitioned input file and context
        let ctx = SessionContext::new_with_config(
            SessionConfig::new().with_target_partitions(8),
        );

        let path = format!("{TEST_DATA_BASE}/1.json");

        // register json file with the execution context
        ctx.register_json("test", path.as_str(), NdJsonReadOptions::default())
            .await?;

        // register a local file system object store for /tmp directory
        let tmp_dir = TempDir::new()?;
        let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
        let local_url = Url::parse("file://local").unwrap();
        ctx.register_object_store(&local_url, local);

        // execute a simple query and write the results to JSON
        let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out/";
        let out_dir_url = "file://local/out/";
        let df = ctx.sql("SELECT a, b FROM test").await?;
        df.write_json(out_dir_url, DataFrameWriteOptions::new(), None)
            .await?;

        // create a new context and verify that the results were saved to partitioned json files
        let ctx = SessionContext::new();

        // get name of first part
        let paths = fs::read_dir(&out_dir).unwrap();
        let mut part_0_name: String = "".to_owned();
        for path in paths {
            let name = path
                .unwrap()
                .path()
                .file_name()
                .expect("Should be a file name")
                .to_str()
                .expect("Should be a str")
                .to_owned();
            if name.ends_with("_0.json") {
                part_0_name = name;
                break;
            }
        }

        if part_0_name.is_empty() {
            panic!("Did not find part_0 in json output files!")
        }

        // register each partition as well as the top level dir
        let json_read_option = NdJsonReadOptions::default();
        ctx.register_json(
            "part0",
            &format!("{out_dir}/{part_0_name}"),
            json_read_option.clone(),
        )
        .await?;
        ctx.register_json("allparts", &out_dir, json_read_option)
            .await?;

        let part0 = ctx.sql("SELECT a, b FROM part0").await?.collect().await?;
        let allparts = ctx
            .sql("SELECT a, b FROM allparts")
            .await?
            .collect()
            .await?;

        let allparts_count: usize = allparts.iter().map(|batch| batch.num_rows()).sum();

        assert_eq!(part0[0].schema(), allparts[0].schema());

        assert_eq!(allparts_count, 4);

        Ok(())
    }

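    // Exercises the JSON reader against a `ChunkedStore` that returns the
    // file in small chunks, across all supported compression codecs.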
    #[rstest(
        file_compression_type,
        case(FileCompressionType::UNCOMPRESSED),
        case(FileCompressionType::GZIP),
        case(FileCompressionType::BZIP2),
        case(FileCompressionType::XZ),
        case(FileCompressionType::ZSTD)
    )]
    #[cfg(feature = "compression")]
    #[tokio::test]
    async fn test_chunked_json(
        file_compression_type: FileCompressionType,
        #[values(10, 20, 30, 40)] chunk_size: usize,
    ) -> Result<()> {
        test_additional_stores(
            file_compression_type,
            Arc::new(ChunkedStore::new(
                Arc::new(LocalFileSystem::new()),
                chunk_size,
            )),
        )
        .await?;
        Ok(())
    }

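    // Writing a DataFrame whose input file does not match the inferred schema
    // should surface the underlying Arrow parser error.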
    #[tokio::test]
    async fn write_json_results_error_handling() -> Result<()> {
        let ctx = SessionContext::new();
        // register a local file system object store for /tmp directory
        let tmp_dir = TempDir::new()?;
        let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
        let local_url = Url::parse("file://local").unwrap();
        ctx.register_object_store(&local_url, local);
        let options = CsvReadOptions::default()
            .schema_infer_max_records(2)
            .has_header(true);
        let df = ctx.read_csv("tests/data/corrupt.csv", options).await?;
        let out_dir_url = "file://local/out";
        let e = df
            .write_json(out_dir_url, DataFrameWriteOptions::new(), None)
            .await
            .expect_err("should fail because input file does not match inferred schema");
        assert_eq!(e.strip_backtrace(), "Arrow error: Parser error: Error while parsing value 'd' as type 'Int64' for column 0 at line 4. Row data: '[d,4]'");
        Ok(())
    }

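    // `schema_infer_max_records` bounds how many rows are sampled for schema
    // inference: sampling fewer rows yields a narrower schema.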
    #[tokio::test]
    async fn ndjson_schema_infer_max_records() -> Result<()> {
        async fn read_test_data(schema_infer_max_records: usize) -> Result<SchemaRef> {
            let ctx = SessionContext::new();

            let options = NdJsonReadOptions {
                schema_infer_max_records,
                ..Default::default()
            };

            let batches = ctx
                .read_json("tests/data/4.json", options)
                .await?
                .collect()
                .await?;

            Ok(batches[0].schema())
        }

        // Use only the first 2 rows to infer the schema, those have 2 fields.
        let schema = read_test_data(2).await?;
        assert_eq!(schema.fields().len(), 2);

        // Use all rows to infer the schema, those have 5 fields.
        let schema = read_test_data(10).await?;
        assert_eq!(schema.fields().len(), 5);

        Ok(())
    }

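    // With file-scan repartitioning enabled, a single file group is split
    // across multiple target partitions and should still produce the full,
    // correct result.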
    #[rstest(
        file_compression_type,
        case::uncompressed(FileCompressionType::UNCOMPRESSED),
        case::gzip(FileCompressionType::GZIP),
        case::bzip2(FileCompressionType::BZIP2),
        case::xz(FileCompressionType::XZ),
        case::zstd(FileCompressionType::ZSTD)
    )]
    #[cfg(feature = "compression")]
    #[tokio::test]
    async fn test_json_with_repartitioning(
        file_compression_type: FileCompressionType,
    ) -> Result<()> {
        use datafusion_common::assert_batches_sorted_eq;
        use datafusion_execution::config::SessionConfig;

        let config = SessionConfig::new()
            .with_repartition_file_scans(true)
            .with_repartition_file_min_size(0)
            .with_target_partitions(4);
        let ctx = SessionContext::new_with_config(config);

        let tmp_dir = TempDir::new()?;
        let (store_url, file_groups, _) =
            prepare_store(&ctx.state(), file_compression_type, tmp_dir.path()).await;

        // It's important to have fewer file groups than `target_partitions`
        // in order to trigger repartitioning.
        assert_eq!(
            file_groups.len(),
            1,
            "Expected prepared store with single file group"
        );

        let path = file_groups
            .first()
            .unwrap()
            .files()
            .first()
            .unwrap()
            .object_meta
            .location
            .as_ref();

        let url: &Url = store_url.as_ref();
        let path_buf = Path::new(url.path()).join(path);
        let path = path_buf.to_str().unwrap();
        let ext = JsonFormat::default()
            .get_ext_with_compression(&file_compression_type)
            .unwrap();

        let read_option = NdJsonReadOptions::default()
            .file_compression_type(file_compression_type)
            .file_extension(ext.as_str());

        let df = ctx.read_json(path, read_option).await?;
        let res = df.collect().await;

        // Output sort order is nondeterministic due to multiple target
        // partitions, so the assertion compares sorted results.
        assert_batches_sorted_eq!(
            &[
                "+-----+------------------+---------------+------+",
                "| a   | b                | c             | d    |",
                "+-----+------------------+---------------+------+",
                "| 1   | [2.0, 1.3, -6.1] | [false, true] | 4    |",
                "| -10 | [2.0, 1.3, -6.1] | [true, true]  | 4    |",
                "| 2   | [2.0, , -6.1]    | [false, ]     | text |",
                "|     |                  |               |      |",
                "+-----+------------------+---------------+------+",
            ],
            &res?
        );
        Ok(())
    }
}
607}