datafusion_datasource_arrow/source.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::sync::Arc;

use datafusion_datasource::as_file_source;
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
use datafusion_datasource::TableSchema;

use arrow::buffer::Buffer;
use arrow_ipc::reader::FileDecoder;
use datafusion_common::error::Result;
use datafusion_common::{exec_datafusion_err, Statistics};
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::PartitionedFile;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;

use datafusion_datasource::file_stream::FileOpenFuture;
use datafusion_datasource::file_stream::FileOpener;
use futures::StreamExt;
use itertools::Itertools;
use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore};

/// Arrow configuration struct that is given to `DataSourceExec`.
/// It holds no format-specific state, since [`FileScanConfig`] is sufficient
/// for Arrow IPC files.
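///
/// # Example
///
/// A minimal sketch of building an `ArrowSource` and converting it into a
/// dynamic [`FileSource`] (the conversion goes through [`as_file_source`]):
///
/// ```ignore
/// use std::sync::Arc;
/// use datafusion_datasource::file::FileSource;
///
/// let source = ArrowSource::default();
/// let file_source: Arc<dyn FileSource> = source.into();
/// assert_eq!(file_source.file_type(), "arrow");
/// ```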
#[derive(Clone, Default)]
pub struct ArrowSource {
    metrics: ExecutionPlanMetricsSet,
    projected_statistics: Option<Statistics>,
    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
}

impl From<ArrowSource> for Arc<dyn FileSource> {
    fn from(source: ArrowSource) -> Self {
        as_file_source(source)
    }
}

impl FileSource for ArrowSource {
    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        base_config: &FileScanConfig,
        _partition: usize,
    ) -> Arc<dyn FileOpener> {
        Arc::new(ArrowOpener {
            object_store,
            projection: base_config.file_column_projection_indices(),
        })
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
        Arc::new(Self { ..self.clone() })
    }

    fn with_schema(&self, _schema: TableSchema) -> Arc<dyn FileSource> {
        Arc::new(Self { ..self.clone() })
    }

    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.projected_statistics = Some(statistics);
        Arc::new(conf)
    }

    fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
        Arc::new(Self { ..self.clone() })
    }

    fn metrics(&self) -> &ExecutionPlanMetricsSet {
        &self.metrics
    }

    fn statistics(&self) -> Result<Statistics> {
        let statistics = &self.projected_statistics;
        Ok(statistics
            .clone()
            .expect("projected_statistics must be set"))
    }

    fn file_type(&self) -> &str {
        "arrow"
    }

    fn with_schema_adapter_factory(
        &self,
        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
    ) -> Result<Arc<dyn FileSource>> {
        Ok(Arc::new(Self {
            schema_adapter_factory: Some(schema_adapter_factory),
            ..self.clone()
        }))
    }

    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
        self.schema_adapter_factory.clone()
    }
}

/// The [`FileOpener`] implementation for Arrow IPC files.
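///
/// A minimal usage sketch, assuming an in-memory [`ObjectStore`]
/// (`object_store::memory::InMemory`) and no column projection:
///
/// ```ignore
/// use std::sync::Arc;
/// use object_store::memory::InMemory;
///
/// let opener = ArrowOpener {
///     object_store: Arc::new(InMemory::new()),
///     projection: None, // read all columns
/// };
/// ```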
pub struct ArrowOpener {
    pub object_store: Arc<dyn ObjectStore>,
    pub projection: Option<Vec<usize>>,
}

impl FileOpener for ArrowOpener {
    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
        let object_store = Arc::clone(&self.object_store);
        let projection = self.projection.clone();
        Ok(Box::pin(async move {
            let range = partitioned_file.range.clone();
            match range {
                None => {
                    let r = object_store
                        .get(&partitioned_file.object_meta.location)
                        .await?;
                    match r.payload {
                        #[cfg(not(target_arch = "wasm32"))]
                        GetResultPayload::File(file, _) => {
                            let arrow_reader = arrow::ipc::reader::FileReader::try_new(
                                file, projection,
                            )?;
                            Ok(futures::stream::iter(arrow_reader)
                                .map(|r| r.map_err(Into::into))
                                .boxed())
                        }
                        GetResultPayload::Stream(_) => {
                            let bytes = r.bytes().await?;
                            let cursor = std::io::Cursor::new(bytes);
                            let arrow_reader = arrow::ipc::reader::FileReader::try_new(
                                cursor, projection,
                            )?;
                            Ok(futures::stream::iter(arrow_reader)
                                .map(|r| r.map_err(Into::into))
                                .boxed())
                        }
                    }
                }
                Some(range) => {
                    // A range is set, so the file may have been split into
                    // multiple parts to be scanned in parallel; first read
                    // the footer length from the end of the file.
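                    //
                    // Per the Arrow IPC file format, a file ends with the
                    // flatbuffer footer, a 4-byte little-endian footer
                    // length, and the 6-byte magic "ARROW1", so the 10-byte
                    // suffix below covers the length field plus the magic.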
                    let get_option = GetOptions {
                        range: Some(GetRange::Suffix(10)),
                        ..Default::default()
                    };
                    let get_result = object_store
                        .get_opts(&partitioned_file.object_meta.location, get_option)
                        .await?;
                    let footer_len_buf = get_result.bytes().await?;
                    let footer_len = arrow_ipc::reader::read_footer_length(
                        footer_len_buf[..].try_into().unwrap(),
                    )?;
                    // fetch the footer bytes now that the footer length is known
                    let get_option = GetOptions {
                        range: Some(GetRange::Suffix(10 + (footer_len as u64))),
                        ..Default::default()
                    };
                    let get_result = object_store
                        .get_opts(&partitioned_file.object_meta.location, get_option)
                        .await?;
                    let footer_buf = get_result.bytes().await?;
                    let footer = arrow_ipc::root_as_footer(
                        footer_buf[..footer_len].try_into().unwrap(),
                    )
                    .map_err(|err| {
                        exec_datafusion_err!("Unable to get root as footer: {err:?}")
                    })?;
                    // build a decoder from the footer schema and the optional projection
                    let schema =
                        arrow_ipc::convert::fb_to_schema(footer.schema().unwrap());
                    let mut decoder = FileDecoder::new(schema.into(), footer.version());
                    if let Some(projection) = projection {
                        decoder = decoder.with_projection(projection);
                    }
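                    // Dictionaries must be loaded into the decoder before any
                    // record batch that references them can be decoded, so
                    // fetch all dictionary blocks up front.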
                    let dict_ranges = footer
                        .dictionaries()
                        .iter()
                        .flatten()
                        .map(|block| {
                            let block_len =
                                block.bodyLength() as u64 + block.metaDataLength() as u64;
                            let block_offset = block.offset() as u64;
                            block_offset..block_offset + block_len
                        })
                        .collect_vec();
                    let dict_results = object_store
                        .get_ranges(&partitioned_file.object_meta.location, &dict_ranges)
                        .await?;
                    for (dict_block, dict_result) in
                        footer.dictionaries().iter().flatten().zip(dict_results)
                    {
                        decoder
                            .read_dictionary(dict_block, &Buffer::from(dict_result))?;
                    }

                    // keep only the record batches whose starting offset falls
                    // within this partition's assigned byte range
                    let recordbatches = footer
                        .recordBatches()
                        .iter()
                        .flatten()
                        .filter(|block| {
                            let block_offset = block.offset() as u64;
                            block_offset >= range.start as u64
                                && block_offset < range.end as u64
                        })
                        .copied()
                        .collect_vec();

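                    // compute the byte range of each selected block and fetch
                    // them all in a single vectored request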
                    let recordbatch_ranges = recordbatches
                        .iter()
                        .map(|block| {
                            let block_len =
                                block.bodyLength() as u64 + block.metaDataLength() as u64;
                            let block_offset = block.offset() as u64;
                            block_offset..block_offset + block_len
                        })
                        .collect_vec();

                    let recordbatch_results = object_store
                        .get_ranges(
                            &partitioned_file.object_meta.location,
                            &recordbatch_ranges,
                        )
                        .await?;

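                    // decode each fetched block lazily as the stream is polled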
                    Ok(futures::stream::iter(
                        recordbatches
                            .into_iter()
                            .zip(recordbatch_results)
                            .filter_map(move |(block, data)| {
                                decoder
                                    .read_record_batch(&block, &Buffer::from(data))
                                    .transpose()
                            }),
                    )
                    .map(|r| r.map_err(Into::into))
                    .boxed())
                }
            }
        }))
    }
}