datafusion/datasource/file_format/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Module containing helper methods for the various file formats
//! See write.rs for write-related helper methods
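//!
//! # Example: reading a compressed CSV file
//!
//! A minimal sketch (marked `ignore`, not compiled as a doc test) of how the
//! re-exported [`file_compression_type::FileCompressionType`] is typically
//! paired with `CsvReadOptions`; the file path below is hypothetical.
//!
//! ```ignore
//! use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
//! use datafusion::prelude::{CsvReadOptions, SessionContext};
//!
//! async fn read_gzipped_csv() -> datafusion::error::Result<()> {
//!     let ctx = SessionContext::new();
//!     // Ask the CSV reader to transparently decompress the gzip input.
//!     let options = CsvReadOptions::new().file_compression_type(FileCompressionType::GZIP);
//!     let df = ctx.read_csv("data/example.csv.gz", options).await?;
//!     df.show().await?;
//!     Ok(())
//! }
//! ```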

pub mod arrow;
pub mod csv;
pub mod json;

#[cfg(feature = "avro")]
pub mod avro;

#[cfg(feature = "parquet")]
pub mod parquet;

pub mod options;

pub use datafusion_datasource::file_compression_type;
pub use datafusion_datasource::file_format::*;
pub use datafusion_datasource::write;

#[cfg(test)]
pub(crate) mod test_util {
    use arrow_schema::SchemaRef;
    use datafusion_catalog::Session;
    use datafusion_common::Result;
    use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
    use datafusion_datasource::{file_format::FileFormat, PartitionedFile};
    use datafusion_execution::object_store::ObjectStoreUrl;
    use std::sync::Arc;

    use crate::test::object_store::local_unpartitioned_file;

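    /// Builds an [`ExecutionPlan`](datafusion_physical_plan::ExecutionPlan) that
    /// scans the single local file at `<store_root>/<file_name>` with the given
    /// [`FileFormat`].
    ///
    /// If `schema` is `None` it is inferred from the file; statistics are always
    /// inferred. `projection` and `limit` are forwarded to the
    /// [`FileScanConfigBuilder`].
    ///
    /// A hypothetical call from a format test (the format, paths, projection and
    /// limit below are placeholders):
    ///
    /// ```ignore
    /// let exec = scan_format(
    ///     &state,
    ///     &CsvFormat::default(),
    ///     None,             // infer the schema from the file
    ///     "tests/data",     // store root
    ///     "example.csv",    // file name
    ///     Some(vec![0, 1]), // project the first two columns
    ///     Some(10),         // read at most 10 rows
    /// )
    /// .await?;
    /// ```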
    pub async fn scan_format(
        state: &dyn Session,
        format: &dyn FileFormat,
        schema: Option<SchemaRef>,
        store_root: &str,
        file_name: &str,
        projection: Option<Vec<usize>>,
        limit: Option<usize>,
    ) -> Result<Arc<dyn datafusion_physical_plan::ExecutionPlan>> {
        let store = Arc::new(object_store::local::LocalFileSystem::new()) as _;
        let meta = local_unpartitioned_file(format!("{store_root}/{file_name}"));

        // Use the caller-provided schema, or infer it from the file
        let file_schema = if let Some(file_schema) = schema {
            file_schema
        } else {
            format
                .infer_schema(state, &store, std::slice::from_ref(&meta))
                .await?
        };

        let statistics = format
            .infer_stats(state, &store, file_schema.clone(), &meta)
            .await?;

        // A single file group containing the one file to scan
        let file_groups = vec![vec![PartitionedFile {
            object_meta: meta,
            partition_values: vec![],
            range: None,
            statistics: None,
            extensions: None,
            metadata_size_hint: None,
        }]
        .into()];

        let exec = format
            .create_physical_plan(
                state,
                FileScanConfigBuilder::new(
                    ObjectStoreUrl::local_filesystem(),
                    file_schema,
                    format.file_source(),
                )
                .with_file_groups(file_groups)
                .with_statistics(statistics)
                .with_projection_indices(projection)
                .with_limit(limit)
                .build(),
            )
            .await?;
        Ok(exec)
    }
}

#[cfg(test)]
mod tests {
    // Writing a DataFrame sourced from a corrupt CSV should surface the Arrow
    // parse error rather than silently producing parquet output.
    #[cfg(feature = "parquet")]
    #[tokio::test]
    async fn write_parquet_results_error_handling() -> datafusion_common::Result<()> {
        use std::sync::Arc;

        use object_store::local::LocalFileSystem;
        use tempfile::TempDir;
        use url::Url;

        use crate::{
            dataframe::DataFrameWriteOptions,
            prelude::{CsvReadOptions, SessionContext},
        };

        let ctx = SessionContext::new();
        // register a local file system object store rooted at a temporary directory
        let tmp_dir = TempDir::new()?;
        let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
        let local_url = Url::parse("file://local").unwrap();
        ctx.register_object_store(&local_url, local);

        let options = CsvReadOptions::default()
            .schema_infer_max_records(2)
            .has_header(true);
        let df = ctx.read_csv("tests/data/corrupt.csv", options).await?;
        let out_dir_url = "file://local/out";
        let e = df
            .write_parquet(out_dir_url, DataFrameWriteOptions::new(), None)
            .await
            .expect_err("should fail because input file does not match inferred schema");
        assert_eq!(e.strip_backtrace(), "Arrow error: Parser error: Error while parsing value 'd' as type 'Int64' for column 0 at line 4. Row data: '[d,4]'");
        Ok(())
    }
}