datafusion_datasource_avro/
source.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Execution plan for reading Avro files
19
20use std::any::Any;
21use std::sync::Arc;
22
23use crate::avro_to_arrow::Reader as AvroReader;
24
25use arrow::datatypes::SchemaRef;
26use datafusion_common::error::Result;
27use datafusion_common::Statistics;
28use datafusion_datasource::file::FileSource;
29use datafusion_datasource::file_scan_config::FileScanConfig;
30use datafusion_datasource::file_stream::FileOpener;
31use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
32use datafusion_datasource::TableSchema;
33use datafusion_physical_expr_common::sort_expr::LexOrdering;
34use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
35
36use object_store::ObjectStore;
37
38/// AvroSource holds the extra configuration that is necessary for opening avro files
39#[derive(Clone, Default)]
40pub struct AvroSource {
41    schema: Option<SchemaRef>,
42    batch_size: Option<usize>,
43    projection: Option<Vec<String>>,
44    metrics: ExecutionPlanMetricsSet,
45    projected_statistics: Option<Statistics>,
46    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
47}
48
49impl AvroSource {
50    /// Initialize an AvroSource with default values
51    pub fn new() -> Self {
52        Self::default()
53    }
54
55    fn open<R: std::io::Read>(&self, reader: R) -> Result<AvroReader<'static, R>> {
56        AvroReader::try_new(
57            reader,
58            Arc::clone(self.schema.as_ref().expect("Schema must set before open")),
59            self.batch_size.expect("Batch size must set before open"),
60            self.projection.clone(),
61        )
62    }
63}
64
65impl FileSource for AvroSource {
66    fn create_file_opener(
67        &self,
68        object_store: Arc<dyn ObjectStore>,
69        _base_config: &FileScanConfig,
70        _partition: usize,
71    ) -> Arc<dyn FileOpener> {
72        Arc::new(private::AvroOpener {
73            config: Arc::new(self.clone()),
74            object_store,
75        })
76    }
77
78    fn as_any(&self) -> &dyn Any {
79        self
80    }
81
82    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
83        let mut conf = self.clone();
84        conf.batch_size = Some(batch_size);
85        Arc::new(conf)
86    }
87
88    fn with_schema(&self, schema: TableSchema) -> Arc<dyn FileSource> {
89        let mut conf = self.clone();
90        // TableSchema may have partition columns, but AvroSource does not use partition columns or values atm
91        conf.schema = Some(Arc::clone(schema.file_schema()));
92        Arc::new(conf)
93    }
94
95    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
96        let mut conf = self.clone();
97        conf.projected_statistics = Some(statistics);
98        Arc::new(conf)
99    }
100
101    fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
102        let mut conf = self.clone();
103        conf.projection = config.projected_file_column_names();
104        Arc::new(conf)
105    }
106
107    fn metrics(&self) -> &ExecutionPlanMetricsSet {
108        &self.metrics
109    }
110
111    fn statistics(&self) -> Result<Statistics> {
112        let statistics = &self.projected_statistics;
113        Ok(statistics
114            .clone()
115            .expect("projected_statistics must be set"))
116    }
117
118    fn file_type(&self) -> &str {
119        "avro"
120    }
121
122    fn repartitioned(
123        &self,
124        _target_partitions: usize,
125        _repartition_file_min_size: usize,
126        _output_ordering: Option<LexOrdering>,
127        _config: &FileScanConfig,
128    ) -> Result<Option<FileScanConfig>> {
129        Ok(None)
130    }
131
132    fn with_schema_adapter_factory(
133        &self,
134        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
135    ) -> Result<Arc<dyn FileSource>> {
136        Ok(Arc::new(Self {
137            schema_adapter_factory: Some(schema_adapter_factory),
138            ..self.clone()
139        }))
140    }
141
142    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
143        self.schema_adapter_factory.clone()
144    }
145}
146
147mod private {
148    use super::*;
149
150    use bytes::Buf;
151    use datafusion_datasource::{file_stream::FileOpenFuture, PartitionedFile};
152    use futures::StreamExt;
153    use object_store::{GetResultPayload, ObjectStore};
154
155    pub struct AvroOpener {
156        pub config: Arc<AvroSource>,
157        pub object_store: Arc<dyn ObjectStore>,
158    }
159
160    impl FileOpener for AvroOpener {
161        fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
162            let config = Arc::clone(&self.config);
163            let object_store = Arc::clone(&self.object_store);
164            Ok(Box::pin(async move {
165                let r = object_store
166                    .get(&partitioned_file.object_meta.location)
167                    .await?;
168                match r.payload {
169                    GetResultPayload::File(file, _) => {
170                        let reader = config.open(file)?;
171                        Ok(futures::stream::iter(reader)
172                            .map(|r| r.map_err(Into::into))
173                            .boxed())
174                    }
175                    GetResultPayload::Stream(_) => {
176                        let bytes = r.bytes().await?;
177                        let reader = config.open(bytes.reader())?;
178                        Ok(futures::stream::iter(reader)
179                            .map(|r| r.map_err(Into::into))
180                            .boxed())
181                    }
182                }
183            }))
184        }
185    }
186}