// datafusion/datasource/physical_plan/mod.rs

pub mod arrow;
pub mod csv;
pub mod json;

#[cfg(feature = "parquet")]
pub mod parquet;

#[cfg(feature = "avro")]
pub mod avro;

#[cfg(feature = "avro")]
pub use avro::AvroSource;

#[cfg(feature = "parquet")]
pub use datafusion_datasource_parquet::source::ParquetSource;
#[cfg(feature = "parquet")]
pub use datafusion_datasource_parquet::{ParquetFileMetrics, ParquetFileReaderFactory};

pub use json::{JsonOpener, JsonSource};

pub use arrow::{ArrowOpener, ArrowSource};
pub use csv::{CsvOpener, CsvSource};
pub use datafusion_datasource::file::FileSource;
pub use datafusion_datasource::file_groups::FileGroup;
pub use datafusion_datasource::file_groups::FileGroupPartitioner;
pub use datafusion_datasource::file_scan_config::{
    wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig,
    FileScanConfigBuilder,
};
pub use datafusion_datasource::file_sink_config::*;

pub use datafusion_datasource::file_stream::{
    FileOpenFuture, FileOpener, FileStream, OnError,
};
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::{
        cast::AsArray,
        types::{Float32Type, Float64Type, UInt32Type},
        BinaryArray, BooleanArray, Float32Array, Int32Array, Int64Array, RecordBatch,
        StringArray, UInt64Array,
    };
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow_schema::SchemaRef;

    use crate::datasource::schema_adapter::{
        DefaultSchemaAdapterFactory, SchemaAdapterFactory,
    };

    /// A file batch whose columns differ from the table schema only in type
    /// (`c2: UInt64 -> UInt32`, `c3: Float32 -> Float64`) is mapped through the
    /// default schema adapter; the result must carry the table schema and the
    /// cast values.
    ///
    /// NOTE(review): the `UInt64 -> UInt32` direction is narrowing; the test
    /// values (9, 5) fit, so the asserted results are exact.
    #[test]
    fn schema_mapping_map_batch() {
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("c1", DataType::Utf8, true),
            Field::new("c2", DataType::UInt32, true),
            Field::new("c3", DataType::Float64, true),
        ]));

        // Projected schema == table schema: no projection, casting only.
        let adapter = DefaultSchemaAdapterFactory
            .create(table_schema.clone(), table_schema.clone());

        let file_schema = Schema::new(vec![
            Field::new("c1", DataType::Utf8, true),
            Field::new("c2", DataType::UInt64, true),
            Field::new("c3", DataType::Float32, true),
        ]);

        let (mapping, _) = adapter.map_schema(&file_schema).expect("map schema failed");

        let c1 = StringArray::from(vec!["hello", "world"]);
        let c2 = UInt64Array::from(vec![9_u64, 5_u64]);
        let c3 = Float32Array::from(vec![2.0_f32, 7.0_f32]);
        let batch = RecordBatch::try_new(
            Arc::new(file_schema),
            vec![Arc::new(c1), Arc::new(c2), Arc::new(c3)],
        )
        .expect("record batch construction failed");

        let mapped_batch = mapping.map_batch(batch).expect("map batch failed");

        // The mapped batch adopts the table schema; shape is unchanged.
        assert_eq!(mapped_batch.schema(), table_schema);
        assert_eq!(mapped_batch.num_columns(), 3);
        assert_eq!(mapped_batch.num_rows(), 2);

        let c1 = mapped_batch.column(0).as_string::<i32>();
        let c2 = mapped_batch.column(1).as_primitive::<UInt32Type>();
        let c3 = mapped_batch.column(2).as_primitive::<Float64Type>();

        assert_eq!(c1.value(0), "hello");
        assert_eq!(c1.value(1), "world");
        assert_eq!(c2.value(0), 9_u32);
        assert_eq!(c2.value(1), 5_u32);
        assert_eq!(c3.value(0), 2.0_f64);
        assert_eq!(c3.value(1), 7.0_f64);
    }

    /// The table schema is projected to a subset of columns (`c1`, `c2`, `c4`)
    /// before building the adapter. A file batch with partly different column
    /// names and types is then projected (via the adapter-returned projection)
    /// and mapped; the result must hold only the projected columns, cast to the
    /// table types (`Boolean -> Utf8`, `Float32 -> Float64`, `Int64 -> Float32`).
    #[test]
    fn schema_adapter_map_schema_with_projection() {
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("c0", DataType::Utf8, true),
            Field::new("c1", DataType::Utf8, true),
            Field::new("c2", DataType::Float64, true),
            Field::new("c3", DataType::Int32, true),
            Field::new("c4", DataType::Float32, true),
        ]));

        let file_schema = Schema::new(vec![
            Field::new("id", DataType::Int32, true),
            Field::new("c1", DataType::Boolean, true),
            Field::new("c2", DataType::Float32, true),
            Field::new("c3", DataType::Binary, true),
            Field::new("c4", DataType::Int64, true),
        ]);

        // Project the table schema to c1, c2, c4 only.
        let indices = vec![1, 2, 4];
        let schema = SchemaRef::from(table_schema.project(&indices).expect("projection failed"));
        let adapter = DefaultSchemaAdapterFactory.create(schema, table_schema.clone());
        let (mapping, projection) = adapter.map_schema(&file_schema).expect("map schema failed");

        // `id` and `c3` exist only in the file; they must be dropped by the
        // projection returned from map_schema.
        let id = Int32Array::from(vec![Some(1), Some(2), Some(3)]);
        let c1 = BooleanArray::from(vec![Some(true), Some(false), Some(true)]);
        let c2 = Float32Array::from(vec![Some(2.0_f32), Some(7.0_f32), Some(3.0_f32)]);
        let c3 = BinaryArray::from_opt_vec(vec![
            Some(b"hallo"),
            Some(b"danke"),
            Some(b"super"),
        ]);
        let c4 = Int64Array::from(vec![1, 2, 3]);
        let batch = RecordBatch::try_new(
            Arc::new(file_schema),
            vec![
                Arc::new(id),
                Arc::new(c1),
                Arc::new(c2),
                Arc::new(c3),
                Arc::new(c4),
            ],
        )
        .expect("record batch construction failed");
        let rows_num = batch.num_rows();
        let projected = batch.project(&projection).expect("batch projection failed");
        let mapped_batch = mapping.map_batch(projected).expect("map batch failed");

        // Resulting schema is the projected table schema; row count is preserved.
        assert_eq!(
            mapped_batch.schema(),
            Arc::new(table_schema.project(&indices).expect("projection failed"))
        );
        assert_eq!(mapped_batch.num_columns(), indices.len());
        assert_eq!(mapped_batch.num_rows(), rows_num);

        let c1 = mapped_batch.column(0).as_string::<i32>();
        let c2 = mapped_batch.column(1).as_primitive::<Float64Type>();
        let c4 = mapped_batch.column(2).as_primitive::<Float32Type>();

        // Boolean -> Utf8 cast renders "true"/"false".
        assert_eq!(c1.value(0), "true");
        assert_eq!(c1.value(1), "false");
        assert_eq!(c1.value(2), "true");

        assert_eq!(c2.value(0), 2.0_f64);
        assert_eq!(c2.value(1), 7.0_f64);
        assert_eq!(c2.value(2), 3.0_f64);

        assert_eq!(c4.value(0), 1.0_f32);
        assert_eq!(c4.value(1), 2.0_f32);
        assert_eq!(c4.value(2), 3.0_f32);
    }
}