datafusion_datasource_csv/source.rs

use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
use std::any::Any;
use std::fmt;
use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;
use std::task::Poll;

use datafusion_datasource::decoder::{deserialize_stream, DecoderDeserializer};
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_datasource::{
    as_file_source, calculate_range, FileRange, ListingTableUrl, PartitionedFile,
    RangeCalculation, TableSchema,
};

use arrow::csv;
use arrow::datatypes::SchemaRef;
use datafusion_common::{DataFusionError, Result, Statistics};
use datafusion_common_runtime::JoinSet;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_execution::TaskContext;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::{
    DisplayFormatType, ExecutionPlan, ExecutionPlanProperties,
};

use crate::file_format::CsvDecoder;
use futures::{StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
use object_store::{GetOptions, GetResultPayload, ObjectStore};
use tokio::io::AsyncWriteExt;
/// A [`FileSource`] for reading CSV files: holds the parsing options
/// (header, delimiter, quote, escape, comment, terminator) together with
/// execution-time state (schema, projection, batch size, metrics).
#[derive(Debug, Clone, Default)]
pub struct CsvSource {
    batch_size: Option<usize>,
    file_schema: Option<SchemaRef>,
    file_projection: Option<Vec<usize>>,
    pub(crate) has_header: bool,
    delimiter: u8,
    quote: u8,
    terminator: Option<u8>,
    escape: Option<u8>,
    comment: Option<u8>,
    metrics: ExecutionPlanMetricsSet,
    projected_statistics: Option<Statistics>,
    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
    /// Whether rows with fewer fields than the schema are tolerated
    /// (missing fields are filled with nulls).
    truncate_rows: bool,
}

impl CsvSource {
    /// Create a new `CsvSource` with the given header, delimiter, and quote
    /// options; all remaining options take their default values.
    pub fn new(has_header: bool, delimiter: u8, quote: u8) -> Self {
        Self {
            has_header,
            delimiter,
            quote,
            ..Self::default()
        }
    }

    /// True if the first line of each file is a header row.
    pub fn has_header(&self) -> bool {
        self.has_header
    }

    /// True if rows with fewer fields than the schema are tolerated.
    pub fn truncate_rows(&self) -> bool {
        self.truncate_rows
    }

    /// The field delimiter byte.
    pub fn delimiter(&self) -> u8 {
        self.delimiter
    }

    /// The quote byte.
    pub fn quote(&self) -> u8 {
        self.quote
    }

    /// The optional line terminator byte.
    pub fn terminator(&self) -> Option<u8> {
        self.terminator
    }

    /// The optional comment-prefix byte.
    pub fn comment(&self) -> Option<u8> {
        self.comment
    }

    /// The optional escape byte.
    pub fn escape(&self) -> Option<u8> {
        self.escape
    }

    /// Return a copy of this source with the given escape byte.
    pub fn with_escape(&self, escape: Option<u8>) -> Self {
        let mut conf = self.clone();
        conf.escape = escape;
        conf
    }

    /// Return a copy of this source with the given line terminator.
    pub fn with_terminator(&self, terminator: Option<u8>) -> Self {
        let mut conf = self.clone();
        conf.terminator = terminator;
        conf
    }

    /// Return a copy of this source with the given comment-prefix byte.
    pub fn with_comment(&self, comment: Option<u8>) -> Self {
        let mut conf = self.clone();
        conf.comment = comment;
        conf
    }

    /// Return a copy of this source that tolerates truncated rows.
    pub fn with_truncate_rows(&self, truncate_rows: bool) -> Self {
        let mut conf = self.clone();
        conf.truncate_rows = truncate_rows;
        conf
    }
}
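
// Illustrative sketch (not part of the original file): the `with_*` methods
// return modified copies, so options can be layered on top of
// `CsvSource::new`. The specific bytes chosen here are assumptions made for
// the example.
//
//     let source = CsvSource::new(true, b',', b'"')
//         .with_comment(Some(b'#'))
//         .with_escape(Some(b'\\'))
//         .with_truncate_rows(true);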

impl CsvSource {
    /// Build an arrow CSV reader over `reader` using the configured options.
    fn open<R: Read>(&self, reader: R) -> Result<csv::Reader<R>> {
        Ok(self.builder().build(reader)?)
    }

    /// Construct the arrow [`csv::ReaderBuilder`] for this configuration.
    ///
    /// Panics if the file schema or batch size has not been set yet.
    fn builder(&self) -> csv::ReaderBuilder {
        let mut builder = csv::ReaderBuilder::new(Arc::clone(
            self.file_schema
                .as_ref()
                .expect("Schema must be set before initializing builder"),
        ))
        .with_delimiter(self.delimiter)
        .with_batch_size(
            self.batch_size
                .expect("Batch size must be set before initializing builder"),
        )
        .with_header(self.has_header)
        .with_quote(self.quote)
        .with_truncated_rows(self.truncate_rows);
        if let Some(terminator) = self.terminator {
            builder = builder.with_terminator(terminator);
        }
        if let Some(proj) = &self.file_projection {
            builder = builder.with_projection(proj.clone());
        }
        if let Some(escape) = self.escape {
            builder = builder.with_escape(escape);
        }
        if let Some(comment) = self.comment {
            builder = builder.with_comment(comment);
        }

        builder
    }
}

/// A [`FileOpener`] that opens (optionally compressed) CSV files for a
/// [`CsvSource`] against a specific object store.
pub struct CsvOpener {
    config: Arc<CsvSource>,
    file_compression_type: FileCompressionType,
    object_store: Arc<dyn ObjectStore>,
}

impl CsvOpener {
    /// Create a new `CsvOpener`.
    pub fn new(
        config: Arc<CsvSource>,
        file_compression_type: FileCompressionType,
        object_store: Arc<dyn ObjectStore>,
    ) -> Self {
        Self {
            config,
            file_compression_type,
            object_store,
        }
    }
}
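
// Illustrative sketch (not part of the original file): wiring an opener by
// hand, assuming an in-memory store from the `object_store` crate. In normal
// use, `FileSource::create_file_opener` does this for you.
//
//     let store: Arc<dyn ObjectStore> =
//         Arc::new(object_store::memory::InMemory::new());
//     let opener = CsvOpener::new(
//         Arc::new(CsvSource::new(true, b',', b'"')),
//         FileCompressionType::UNCOMPRESSED,
//         store,
//     );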

impl From<CsvSource> for Arc<dyn FileSource> {
    fn from(source: CsvSource) -> Self {
        as_file_source(source)
    }
}

impl FileSource for CsvSource {
    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        base_config: &FileScanConfig,
        _partition: usize,
    ) -> Arc<dyn FileOpener> {
        Arc::new(CsvOpener {
            config: Arc::new(self.clone()),
            file_compression_type: base_config.file_compression_type,
            object_store,
        })
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.batch_size = Some(batch_size);
        Arc::new(conf)
    }

    fn with_schema(&self, schema: TableSchema) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.file_schema = Some(Arc::clone(schema.file_schema()));
        Arc::new(conf)
    }

    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.projected_statistics = Some(statistics);
        Arc::new(conf)
    }

    fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.file_projection = config.file_column_projection_indices();
        Arc::new(conf)
    }

    fn metrics(&self) -> &ExecutionPlanMetricsSet {
        &self.metrics
    }

    fn statistics(&self) -> Result<Statistics> {
        let statistics = &self.projected_statistics;
        Ok(statistics
            .clone()
            .expect("projected_statistics must be set"))
    }

    fn file_type(&self) -> &str {
        "csv"
    }

    fn fmt_extra(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, ", has_header={}", self.has_header)
            }
            DisplayFormatType::TreeRender => Ok(()),
        }
    }

    fn with_schema_adapter_factory(
        &self,
        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
    ) -> Result<Arc<dyn FileSource>> {
        Ok(Arc::new(Self {
            schema_adapter_factory: Some(schema_adapter_factory),
            ..self.clone()
        }))
    }

    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
        self.schema_adapter_factory.clone()
    }
}
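
// Illustrative sketch (not part of the original file): the planner
// configures a `CsvSource` through the `FileSource` trait before any file is
// opened; the batch size (and schema, via `with_schema`) set here are what
// `builder()` later relies on. The 8192 value is an assumption for the
// example.
//
//     let file_source: Arc<dyn FileSource> =
//         CsvSource::new(true, b',', b'"').into();
//     let file_source = file_source.with_batch_size(8192);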

impl FileOpener for CsvOpener {
    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
        // Only the partition that starts at byte 0 can contain the header
        // row; a range starting later must not treat its first line as a
        // header.
        let mut csv_has_header = self.config.has_header;
        if let Some(FileRange { start, .. }) = partitioned_file.range {
            if start != 0 {
                csv_has_header = false;
            }
        }

        let config = CsvSource {
            has_header: csv_has_header,
            truncate_rows: self.config.truncate_rows,
            ..(*self.config).clone()
        };

        let file_compression_type = self.file_compression_type.to_owned();

        if partitioned_file.range.is_some() {
            assert!(
                !file_compression_type.is_compressed(),
                "Reading compressed .csv in parallel is not supported"
            );
        }

        let store = Arc::clone(&self.object_store);
        let terminator = self.config.terminator;

        Ok(Box::pin(async move {
            // Align the requested byte range to line boundaries so that no
            // row is split between two partitions.
            let calculated_range =
                calculate_range(&partitioned_file, &store, terminator).await?;

            let range = match calculated_range {
                RangeCalculation::Range(None) => None,
                RangeCalculation::Range(Some(range)) => Some(range.into()),
                RangeCalculation::TerminateEarly => {
                    return Ok(
                        futures::stream::poll_fn(move |_| Poll::Ready(None)).boxed()
                    )
                }
            };

            let options = GetOptions {
                range,
                ..Default::default()
            };

            let result = store
                .get_opts(&partitioned_file.object_meta.location, options)
                .await?;

            match result.payload {
                #[cfg(not(target_arch = "wasm32"))]
                GetResultPayload::File(mut file, _) => {
                    let is_whole_file_scanned = partitioned_file.range.is_none();
                    let decoder = if is_whole_file_scanned {
                        // Don't seek if no range was specified, which allows
                        // compressed files to be decompressed from the start.
                        file_compression_type.convert_read(file)?
                    } else {
                        file.seek(SeekFrom::Start(result.range.start as _))?;
                        file_compression_type.convert_read(
                            file.take((result.range.end - result.range.start) as u64),
                        )?
                    };

                    Ok(futures::stream::iter(config.open(decoder)?)
                        .map(|r| r.map_err(Into::into))
                        .boxed())
                }
                GetResultPayload::Stream(s) => {
                    let decoder = config.builder().build_decoder();
                    let s = s.map_err(DataFusionError::from);
                    let input = file_compression_type.convert_stream(s.boxed())?.fuse();

                    let stream = deserialize_stream(
                        input,
                        DecoderDeserializer::new(CsvDecoder::new(decoder)),
                    );
                    Ok(stream.map_err(Into::into).boxed())
                }
            }
        }))
    }
}

/// Write the results of executing `plan` as CSV, one file per output
/// partition, under the directory given by `path`.
pub async fn plan_to_csv(
    task_ctx: Arc<TaskContext>,
    plan: Arc<dyn ExecutionPlan>,
    path: impl AsRef<str>,
) -> Result<()> {
    let path = path.as_ref();
    let parsed = ListingTableUrl::parse(path)?;
    let object_store_url = parsed.object_store();
    let store = task_ctx.runtime_env().object_store(&object_store_url)?;
    let writer_buffer_size = task_ctx
        .session_config()
        .options()
        .execution
        .objectstore_writer_buffer_size;
    let mut join_set = JoinSet::new();
    for i in 0..plan.output_partitioning().partition_count() {
        let storeref = Arc::clone(&store);
        let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
        let filename = format!("{}/part-{i}.csv", parsed.prefix());
        let file = object_store::path::Path::parse(filename)?;

        let mut stream = plan.execute(i, Arc::clone(&task_ctx))?;
        join_set.spawn(async move {
            let mut buf_writer =
                BufWriter::with_capacity(storeref, file.clone(), writer_buffer_size);
            let mut buffer = Vec::with_capacity(1024);
            // Write the header only before the first batch of each file.
            let mut write_headers = true;
            while let Some(batch) = stream.next().await.transpose()? {
                let mut writer = csv::WriterBuilder::new()
                    .with_header(write_headers)
                    .build(buffer);
                writer.write(&batch)?;
                buffer = writer.into_inner();
                buf_writer.write_all(&buffer).await?;
                buffer.clear();
                write_headers = false;
            }
            buf_writer.shutdown().await.map_err(DataFusionError::from)
        });
    }

    while let Some(result) = join_set.join_next().await {
        match result {
            Ok(res) => res?,
            Err(e) => {
                if e.is_panic() {
                    std::panic::resume_unwind(e.into_panic());
                } else {
                    unreachable!();
                }
            }
        }
    }

    Ok(())
}
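
// A minimal sketch of a test (not part of the original file) exercising the
// private `open` path on in-memory bytes. The schema, batch size, and sample
// data are assumptions made for illustration.
#[cfg(test)]
mod sketch_tests {
    use super::*;
    use arrow::datatypes::{DataType, Field, Schema};

    #[test]
    fn csv_source_open_reads_batches() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Utf8, true),
        ]));

        // `builder()` panics unless both the schema and batch size are set,
        // so set them directly before opening the reader.
        let source = CsvSource {
            batch_size: Some(8),
            file_schema: Some(schema),
            ..CsvSource::new(true, b',', b'"')
        };

        let data = "a,b\n1,x\n2,y\n";
        let mut reader = source.open(std::io::Cursor::new(data.as_bytes()))?;
        let batch = reader.next().expect("one batch")?;
        // The header row is consumed, leaving the two data rows.
        assert_eq!(batch.num_rows(), 2);
        Ok(())
    }
}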