datafusion_datasource_avro/
source.rs1use std::any::Any;
21use std::sync::Arc;
22
23use crate::avro_to_arrow::Reader as AvroReader;
24
25use arrow::datatypes::SchemaRef;
26use datafusion_common::error::Result;
27use datafusion_common::Statistics;
28use datafusion_datasource::file::FileSource;
29use datafusion_datasource::file_scan_config::FileScanConfig;
30use datafusion_datasource::file_stream::FileOpener;
31use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
32use datafusion_datasource::TableSchema;
33use datafusion_physical_expr_common::sort_expr::LexOrdering;
34use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
35
36use object_store::ObjectStore;
37
38#[derive(Clone, Default)]
40pub struct AvroSource {
41 schema: Option<SchemaRef>,
42 batch_size: Option<usize>,
43 projection: Option<Vec<String>>,
44 metrics: ExecutionPlanMetricsSet,
45 projected_statistics: Option<Statistics>,
46 schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
47}
48
49impl AvroSource {
50 pub fn new() -> Self {
52 Self::default()
53 }
54
55 fn open<R: std::io::Read>(&self, reader: R) -> Result<AvroReader<'static, R>> {
56 AvroReader::try_new(
57 reader,
58 Arc::clone(self.schema.as_ref().expect("Schema must set before open")),
59 self.batch_size.expect("Batch size must set before open"),
60 self.projection.clone(),
61 )
62 }
63}
64
65impl FileSource for AvroSource {
66 fn create_file_opener(
67 &self,
68 object_store: Arc<dyn ObjectStore>,
69 _base_config: &FileScanConfig,
70 _partition: usize,
71 ) -> Arc<dyn FileOpener> {
72 Arc::new(private::AvroOpener {
73 config: Arc::new(self.clone()),
74 object_store,
75 })
76 }
77
78 fn as_any(&self) -> &dyn Any {
79 self
80 }
81
82 fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
83 let mut conf = self.clone();
84 conf.batch_size = Some(batch_size);
85 Arc::new(conf)
86 }
87
88 fn with_schema(&self, schema: TableSchema) -> Arc<dyn FileSource> {
89 let mut conf = self.clone();
90 conf.schema = Some(Arc::clone(schema.file_schema()));
92 Arc::new(conf)
93 }
94
95 fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
96 let mut conf = self.clone();
97 conf.projected_statistics = Some(statistics);
98 Arc::new(conf)
99 }
100
101 fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
102 let mut conf = self.clone();
103 conf.projection = config.projected_file_column_names();
104 Arc::new(conf)
105 }
106
107 fn metrics(&self) -> &ExecutionPlanMetricsSet {
108 &self.metrics
109 }
110
111 fn statistics(&self) -> Result<Statistics> {
112 let statistics = &self.projected_statistics;
113 Ok(statistics
114 .clone()
115 .expect("projected_statistics must be set"))
116 }
117
118 fn file_type(&self) -> &str {
119 "avro"
120 }
121
122 fn repartitioned(
123 &self,
124 _target_partitions: usize,
125 _repartition_file_min_size: usize,
126 _output_ordering: Option<LexOrdering>,
127 _config: &FileScanConfig,
128 ) -> Result<Option<FileScanConfig>> {
129 Ok(None)
130 }
131
132 fn with_schema_adapter_factory(
133 &self,
134 schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
135 ) -> Result<Arc<dyn FileSource>> {
136 Ok(Arc::new(Self {
137 schema_adapter_factory: Some(schema_adapter_factory),
138 ..self.clone()
139 }))
140 }
141
142 fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
143 self.schema_adapter_factory.clone()
144 }
145}
146
147mod private {
148 use super::*;
149
150 use bytes::Buf;
151 use datafusion_datasource::{file_stream::FileOpenFuture, PartitionedFile};
152 use futures::StreamExt;
153 use object_store::{GetResultPayload, ObjectStore};
154
155 pub struct AvroOpener {
156 pub config: Arc<AvroSource>,
157 pub object_store: Arc<dyn ObjectStore>,
158 }
159
160 impl FileOpener for AvroOpener {
161 fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
162 let config = Arc::clone(&self.config);
163 let object_store = Arc::clone(&self.object_store);
164 Ok(Box::pin(async move {
165 let r = object_store
166 .get(&partitioned_file.object_meta.location)
167 .await?;
168 match r.payload {
169 GetResultPayload::File(file, _) => {
170 let reader = config.open(file)?;
171 Ok(futures::stream::iter(reader)
172 .map(|r| r.map_err(Into::into))
173 .boxed())
174 }
175 GetResultPayload::Stream(_) => {
176 let bytes = r.bytes().await?;
177 let reader = config.open(bytes.reader())?;
178 Ok(futures::stream::iter(reader)
179 .map(|r| r.map_err(Into::into))
180 .boxed())
181 }
182 }
183 }))
184 }
185 }
186}