use std::any::Any;
use std::fmt::Formatter;
use std::fs::{File, OpenOptions};
use std::io::BufReader;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;

use crate::{Session, TableProvider, TableProviderFactory};
use arrow::array::{RecordBatch, RecordBatchReader, RecordBatchWriter};
use arrow::datatypes::SchemaRef;
use datafusion_common::{config_err, plan_err, Constraints, DataFusionError, Result};
use datafusion_common_runtime::SpawnedTask;
use datafusion_datasource::sink::{DataSink, DataSinkExec};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType};
use datafusion_physical_expr::create_lex_ordering;
use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder;
use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};

use async_trait::async_trait;
use futures::StreamExt;

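/// A [`TableProviderFactory`] that builds [`StreamTable`]s from
/// `CREATE EXTERNAL TABLE` commands, using the command's file type as the
/// stream encoding and honoring the `format.has_header` option.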
#[derive(Debug, Default)]
pub struct StreamTableFactory {}

#[async_trait]
impl TableProviderFactory for StreamTableFactory {
    async fn create(
        &self,
        state: &dyn Session,
        cmd: &CreateExternalTable,
    ) -> Result<Arc<dyn TableProvider>> {
        let schema: SchemaRef = Arc::clone(cmd.schema.inner());
        let location = cmd.location.clone();
        let encoding = cmd.file_type.parse()?;
        let header = if let Ok(opt) = cmd
            .options
            .get("format.has_header")
            .map(|has_header| bool::from_str(has_header.to_lowercase().as_str()))
            .transpose()
        {
            opt.unwrap_or(false)
        } else {
            return config_err!(
                "Valid values for format.has_header option are 'true' or 'false'"
            );
        };

        let source = FileStreamProvider::new_file(schema, location.into())
            .with_encoding(encoding)
            .with_batch_size(state.config().batch_size())
            .with_header(header);

        let config = StreamConfig::new(Arc::new(source))
            .with_order(cmd.order_exprs.clone())
            .with_constraints(cmd.constraints.clone());

        Ok(Arc::new(StreamTable(Arc::new(config))))
    }
}

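/// The encoding of the data backing a stream: CSV or newline-delimited JSON.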
#[derive(Debug, Clone)]
pub enum StreamEncoding {
    Csv,
    Json,
}

impl FromStr for StreamEncoding {
    type Err = DataFusionError;

    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        match s.to_ascii_lowercase().as_str() {
            "csv" => Ok(Self::Csv),
            "json" => Ok(Self::Json),
            _ => plan_err!("Unrecognized StreamEncoding {}", s),
        }
    }
}

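/// The source and sink behind a [`StreamTable`]: implementations expose the
/// schema plus synchronous [`RecordBatchReader`] / [`RecordBatchWriter`]
/// handles that are driven from blocking tasks.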
pub trait StreamProvider: std::fmt::Debug + Send + Sync {
    fn schema(&self) -> &SchemaRef;
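    /// Create a blocking reader that yields the stream's contents as record batches.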
    fn reader(&self) -> Result<Box<dyn RecordBatchReader>>;
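    /// Create a blocking writer for appending record batches to the stream.
    ///
    /// The default implementation panics; override it for writable sources.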
    fn writer(&self) -> Result<Box<dyn RecordBatchWriter>> {
        unimplemented!()
    }

    fn stream_write_display(
        &self,
        t: DisplayFormatType,
        f: &mut Formatter,
    ) -> std::fmt::Result;
}

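/// A [`StreamProvider`] backed by a single file on disk that is read from and
/// appended to in CSV or newline-delimited JSON format.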
#[derive(Debug)]
pub struct FileStreamProvider {
    location: PathBuf,
    encoding: StreamEncoding,
    pub schema: SchemaRef,
    header: bool,
    batch_size: usize,
}

impl FileStreamProvider {
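    /// Create a new provider for the file at `location` with the given schema,
    /// defaulting to CSV encoding, no header row, and a batch size of 1024.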
    pub fn new_file(schema: SchemaRef, location: PathBuf) -> Self {
        Self {
            schema,
            location,
            batch_size: 1024,
            encoding: StreamEncoding::Csv,
            header: false,
        }
    }

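    /// Set the number of rows to read per batch.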
    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size;
        self
    }

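    /// Set whether the file has (and should be written with) a header row.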
    pub fn with_header(mut self, header: bool) -> Self {
        self.header = header;
        self
    }

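    /// Set the file encoding (CSV or newline-delimited JSON).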
    pub fn with_encoding(mut self, encoding: StreamEncoding) -> Self {
        self.encoding = encoding;
        self
    }
}

impl StreamProvider for FileStreamProvider {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn reader(&self) -> Result<Box<dyn RecordBatchReader>> {
        let file = File::open(&self.location)?;
        let schema = Arc::clone(&self.schema);
        match &self.encoding {
            StreamEncoding::Csv => {
                let reader = arrow::csv::ReaderBuilder::new(schema)
                    .with_header(self.header)
                    .with_batch_size(self.batch_size)
                    .build(file)?;

                Ok(Box::new(reader))
            }
            StreamEncoding::Json => {
                let reader = arrow::json::ReaderBuilder::new(schema)
                    .with_batch_size(self.batch_size)
                    .build(BufReader::new(file))?;

                Ok(Box::new(reader))
            }
        }
    }

    fn writer(&self) -> Result<Box<dyn RecordBatchWriter>> {
        match &self.encoding {
            StreamEncoding::Csv => {
                let header = self.header && !self.location.exists();
                let file = OpenOptions::new()
                    .create(true)
                    .append(true)
                    .open(&self.location)?;
                let writer = arrow::csv::WriterBuilder::new()
                    .with_header(header)
                    .build(file);

                Ok(Box::new(writer))
            }
            StreamEncoding::Json => {
                let file = OpenOptions::new()
                    .create(true)
                    .append(true)
                    .open(&self.location)?;
                Ok(Box::new(arrow::json::LineDelimitedWriter::new(file)))
            }
        }
    }

    fn stream_write_display(
        &self,
        _t: DisplayFormatType,
        f: &mut Formatter,
    ) -> std::fmt::Result {
        f.debug_struct("StreamWrite")
            .field("location", &self.location)
            .field("batch_size", &self.batch_size)
            .field("encoding", &self.encoding)
            .field("header", &self.header)
            .finish_non_exhaustive()
    }
}

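/// The configuration for a [`StreamTable`]: the underlying [`StreamProvider`]
/// plus any known sort orderings and table constraints.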
#[derive(Debug)]
pub struct StreamConfig {
    source: Arc<dyn StreamProvider>,
    order: Vec<Vec<SortExpr>>,
    constraints: Constraints,
}

impl StreamConfig {
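    /// Create a new configuration for `source` with no ordering or constraints.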
    pub fn new(source: Arc<dyn StreamProvider>) -> Self {
        Self {
            source,
            order: vec![],
            constraints: Constraints::default(),
        }
    }

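    /// Declare sort orderings that the source is known to satisfy.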
    pub fn with_order(mut self, order: Vec<Vec<SortExpr>>) -> Self {
        self.order = order;
        self
    }

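    /// Attach table constraints to the configuration.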
    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
        self.constraints = constraints;
        self
    }

    fn reader(&self) -> Result<Box<dyn RecordBatchReader>> {
        self.source.reader()
    }

    fn writer(&self) -> Result<Box<dyn RecordBatchWriter>> {
        self.source.writer()
    }
}

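/// A [`TableProvider`] for an unbounded, file-backed stream source. Scans read
/// the file as a stream of record batches, and `INSERT INTO` appends batches
/// through the provider's writer.
///
/// A minimal usage sketch, assuming the table is registered with a DataFusion
/// `SessionContext` (`ctx` and `schema` below are illustrative):
///
/// ```ignore
/// use std::sync::Arc;
///
/// // Stream newline-delimited JSON from /tmp/events.jsonl with a known schema
/// let provider = FileStreamProvider::new_file(schema, "/tmp/events.jsonl".into())
///     .with_encoding(StreamEncoding::Json)
///     .with_batch_size(8192);
/// let table = StreamTable::new(Arc::new(StreamConfig::new(Arc::new(provider))));
/// ctx.register_table("events", Arc::new(table))?;
/// ```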
#[derive(Debug)]
pub struct StreamTable(Arc<StreamConfig>);

impl StreamTable {
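    /// Create a new [`StreamTable`] for the given [`StreamConfig`].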
    pub fn new(config: Arc<StreamConfig>) -> Self {
        Self(config)
    }
}

#[async_trait]
impl TableProvider for StreamTable {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(self.0.source.schema())
    }

    fn constraints(&self) -> Option<&Constraints> {
        Some(&self.0.constraints)
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    async fn scan(
        &self,
        state: &dyn Session,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let projected_orderings = match projection {
            Some(p) => {
                let projected = Arc::new(self.0.source.schema().project(p)?);
                create_lex_ordering(&projected, &self.0.order, state.execution_props())?
            }
            None => create_lex_ordering(
                self.0.source.schema(),
                &self.0.order,
                state.execution_props(),
            )?,
        };

        Ok(Arc::new(StreamingTableExec::try_new(
            Arc::clone(self.0.source.schema()),
            vec![Arc::new(StreamRead(Arc::clone(&self.0))) as _],
            projection,
            projected_orderings,
            true,
            limit,
        )?))
    }

    async fn insert_into(
        &self,
        state: &dyn Session,
        input: Arc<dyn ExecutionPlan>,
        _insert_op: InsertOp,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let schema = self.0.source.schema();
        let orders =
            create_lex_ordering(schema, &self.0.order, state.execution_props())?;
        let ordering = orders.into_iter().next().map(Into::into);

        Ok(Arc::new(DataSinkExec::new(
            input,
            Arc::new(StreamWrite(Arc::clone(&self.0))),
            ordering,
        )))
    }
}

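/// A [`PartitionStream`] that reads record batches from the configured
/// [`StreamProvider`] on a dedicated blocking task.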
#[derive(Debug)]
struct StreamRead(Arc<StreamConfig>);

impl PartitionStream for StreamRead {
    fn schema(&self) -> &SchemaRef {
        self.0.source.schema()
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let config = Arc::clone(&self.0);
        let schema = Arc::clone(self.0.source.schema());
        let mut builder = RecordBatchReceiverStreamBuilder::new(schema, 2);
        let tx = builder.tx();
        builder.spawn_blocking(move || {
            let reader = config.reader()?;
            for b in reader {
                if tx.blocking_send(b.map_err(Into::into)).is_err() {
                    break;
                }
            }
            Ok(())
        });
        builder.build()
    }
}

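/// A [`DataSink`] that appends incoming record batches to the configured
/// [`StreamProvider`] on a dedicated blocking task.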
#[derive(Debug)]
struct StreamWrite(Arc<StreamConfig>);

impl DisplayAs for StreamWrite {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
        self.0.source.stream_write_display(t, f)
    }
}

#[async_trait]
impl DataSink for StreamWrite {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> &SchemaRef {
        self.0.source.schema()
    }

    async fn write_all(
        &self,
        mut data: SendableRecordBatchStream,
        _context: &Arc<TaskContext>,
    ) -> Result<u64> {
        let config = Arc::clone(&self.0);
        let (sender, mut receiver) = tokio::sync::mpsc::channel::<RecordBatch>(2);
        let write_task = SpawnedTask::spawn_blocking(move || {
            let mut count = 0_u64;
            let mut writer = config.writer()?;
            while let Some(batch) = receiver.blocking_recv() {
                count += batch.num_rows() as u64;
                writer.write(&batch)?;
            }
            Ok(count)
        });

        while let Some(b) = data.next().await.transpose()? {
            if sender.send(b).await.is_err() {
                break;
            }
        }
        drop(sender);
        write_task
            .join_unwind()
            .await
            .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))?
    }
}