datafusion/datasource/file_format/mod.rs

pub mod arrow;
pub mod csv;
pub mod json;

#[cfg(feature = "avro")]
pub mod avro;

#[cfg(feature = "parquet")]
pub mod parquet;

pub mod options;

pub use datafusion_datasource::file_compression_type;
pub use datafusion_datasource::file_format::*;
pub use datafusion_datasource::write;

#[cfg(test)]
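/// Test helpers for exercising `FileFormat` implementations against single files.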
pub(crate) mod test_util {
    use arrow_schema::SchemaRef;
    use datafusion_catalog::Session;
    use datafusion_common::Result;
    use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
    use datafusion_datasource::{file_format::FileFormat, PartitionedFile};
    use datafusion_execution::object_store::ObjectStoreUrl;
    use std::sync::Arc;

    use crate::test::object_store::local_unpartitioned_file;

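    /// Scans the single file at `store_root/file_name` with the given
    /// [`FileFormat`] and returns the resulting `ExecutionPlan`. When no
    /// `schema` is supplied, one is inferred from the file.
    ///
    /// Example call (a sketch only; `CsvFormat` and the `state` value are
    /// assumptions of this illustration, not part of this module):
    ///
    /// ```ignore
    /// let exec = scan_format(
    ///     &state,                 // any `dyn Session`, e.g. a SessionState
    ///     &CsvFormat::default(),
    ///     None,                   // infer the schema from the file
    ///     "tests/data",
    ///     "example.csv",
    ///     None,                   // no projection
    ///     Some(10),               // fetch at most 10 rows
    /// )
    /// .await?;
    /// ```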
    pub async fn scan_format(
        state: &dyn Session,
        format: &dyn FileFormat,
        schema: Option<SchemaRef>,
        store_root: &str,
        file_name: &str,
        projection: Option<Vec<usize>>,
        limit: Option<usize>,
    ) -> Result<Arc<dyn datafusion_physical_plan::ExecutionPlan>> {
        let store = Arc::new(object_store::local::LocalFileSystem::new()) as _;
        let meta = local_unpartitioned_file(format!("{store_root}/{file_name}"));

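        // Use the caller-provided schema, or infer one from the file.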
        let file_schema = if let Some(file_schema) = schema {
            file_schema
        } else {
            format
                .infer_schema(state, &store, std::slice::from_ref(&meta))
                .await?
        };

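        // Let the format compute file-level statistics for the scan.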
        let statistics = format
            .infer_stats(state, &store, file_schema.clone(), &meta)
            .await?;

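        // One file group containing the single file to scan; `.into()`
        // converts the Vec<PartitionedFile> into a group.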
        let file_groups = vec![vec![PartitionedFile {
            object_meta: meta,
            partition_values: vec![],
            range: None,
            statistics: None,
            extensions: None,
            metadata_size_hint: None,
        }]
        .into()];

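        // Build the scan configuration and let the format create the physical plan.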
        let exec = format
            .create_physical_plan(
                state,
                FileScanConfigBuilder::new(
                    ObjectStoreUrl::local_filesystem(),
                    file_schema,
                    format.file_source(),
                )
                .with_file_groups(file_groups)
                .with_statistics(statistics)
                .with_projection_indices(projection)
                .with_limit(limit)
                .build(),
            )
            .await?;
        Ok(exec)
    }
}

#[cfg(test)]
mod tests {
    #[cfg(feature = "parquet")]
    #[tokio::test]
    async fn write_parquet_results_error_handling() -> datafusion_common::Result<()> {
        use std::sync::Arc;

        use object_store::local::LocalFileSystem;
        use tempfile::TempDir;
        use url::Url;

        use crate::{
            dataframe::DataFrameWriteOptions,
            prelude::{CsvReadOptions, SessionContext},
        };

        let ctx = SessionContext::new();
        let tmp_dir = TempDir::new()?;
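        // Register the temp dir as a local object store under "file://local".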
        let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
        let local_url = Url::parse("file://local").unwrap();
        ctx.register_object_store(&local_url, local);

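        // Only the first 2 records are used for schema inference, so column 0
        // is typed Int64; line 4 of corrupt.csv ('d,4') then fails to parse.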
        let options = CsvReadOptions::default()
            .schema_infer_max_records(2)
            .has_header(true);
        let df = ctx.read_csv("tests/data/corrupt.csv", options).await?;
        let out_dir_url = "file://local/out";
        let e = df
            .write_parquet(out_dir_url, DataFrameWriteOptions::new(), None)
            .await
            .expect_err("should fail because input file does not match inferred schema");
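        // strip_backtrace() drops the backtrace so the message compares exactly.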
        assert_eq!(e.strip_backtrace(), "Arrow error: Parser error: Error while parsing value 'd' as type 'Int64' for column 0 at line 4. Row data: '[d,4]'");
        Ok(())
    }
}