pub use datafusion_datasource_json::source::*;

#[cfg(test)]
mod tests {

    use super::*;

    use std::fs;
    use std::path::Path;
    use std::sync::Arc;

    use crate::dataframe::DataFrameWriteOptions;
    use crate::execution::SessionState;
    use crate::prelude::{CsvReadOptions, NdJsonReadOptions, SessionContext};
    use crate::test::partitioned_file_groups;
    use datafusion_common::cast::{as_int32_array, as_int64_array, as_string_array};
    use datafusion_common::test_util::batches_to_string;
    use datafusion_common::Result;
    use datafusion_datasource::file_compression_type::FileCompressionType;
    use datafusion_datasource::file_format::FileFormat;
    use datafusion_datasource_json::JsonFormat;
    use datafusion_execution::config::SessionConfig;
    use datafusion_execution::object_store::ObjectStoreUrl;
    use datafusion_physical_plan::ExecutionPlan;

    use arrow::array::Array;
    use arrow::datatypes::SchemaRef;
    use arrow::datatypes::{Field, SchemaBuilder};
    use datafusion_datasource::file_groups::FileGroup;
    use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
    use datafusion_datasource::source::DataSourceExec;
    use insta::assert_snapshot;
    use object_store::chunked::ChunkedStore;
    use object_store::local::LocalFileSystem;
    use object_store::ObjectStore;
    use rstest::*;
    use tempfile::TempDir;
    use url::Url;

    const TEST_DATA_BASE: &str = "tests/data";

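    /// Stages `1.json` from the test data directory into `work_dir` with the
    /// requested compression and returns the local object store URL, the
    /// resulting file groups, and the schema inferred by `JsonFormat`.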
    async fn prepare_store(
        state: &SessionState,
        file_compression_type: FileCompressionType,
        work_dir: &Path,
    ) -> (ObjectStoreUrl, Vec<FileGroup>, SchemaRef) {
        let store_url = ObjectStoreUrl::local_filesystem();
        let store = state.runtime_env().object_store(&store_url).unwrap();

        let filename = "1.json";
        let file_groups = partitioned_file_groups(
            TEST_DATA_BASE,
            filename,
            1,
            Arc::new(JsonFormat::default()),
            file_compression_type.to_owned(),
            work_dir,
        )
        .unwrap();
        let meta = file_groups
            .first()
            .unwrap()
            .files()
            .first()
            .unwrap()
            .clone()
            .object_meta;
        let schema = JsonFormat::default()
            .with_file_compression_type(file_compression_type.to_owned())
            .infer_schema(state, &store, std::slice::from_ref(&meta))
            .await
            .unwrap();

        (store_url, file_groups, schema)
    }

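    /// Reads `1.json` back through the provided `ObjectStore` implementation
    /// and checks the collected batches against a snapshot.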
    async fn test_additional_stores(
        file_compression_type: FileCompressionType,
        store: Arc<dyn ObjectStore>,
    ) -> Result<()> {
        let ctx = SessionContext::new();
        let url = Url::parse("file://").unwrap();
        ctx.register_object_store(&url, store.clone());
        let filename = "1.json";
        let tmp_dir = TempDir::new()?;
        let file_groups = partitioned_file_groups(
            TEST_DATA_BASE,
            filename,
            1,
            Arc::new(JsonFormat::default()),
            file_compression_type.to_owned(),
            tmp_dir.path(),
        )
        .unwrap();
        let path = file_groups
            .first()
            .unwrap()
            .files()
            .first()
            .unwrap()
            .object_meta
            .location
            .as_ref();

        let store_url = ObjectStoreUrl::local_filesystem();
        let url: &Url = store_url.as_ref();
        let path_buf = Path::new(url.path()).join(path);
        let path = path_buf.to_str().unwrap();

        let ext = JsonFormat::default()
            .get_ext_with_compression(&file_compression_type)
            .unwrap();

        let read_options = NdJsonReadOptions::default()
            .file_extension(ext.as_str())
            .file_compression_type(file_compression_type.to_owned());
        let frame = ctx.read_json(path, read_options).await.unwrap();
        let results = frame.collect().await.unwrap();

        insta::allow_duplicates! {assert_snapshot!(batches_to_string(&results), @r###"
            +-----+------------------+---------------+------+
            | a   | b                | c             | d    |
            +-----+------------------+---------------+------+
            | 1   | [2.0, 1.3, -6.1] | [false, true] | 4    |
            | -10 | [2.0, 1.3, -6.1] | [true, true]  | 4    |
            | 2   | [2.0, , -6.1]    | [false, ]     | text |
            |     |                  |               |      |
            +-----+------------------+---------------+------+
        "###);}

        Ok(())
    }

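    /// Scans the NDJSON file without a projection and verifies the inferred
    /// schema and the first batch (limited to 3 rows).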
    #[rstest(
        file_compression_type,
        case(FileCompressionType::UNCOMPRESSED),
        case(FileCompressionType::GZIP),
        case(FileCompressionType::BZIP2),
        case(FileCompressionType::XZ),
        case(FileCompressionType::ZSTD)
    )]
    #[cfg(feature = "compression")]
    #[tokio::test]
    async fn nd_json_exec_file_without_projection(
        file_compression_type: FileCompressionType,
    ) -> Result<()> {
        let session_ctx = SessionContext::new();
        let state = session_ctx.state();
        let task_ctx = session_ctx.task_ctx();
        use arrow::datatypes::DataType;

        use futures::StreamExt;

        let tmp_dir = TempDir::new()?;
        let (object_store_url, file_groups, file_schema) =
            prepare_store(&state, file_compression_type.to_owned(), tmp_dir.path()).await;

        let source = Arc::new(JsonSource::new());
        let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source)
            .with_file_groups(file_groups)
            .with_limit(Some(3))
            .with_file_compression_type(file_compression_type.to_owned())
            .build();
        let exec = DataSourceExec::from_data_source(conf);

        let inferred_schema = exec.schema();
        assert_eq!(inferred_schema.fields().len(), 4);

        inferred_schema.field_with_name("a").unwrap();
        inferred_schema.field_with_name("b").unwrap();
        inferred_schema.field_with_name("c").unwrap();
        inferred_schema.field_with_name("d").unwrap();

        assert_eq!(
            inferred_schema.field_with_name("a").unwrap().data_type(),
            &DataType::Int64
        );
        assert!(matches!(
            inferred_schema.field_with_name("b").unwrap().data_type(),
            DataType::List(_)
        ));
        assert_eq!(
            inferred_schema.field_with_name("d").unwrap().data_type(),
            &DataType::Utf8
        );

        let mut it = exec.execute(0, task_ctx)?;
        let batch = it.next().await.unwrap()?;

        assert_eq!(batch.num_rows(), 3);
        let values = as_int64_array(batch.column(0))?;
        assert_eq!(values.value(0), 1);
        assert_eq!(values.value(1), -10);
        assert_eq!(values.value(2), 2);

        Ok(())
    }

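    /// Scans with a file schema that contains an extra column absent from the
    /// file and verifies the missing column is filled with nulls.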
    #[rstest(
        file_compression_type,
        case(FileCompressionType::UNCOMPRESSED),
        case(FileCompressionType::GZIP),
        case(FileCompressionType::BZIP2),
        case(FileCompressionType::XZ),
        case(FileCompressionType::ZSTD)
    )]
    #[cfg(feature = "compression")]
    #[tokio::test]
    async fn nd_json_exec_file_with_missing_column(
        file_compression_type: FileCompressionType,
    ) -> Result<()> {
        use arrow::datatypes::DataType;

        use futures::StreamExt;

        let session_ctx = SessionContext::new();
        let state = session_ctx.state();
        let task_ctx = session_ctx.task_ctx();

        let tmp_dir = TempDir::new()?;
        let (object_store_url, file_groups, actual_schema) =
            prepare_store(&state, file_compression_type.to_owned(), tmp_dir.path()).await;

        let mut builder = SchemaBuilder::from(actual_schema.fields());
        builder.push(Field::new("missing_col", DataType::Int32, true));

        let file_schema = Arc::new(builder.finish());
        let missing_field_idx = file_schema.fields.len() - 1;

        let source = Arc::new(JsonSource::new());
        let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source)
            .with_file_groups(file_groups)
            .with_limit(Some(3))
            .with_file_compression_type(file_compression_type.to_owned())
            .build();
        let exec = DataSourceExec::from_data_source(conf);

        let mut it = exec.execute(0, task_ctx)?;
        let batch = it.next().await.unwrap()?;

        assert_eq!(batch.num_rows(), 3);
        let values = as_int32_array(batch.column(missing_field_idx))?;
        assert_eq!(values.len(), 3);
        assert!(values.is_null(0));
        assert!(values.is_null(1));
        assert!(values.is_null(2));

        Ok(())
    }

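    /// Scans with a projection on columns 0 and 2 and verifies that only `a`
    /// and `c` appear in the output schema.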
    #[rstest(
        file_compression_type,
        case(FileCompressionType::UNCOMPRESSED),
        case(FileCompressionType::GZIP),
        case(FileCompressionType::BZIP2),
        case(FileCompressionType::XZ),
        case(FileCompressionType::ZSTD)
    )]
    #[cfg(feature = "compression")]
    #[tokio::test]
    async fn nd_json_exec_file_projection(
        file_compression_type: FileCompressionType,
    ) -> Result<()> {
        use futures::StreamExt;

        let session_ctx = SessionContext::new();
        let state = session_ctx.state();
        let task_ctx = session_ctx.task_ctx();
        let tmp_dir = TempDir::new()?;
        let (object_store_url, file_groups, file_schema) =
            prepare_store(&state, file_compression_type.to_owned(), tmp_dir.path()).await;

        let source = Arc::new(JsonSource::new());
        let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source)
            .with_file_groups(file_groups)
            .with_projection_indices(Some(vec![0, 2]))
            .with_file_compression_type(file_compression_type.to_owned())
            .build();
        let exec = DataSourceExec::from_data_source(conf);
        let inferred_schema = exec.schema();
        assert_eq!(inferred_schema.fields().len(), 2);

        inferred_schema.field_with_name("a").unwrap();
        inferred_schema.field_with_name("b").unwrap_err();
        inferred_schema.field_with_name("c").unwrap();
        inferred_schema.field_with_name("d").unwrap_err();

        let mut it = exec.execute(0, task_ctx)?;
        let batch = it.next().await.unwrap()?;

        assert_eq!(batch.num_rows(), 4);
        let values = as_int64_array(batch.column(0))?;
        assert_eq!(values.value(0), 1);
        assert_eq!(values.value(1), -10);
        assert_eq!(values.value(2), 2);
        Ok(())
    }

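    /// Scans with a projection in non-natural order (`d`, `a`, `c`) and
    /// verifies the output columns follow the requested order.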
    #[rstest(
        file_compression_type,
        case(FileCompressionType::UNCOMPRESSED),
        case(FileCompressionType::GZIP),
        case(FileCompressionType::BZIP2),
        case(FileCompressionType::XZ),
        case(FileCompressionType::ZSTD)
    )]
    #[cfg(feature = "compression")]
    #[tokio::test]
    async fn nd_json_exec_file_mixed_order_projection(
        file_compression_type: FileCompressionType,
    ) -> Result<()> {
        use futures::StreamExt;

        let session_ctx = SessionContext::new();
        let state = session_ctx.state();
        let task_ctx = session_ctx.task_ctx();
        let tmp_dir = TempDir::new()?;
        let (object_store_url, file_groups, file_schema) =
            prepare_store(&state, file_compression_type.to_owned(), tmp_dir.path()).await;

        let source = Arc::new(JsonSource::new());
        let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source)
            .with_file_groups(file_groups)
            .with_projection_indices(Some(vec![3, 0, 2]))
            .with_file_compression_type(file_compression_type.to_owned())
            .build();
        let exec = DataSourceExec::from_data_source(conf);
        let inferred_schema = exec.schema();
        assert_eq!(inferred_schema.fields().len(), 3);

        inferred_schema.field_with_name("a").unwrap();
        inferred_schema.field_with_name("b").unwrap_err();
        inferred_schema.field_with_name("c").unwrap();
        inferred_schema.field_with_name("d").unwrap();

        let mut it = exec.execute(0, task_ctx)?;
        let batch = it.next().await.unwrap()?;

        assert_eq!(batch.num_rows(), 4);

        let values = as_string_array(batch.column(0))?;
        assert_eq!(values.value(0), "4");
        assert_eq!(values.value(1), "4");
        assert_eq!(values.value(2), "text");

        let values = as_int64_array(batch.column(1))?;
        assert_eq!(values.value(0), 1);
        assert_eq!(values.value(1), -10);
        assert_eq!(values.value(2), 2);
        Ok(())
    }

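    /// Writes query results as JSON to a registered object store and reads a
    /// single part file and the whole output directory back for comparison.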
    #[tokio::test]
    async fn write_json_results() -> Result<()> {
        let ctx = SessionContext::new_with_config(
            SessionConfig::new().with_target_partitions(8),
        );

        let path = format!("{TEST_DATA_BASE}/1.json");

        ctx.register_json("test", path.as_str(), NdJsonReadOptions::default())
            .await?;

        let tmp_dir = TempDir::new()?;
        let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
        let local_url = Url::parse("file://local").unwrap();
        ctx.register_object_store(&local_url, local);

        let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out/";
        let out_dir_url = "file://local/out/";
        let df = ctx.sql("SELECT a, b FROM test").await?;
        df.write_json(out_dir_url, DataFrameWriteOptions::new(), None)
            .await?;

        let ctx = SessionContext::new();

        let paths = fs::read_dir(&out_dir).unwrap();
        let mut part_0_name: String = "".to_owned();
        for path in paths {
            let name = path
                .unwrap()
                .path()
                .file_name()
                .expect("Should be a file name")
                .to_str()
                .expect("Should be a str")
                .to_owned();
            if name.ends_with("_0.json") {
                part_0_name = name;
                break;
            }
        }

        if part_0_name.is_empty() {
            panic!("Did not find part_0 in json output files!")
        }

        let json_read_option = NdJsonReadOptions::default();
        ctx.register_json(
            "part0",
            &format!("{out_dir}/{part_0_name}"),
            json_read_option.clone(),
        )
        .await?;
        ctx.register_json("allparts", &out_dir, json_read_option)
            .await?;

        let part0 = ctx.sql("SELECT a, b FROM part0").await?.collect().await?;
        let allparts = ctx
            .sql("SELECT a, b FROM allparts")
            .await?
            .collect()
            .await?;

        let allparts_count: usize = allparts.iter().map(|batch| batch.num_rows()).sum();

        assert_eq!(part0[0].schema(), allparts[0].schema());

        assert_eq!(allparts_count, 4);

        Ok(())
    }

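    /// Runs the additional-store test through a `ChunkedStore` wrapper to
    /// exercise reads that arrive in small chunks of varying sizes.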
    #[rstest(
        file_compression_type,
        case(FileCompressionType::UNCOMPRESSED),
        case(FileCompressionType::GZIP),
        case(FileCompressionType::BZIP2),
        case(FileCompressionType::XZ),
        case(FileCompressionType::ZSTD)
    )]
    #[cfg(feature = "compression")]
    #[tokio::test]
    async fn test_chunked_json(
        file_compression_type: FileCompressionType,
        #[values(10, 20, 30, 40)] chunk_size: usize,
    ) -> Result<()> {
        test_additional_stores(
            file_compression_type,
            Arc::new(ChunkedStore::new(
                Arc::new(LocalFileSystem::new()),
                chunk_size,
            )),
        )
        .await?;
        Ok(())
    }

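    /// Writing rows that do not match the inferred schema should surface the
    /// underlying Arrow parse error.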
    #[tokio::test]
    async fn write_json_results_error_handling() -> Result<()> {
        let ctx = SessionContext::new();
        let tmp_dir = TempDir::new()?;
        let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
        let local_url = Url::parse("file://local").unwrap();
        ctx.register_object_store(&local_url, local);
        let options = CsvReadOptions::default()
            .schema_infer_max_records(2)
            .has_header(true);
        let df = ctx.read_csv("tests/data/corrupt.csv", options).await?;
        let out_dir_url = "file://local/out";
        let e = df
            .write_json(out_dir_url, DataFrameWriteOptions::new(), None)
            .await
            .expect_err("should fail because input file does not match inferred schema");
        assert_eq!(e.strip_backtrace(), "Arrow error: Parser error: Error while parsing value 'd' as type 'Int64' for column 0 at line 4. Row data: '[d,4]'");
        Ok(())
    }

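    /// The number of records sampled for schema inference controls how many
    /// columns are discovered in `4.json`.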
    #[tokio::test]
    async fn ndjson_schema_infer_max_records() -> Result<()> {
        async fn read_test_data(schema_infer_max_records: usize) -> Result<SchemaRef> {
            let ctx = SessionContext::new();

            let options = NdJsonReadOptions {
                schema_infer_max_records,
                ..Default::default()
            };

            let batches = ctx
                .read_json("tests/data/4.json", options)
                .await?
                .collect()
                .await?;

            Ok(batches[0].schema())
        }

        let schema = read_test_data(2).await?;
        assert_eq!(schema.fields().len(), 2);

        let schema = read_test_data(10).await?;
        assert_eq!(schema.fields().len(), 5);

        Ok(())
    }

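    /// Reads NDJSON with file-scan repartitioning enabled and checks that the
    /// results still match regardless of compression.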
    #[rstest(
        file_compression_type,
        case::uncompressed(FileCompressionType::UNCOMPRESSED),
        case::gzip(FileCompressionType::GZIP),
        case::bzip2(FileCompressionType::BZIP2),
        case::xz(FileCompressionType::XZ),
        case::zstd(FileCompressionType::ZSTD)
    )]
    #[cfg(feature = "compression")]
    #[tokio::test]
    async fn test_json_with_repartitioning(
        file_compression_type: FileCompressionType,
    ) -> Result<()> {
        use datafusion_common::assert_batches_sorted_eq;
        use datafusion_execution::config::SessionConfig;

        let config = SessionConfig::new()
            .with_repartition_file_scans(true)
            .with_repartition_file_min_size(0)
            .with_target_partitions(4);
        let ctx = SessionContext::new_with_config(config);

        let tmp_dir = TempDir::new()?;
        let (store_url, file_groups, _) =
            prepare_store(&ctx.state(), file_compression_type, tmp_dir.path()).await;

        assert_eq!(
            file_groups.len(),
            1,
            "Expected prepared store with single file group"
        );

        let path = file_groups
            .first()
            .unwrap()
            .files()
            .first()
            .unwrap()
            .object_meta
            .location
            .as_ref();

        let url: &Url = store_url.as_ref();
        let path_buf = Path::new(url.path()).join(path);
        let path = path_buf.to_str().unwrap();
        let ext = JsonFormat::default()
            .get_ext_with_compression(&file_compression_type)
            .unwrap();

        let read_option = NdJsonReadOptions::default()
            .file_compression_type(file_compression_type)
            .file_extension(ext.as_str());

        let df = ctx.read_json(path, read_option).await?;
        let res = df.collect().await;

        assert_batches_sorted_eq!(
            &[
                "+-----+------------------+---------------+------+",
                "| a   | b                | c             | d    |",
                "+-----+------------------+---------------+------+",
                "| 1   | [2.0, 1.3, -6.1] | [false, true] | 4    |",
                "| -10 | [2.0, 1.3, -6.1] | [true, true]  | 4    |",
                "| 2   | [2.0, , -6.1]    | [false, ]     | text |",
                "|     |                  |               |      |",
                "+-----+------------------+---------------+------+",
            ],
            &res?
        );
        Ok(())
    }
}