// datafusion_datasource_arrow/source.rs

use std::any::Any;
use std::io::{Cursor, Read, Seek};
use std::ops::Range;
use std::sync::Arc;

use arrow::buffer::Buffer;
use arrow_ipc::reader::FileDecoder;
use datafusion_common::error::Result;
use datafusion_common::{exec_datafusion_err, Statistics};
use datafusion_datasource::as_file_source;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
use datafusion_datasource::{PartitionedFile, TableSchema};
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use futures::StreamExt;
use itertools::Itertools;
use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore};
39
40#[derive(Clone, Default)]
43pub struct ArrowSource {
44 metrics: ExecutionPlanMetricsSet,
45 projected_statistics: Option<Statistics>,
46 schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
47}
48
49impl From<ArrowSource> for Arc<dyn FileSource> {
50 fn from(source: ArrowSource) -> Self {
51 as_file_source(source)
52 }
53}
54
55impl FileSource for ArrowSource {
56 fn create_file_opener(
57 &self,
58 object_store: Arc<dyn ObjectStore>,
59 base_config: &FileScanConfig,
60 _partition: usize,
61 ) -> Arc<dyn FileOpener> {
62 Arc::new(ArrowOpener {
63 object_store,
64 projection: base_config.file_column_projection_indices(),
65 })
66 }
67
68 fn as_any(&self) -> &dyn Any {
69 self
70 }
71
72 fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
73 Arc::new(Self { ..self.clone() })
74 }
75
76 fn with_schema(&self, _schema: TableSchema) -> Arc<dyn FileSource> {
77 Arc::new(Self { ..self.clone() })
78 }
79 fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
80 let mut conf = self.clone();
81 conf.projected_statistics = Some(statistics);
82 Arc::new(conf)
83 }
84
85 fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
86 Arc::new(Self { ..self.clone() })
87 }
88
89 fn metrics(&self) -> &ExecutionPlanMetricsSet {
90 &self.metrics
91 }
92
93 fn statistics(&self) -> Result<Statistics> {
94 let statistics = &self.projected_statistics;
95 Ok(statistics
96 .clone()
97 .expect("projected_statistics must be set"))
98 }
99
100 fn file_type(&self) -> &str {
101 "arrow"
102 }
103
104 fn with_schema_adapter_factory(
105 &self,
106 schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
107 ) -> Result<Arc<dyn FileSource>> {
108 Ok(Arc::new(Self {
109 schema_adapter_factory: Some(schema_adapter_factory),
110 ..self.clone()
111 }))
112 }
113
114 fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
115 self.schema_adapter_factory.clone()
116 }
117}
118
119pub struct ArrowOpener {
121 pub object_store: Arc<dyn ObjectStore>,
122 pub projection: Option<Vec<usize>>,
123}
124
125impl FileOpener for ArrowOpener {
126 fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
127 let object_store = Arc::clone(&self.object_store);
128 let projection = self.projection.clone();
129 Ok(Box::pin(async move {
130 let range = partitioned_file.range.clone();
131 match range {
132 None => {
133 let r = object_store
134 .get(&partitioned_file.object_meta.location)
135 .await?;
136 match r.payload {
137 #[cfg(not(target_arch = "wasm32"))]
138 GetResultPayload::File(file, _) => {
139 let arrow_reader = arrow::ipc::reader::FileReader::try_new(
140 file, projection,
141 )?;
142 Ok(futures::stream::iter(arrow_reader)
143 .map(|r| r.map_err(Into::into))
144 .boxed())
145 }
146 GetResultPayload::Stream(_) => {
147 let bytes = r.bytes().await?;
148 let cursor = std::io::Cursor::new(bytes);
149 let arrow_reader = arrow::ipc::reader::FileReader::try_new(
150 cursor, projection,
151 )?;
152 Ok(futures::stream::iter(arrow_reader)
153 .map(|r| r.map_err(Into::into))
154 .boxed())
155 }
156 }
157 }
158 Some(range) => {
159 let get_option = GetOptions {
162 range: Some(GetRange::Suffix(10)),
163 ..Default::default()
164 };
165 let get_result = object_store
166 .get_opts(&partitioned_file.object_meta.location, get_option)
167 .await?;
168 let footer_len_buf = get_result.bytes().await?;
169 let footer_len = arrow_ipc::reader::read_footer_length(
170 footer_len_buf[..].try_into().unwrap(),
171 )?;
172 let get_option = GetOptions {
174 range: Some(GetRange::Suffix(10 + (footer_len as u64))),
175 ..Default::default()
176 };
177 let get_result = object_store
178 .get_opts(&partitioned_file.object_meta.location, get_option)
179 .await?;
180 let footer_buf = get_result.bytes().await?;
181 let footer = arrow_ipc::root_as_footer(
182 footer_buf[..footer_len].try_into().unwrap(),
183 )
184 .map_err(|err| {
185 exec_datafusion_err!("Unable to get root as footer: {err:?}")
186 })?;
187 let schema =
189 arrow_ipc::convert::fb_to_schema(footer.schema().unwrap());
190 let mut decoder = FileDecoder::new(schema.into(), footer.version());
191 if let Some(projection) = projection {
192 decoder = decoder.with_projection(projection);
193 }
194 let dict_ranges = footer
195 .dictionaries()
196 .iter()
197 .flatten()
198 .map(|block| {
199 let block_len =
200 block.bodyLength() as u64 + block.metaDataLength() as u64;
201 let block_offset = block.offset() as u64;
202 block_offset..block_offset + block_len
203 })
204 .collect_vec();
205 let dict_results = object_store
206 .get_ranges(&partitioned_file.object_meta.location, &dict_ranges)
207 .await?;
208 for (dict_block, dict_result) in
209 footer.dictionaries().iter().flatten().zip(dict_results)
210 {
211 decoder
212 .read_dictionary(dict_block, &Buffer::from(dict_result))?;
213 }
214
215 let recordbatches = footer
217 .recordBatches()
218 .iter()
219 .flatten()
220 .filter(|block| {
221 let block_offset = block.offset() as u64;
222 block_offset >= range.start as u64
223 && block_offset < range.end as u64
224 })
225 .copied()
226 .collect_vec();
227
228 let recordbatch_ranges = recordbatches
229 .iter()
230 .map(|block| {
231 let block_len =
232 block.bodyLength() as u64 + block.metaDataLength() as u64;
233 let block_offset = block.offset() as u64;
234 block_offset..block_offset + block_len
235 })
236 .collect_vec();
237
238 let recordbatch_results = object_store
239 .get_ranges(
240 &partitioned_file.object_meta.location,
241 &recordbatch_ranges,
242 )
243 .await?;
244
245 Ok(futures::stream::iter(
246 recordbatches
247 .into_iter()
248 .zip(recordbatch_results)
249 .filter_map(move |(block, data)| {
250 decoder
251 .read_record_batch(&block, &Buffer::from(data))
252 .transpose()
253 }),
254 )
255 .map(|r| r.map_err(Into::into))
256 .boxed())
257 }
258 }
259 }))
260 }
261}