// datafusion/datasource/physical_plan/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Execution plans that read file formats
20pub mod arrow;
21pub mod csv;
22pub mod json;
23
24#[cfg(feature = "parquet")]
25pub mod parquet;
26
27#[cfg(feature = "avro")]
28pub mod avro;
29
30#[cfg(feature = "avro")]
31pub use avro::AvroSource;
32
33#[cfg(feature = "parquet")]
34pub use datafusion_datasource_parquet::source::ParquetSource;
35#[cfg(feature = "parquet")]
36pub use datafusion_datasource_parquet::{ParquetFileMetrics, ParquetFileReaderFactory};
37
38pub use json::{JsonOpener, JsonSource};
39
40pub use arrow::{ArrowOpener, ArrowSource};
41pub use csv::{CsvOpener, CsvSource};
42pub use datafusion_datasource::file::FileSource;
43pub use datafusion_datasource::file_groups::FileGroup;
44pub use datafusion_datasource::file_groups::FileGroupPartitioner;
45pub use datafusion_datasource::file_scan_config::{
46    wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig,
47    FileScanConfigBuilder,
48};
49pub use datafusion_datasource::file_sink_config::*;
50
51pub use datafusion_datasource::file_stream::{
52    FileOpenFuture, FileOpener, FileStream, OnError,
53};
54
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::{
        cast::AsArray,
        types::{Float32Type, Float64Type, UInt32Type},
        BinaryArray, BooleanArray, Float32Array, Int32Array, Int64Array, RecordBatch,
        StringArray, UInt64Array,
    };
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow_schema::SchemaRef;

    use crate::datasource::schema_adapter::{
        DefaultSchemaAdapterFactory, SchemaAdapterFactory,
    };

    /// A batch read with a file schema whose numeric columns differ from the
    /// table schema (UInt64 vs UInt32, Float32 vs Float64) is mapped so the
    /// resulting batch carries the table schema and the values are cast.
    #[test]
    fn schema_mapping_map_batch() {
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("c1", DataType::Utf8, true),
            Field::new("c2", DataType::UInt32, true),
            Field::new("c3", DataType::Float64, true),
        ]));

        let adapter = DefaultSchemaAdapterFactory
            .create(Arc::clone(&table_schema), Arc::clone(&table_schema));

        // The on-disk schema: same column names, different physical types.
        let file_schema = Schema::new(vec![
            Field::new("c1", DataType::Utf8, true),
            Field::new("c2", DataType::UInt64, true),
            Field::new("c3", DataType::Float32, true),
        ]);

        let (mapping, _) = adapter.map_schema(&file_schema).expect("map schema failed");

        // Two rows laid out according to the file schema.
        let batch = RecordBatch::try_new(
            Arc::new(file_schema),
            vec![
                Arc::new(StringArray::from(vec!["hello", "world"])),
                Arc::new(UInt64Array::from(vec![9_u64, 5_u64])),
                Arc::new(Float32Array::from(vec![2.0_f32, 7.0_f32])),
            ],
        )
        .unwrap();

        let mapped_batch = mapping.map_batch(batch).unwrap();

        assert_eq!(mapped_batch.schema(), table_schema);
        assert_eq!(mapped_batch.num_columns(), 3);
        assert_eq!(mapped_batch.num_rows(), 2);

        // After mapping, the columns must use the table schema's types.
        let c1 = mapped_batch.column(0).as_string::<i32>();
        let c2 = mapped_batch.column(1).as_primitive::<UInt32Type>();
        let c3 = mapped_batch.column(2).as_primitive::<Float64Type>();

        assert_eq!(c1.value(0), "hello");
        assert_eq!(c1.value(1), "world");
        assert_eq!(c2.value(0), 9_u32);
        assert_eq!(c2.value(1), 5_u32);
        assert_eq!(c3.value(0), 2.0_f64);
        assert_eq!(c3.value(1), 7.0_f64);
    }

    /// Adapting with a projected table schema: `map_schema` reports which file
    /// columns to read, and mapping the projected batch yields columns cast to
    /// the projected table types (Boolean->Utf8, Float32->Float64, Int64->Float32).
    #[test]
    fn schema_adapter_map_schema_with_projection() {
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("c0", DataType::Utf8, true),
            Field::new("c1", DataType::Utf8, true),
            Field::new("c2", DataType::Float64, true),
            Field::new("c3", DataType::Int32, true),
            Field::new("c4", DataType::Float32, true),
        ]));

        let file_schema = Schema::new(vec![
            Field::new("id", DataType::Int32, true),
            Field::new("c1", DataType::Boolean, true),
            Field::new("c2", DataType::Float32, true),
            Field::new("c3", DataType::Binary, true),
            Field::new("c4", DataType::Int64, true),
        ]);

        // Only c1, c2 and c4 of the table schema are requested.
        let indices = vec![1, 2, 4];
        let projected_schema = SchemaRef::from(table_schema.project(&indices).unwrap());
        let adapter =
            DefaultSchemaAdapterFactory.create(projected_schema, Arc::clone(&table_schema));
        let (mapping, projection) = adapter.map_schema(&file_schema).unwrap();

        // Three rows laid out according to the full file schema.
        let batch = RecordBatch::try_new(
            Arc::new(file_schema),
            vec![
                Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])),
                Arc::new(BooleanArray::from(vec![Some(true), Some(false), Some(true)])),
                Arc::new(Float32Array::from(vec![
                    Some(2.0_f32),
                    Some(7.0_f32),
                    Some(3.0_f32),
                ])),
                Arc::new(BinaryArray::from_opt_vec(vec![
                    Some(b"hallo"),
                    Some(b"danke"),
                    Some(b"super"),
                ])),
                Arc::new(Int64Array::from(vec![1, 2, 3])),
            ],
        )
        .unwrap();
        let rows_num = batch.num_rows();

        // Feed only the file columns selected by `projection` into the mapping.
        let projected = batch.project(&projection).unwrap();
        let mapped_batch = mapping.map_batch(projected).unwrap();

        assert_eq!(
            mapped_batch.schema(),
            Arc::new(table_schema.project(&indices).unwrap())
        );
        assert_eq!(mapped_batch.num_columns(), indices.len());
        assert_eq!(mapped_batch.num_rows(), rows_num);

        let c1 = mapped_batch.column(0).as_string::<i32>();
        let c2 = mapped_batch.column(1).as_primitive::<Float64Type>();
        let c4 = mapped_batch.column(2).as_primitive::<Float32Type>();

        assert_eq!(c1.value(0), "true");
        assert_eq!(c1.value(1), "false");
        assert_eq!(c1.value(2), "true");

        assert_eq!(c2.value(0), 2.0_f64);
        assert_eq!(c2.value(1), 7.0_f64);
        assert_eq!(c2.value(2), 3.0_f64);

        assert_eq!(c4.value(0), 1.0_f32);
        assert_eq!(c4.value(1), 2.0_f32);
        assert_eq!(c4.value(2), 3.0_f32);
    }
}