datafusion_catalog/stream.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! TableProvider for stream sources, such as FIFO files

use std::any::Any;
use std::fmt::Formatter;
use std::fs::{File, OpenOptions};
use std::io::BufReader;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;

use crate::{Session, TableProvider, TableProviderFactory};
use arrow::array::{RecordBatch, RecordBatchReader, RecordBatchWriter};
use arrow::datatypes::SchemaRef;
use datafusion_common::{config_err, plan_err, Constraints, DataFusionError, Result};
use datafusion_common_runtime::SpawnedTask;
use datafusion_datasource::sink::{DataSink, DataSinkExec};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType};
use datafusion_physical_expr::create_lex_ordering;
use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder;
use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};

use async_trait::async_trait;
use futures::StreamExt;

/// A [`TableProviderFactory`] for [`StreamTable`]
#[derive(Debug, Default)]
pub struct StreamTableFactory {}

#[async_trait]
impl TableProviderFactory for StreamTableFactory {
    async fn create(
        &self,
        state: &dyn Session,
        cmd: &CreateExternalTable,
    ) -> Result<Arc<dyn TableProvider>> {
        let schema: SchemaRef = Arc::clone(cmd.schema.inner());
        let location = cmd.location.clone();
        let encoding = cmd.file_type.parse()?;
        let header = if let Ok(opt) = cmd
            .options
            .get("format.has_header")
            .map(|has_header| bool::from_str(has_header.to_lowercase().as_str()))
            .transpose()
        {
            opt.unwrap_or(false)
        } else {
            return config_err!(
                "Valid values for format.has_header option are 'true' or 'false'"
            );
        };

        let source = FileStreamProvider::new_file(schema, location.into())
            .with_encoding(encoding)
            .with_batch_size(state.config().batch_size())
            .with_header(header);

        let config = StreamConfig::new(Arc::new(source))
            .with_order(cmd.order_exprs.clone())
            .with_constraints(cmd.constraints.clone());

        Ok(Arc::new(StreamTable(Arc::new(config))))
    }
}

/// The data encoding for [`StreamTable`]
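///
/// `StreamEncoding` implements [`FromStr`], so a value can be parsed from a
/// configuration string (a small sketch):
///
/// ```ignore
/// let encoding: StreamEncoding = "csv".parse().unwrap();
/// ```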
#[derive(Debug, Clone)]
pub enum StreamEncoding {
    /// CSV records
    Csv,
    /// Newline-delimited JSON records
    Json,
}

impl FromStr for StreamEncoding {
    type Err = DataFusionError;

    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        match s.to_ascii_lowercase().as_str() {
            "csv" => Ok(Self::Csv),
            "json" => Ok(Self::Json),
            _ => plan_err!("Unrecognized StreamEncoding {}", s),
        }
    }
}

/// The `StreamProvider` trait is a generic interface for reading from and writing to streaming
/// data sources (such as FIFO files, WebSockets, Kafka, etc.). Implementations are responsible
/// for providing a `RecordBatchReader` and, optionally, a `RecordBatchWriter`.
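///
/// # Example
///
/// A minimal sketch of a provider that replays a fixed set of in-memory batches
/// (illustrative only; a real source would read from a FIFO, socket, broker, etc.):
///
/// ```ignore
/// #[derive(Debug)]
/// struct InMemoryStream {
///     schema: SchemaRef,
///     batches: Vec<RecordBatch>,
/// }
///
/// impl StreamProvider for InMemoryStream {
///     fn schema(&self) -> &SchemaRef {
///         &self.schema
///     }
///
///     fn reader(&self) -> Result<Box<dyn RecordBatchReader>> {
///         // Wrap the stored batches in an iterator-backed reader
///         let batches = self.batches.clone().into_iter().map(Ok);
///         Ok(Box::new(arrow::array::RecordBatchIterator::new(
///             batches,
///             Arc::clone(&self.schema),
///         )))
///     }
///
///     fn stream_write_display(
///         &self,
///         _t: DisplayFormatType,
///         f: &mut Formatter,
///     ) -> std::fmt::Result {
///         f.debug_struct("InMemoryStream").finish()
///     }
/// }
/// ```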
pub trait StreamProvider: std::fmt::Debug + Send + Sync {
    /// Get a reference to the schema for this stream
    fn schema(&self) -> &SchemaRef;
    /// Provide a `RecordBatchReader` for reading from this stream
    fn reader(&self) -> Result<Box<dyn RecordBatchReader>>;
    /// Provide a `RecordBatchWriter` for writing to this stream
    ///
    /// The default implementation panics; override it for writable sources
    fn writer(&self) -> Result<Box<dyn RecordBatchWriter>> {
        unimplemented!()
    }
    /// Display implementation used when this stream acts as a `DataSink`
    fn stream_write_display(
        &self,
        t: DisplayFormatType,
        f: &mut Formatter,
    ) -> std::fmt::Result;
}

/// Stream data from the file at `location`
///
/// * Data will be read sequentially from the provided `location`
/// * New data will be appended to the end of the file
///
/// The encoding can be configured with [`Self::with_encoding`] and
/// defaults to [`StreamEncoding::Csv`]
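///
/// # Example
///
/// A minimal construction sketch (the schema and the `"append.csv"` path are
/// illustrative, not part of this module):
///
/// ```ignore
/// use std::sync::Arc;
/// use arrow::datatypes::{DataType, Field, Schema};
///
/// let schema = Arc::new(Schema::new(vec![Field::new("value", DataType::Int64, false)]));
/// let provider = FileStreamProvider::new_file(schema, "append.csv".into())
///     .with_encoding(StreamEncoding::Csv)
///     .with_batch_size(1024)
///     .with_header(true);
/// ```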
#[derive(Debug)]
pub struct FileStreamProvider {
    location: PathBuf,
    encoding: StreamEncoding,
    /// The schema for this file stream
    pub schema: SchemaRef,
    header: bool,
    batch_size: usize,
}

impl FileStreamProvider {
    /// Stream data from the file at `location`
    ///
    /// * Data will be read sequentially from the provided `location`
    /// * New data will be appended to the end of the file
    ///
    /// The encoding can be configured with [`Self::with_encoding`] and
    /// defaults to [`StreamEncoding::Csv`]
    pub fn new_file(schema: SchemaRef, location: PathBuf) -> Self {
        Self {
            schema,
            location,
            batch_size: 1024,
            encoding: StreamEncoding::Csv,
            header: false,
        }
    }

    /// Set the batch size (the number of rows to load at one time)
    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size;
        self
    }

    /// Specify whether the file has a header (only applicable for [`StreamEncoding::Csv`])
    pub fn with_header(mut self, header: bool) -> Self {
        self.header = header;
        self
    }

    /// Specify an encoding for the stream
    pub fn with_encoding(mut self, encoding: StreamEncoding) -> Self {
        self.encoding = encoding;
        self
    }
}

impl StreamProvider for FileStreamProvider {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn reader(&self) -> Result<Box<dyn RecordBatchReader>> {
        let file = File::open(&self.location)?;
        let schema = Arc::clone(&self.schema);
        match &self.encoding {
            StreamEncoding::Csv => {
                let reader = arrow::csv::ReaderBuilder::new(schema)
                    .with_header(self.header)
                    .with_batch_size(self.batch_size)
                    .build(file)?;

                Ok(Box::new(reader))
            }
            StreamEncoding::Json => {
                let reader = arrow::json::ReaderBuilder::new(schema)
                    .with_batch_size(self.batch_size)
                    .build(BufReader::new(file))?;

                Ok(Box::new(reader))
            }
        }
    }

    fn writer(&self) -> Result<Box<dyn RecordBatchWriter>> {
        match &self.encoding {
            StreamEncoding::Csv => {
                // Only emit a CSV header when creating a new file; appends to
                // an existing file must not repeat it
                let header = self.header && !self.location.exists();
                let file = OpenOptions::new()
                    .create(true)
                    .append(true)
                    .open(&self.location)?;
                let writer = arrow::csv::WriterBuilder::new()
                    .with_header(header)
                    .build(file);

                Ok(Box::new(writer))
            }
            StreamEncoding::Json => {
                let file = OpenOptions::new()
                    .create(true)
                    .append(true)
                    .open(&self.location)?;
                Ok(Box::new(arrow::json::LineDelimitedWriter::new(file)))
            }
        }
    }

    fn stream_write_display(
        &self,
        _t: DisplayFormatType,
        f: &mut Formatter,
    ) -> std::fmt::Result {
        f.debug_struct("StreamWrite")
            .field("location", &self.location)
            .field("batch_size", &self.batch_size)
            .field("encoding", &self.encoding)
            .field("header", &self.header)
            .finish_non_exhaustive()
    }
}

/// The configuration for a [`StreamTable`]
#[derive(Debug)]
pub struct StreamConfig {
    source: Arc<dyn StreamProvider>,
    order: Vec<Vec<SortExpr>>,
    constraints: Constraints,
}

impl StreamConfig {
    /// Create a new `StreamConfig` from a `StreamProvider`
    pub fn new(source: Arc<dyn StreamProvider>) -> Self {
        Self {
            source,
            order: vec![],
            constraints: Constraints::default(),
        }
    }

    /// Specify a sort order for the stream
    pub fn with_order(mut self, order: Vec<Vec<SortExpr>>) -> Self {
        self.order = order;
        self
    }

    /// Assign constraints
    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
        self.constraints = constraints;
        self
    }

    fn reader(&self) -> Result<Box<dyn RecordBatchReader>> {
        self.source.reader()
    }

    fn writer(&self) -> Result<Box<dyn RecordBatchWriter>> {
        self.source.writer()
    }
}

/// A [`TableProvider`] for an unbounded stream source
///
/// Currently only reading from, and appending to, a single file in place is supported; other
/// stream sources and sinks may be added in the future.
///
/// Applications looking to read/write datasets comprising multiple files, e.g. [Hadoop]-style
/// data stored in object storage, should instead consider [`ListingTable`].
///
/// [Hadoop]: https://hadoop.apache.org/
/// [`ListingTable`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html
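///
/// # Example
///
/// A sketch of wiring a [`FileStreamProvider`] into a [`StreamTable`]; the `source` value is
/// assumed to be an already configured [`FileStreamProvider`]:
///
/// ```ignore
/// use std::sync::Arc;
///
/// let config = StreamConfig::new(Arc::new(source))
///     .with_order(vec![])                        // optional sort order
///     .with_constraints(Constraints::default()); // optional constraints
/// let table = StreamTable::new(Arc::new(config));
/// // `table` can then be registered with a `SessionContext` (e.g. via `register_table`)
/// // and scanned or inserted into like any other table
/// ```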
#[derive(Debug)]
pub struct StreamTable(Arc<StreamConfig>);

impl StreamTable {
    /// Create a new [`StreamTable`] for the given [`StreamConfig`]
    pub fn new(config: Arc<StreamConfig>) -> Self {
        Self(config)
    }
}

#[async_trait]
impl TableProvider for StreamTable {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(self.0.source.schema())
    }

    fn constraints(&self) -> Option<&Constraints> {
        Some(&self.0.constraints)
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    async fn scan(
        &self,
        state: &dyn Session,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Resolve the declared sort order(s) against the (optionally projected) schema
        let projected_orderings = match projection {
            Some(p) => {
                let projected = Arc::new(self.0.source.schema().project(p)?);
                create_lex_ordering(&projected, &self.0.order, state.execution_props())?
            }
            None => create_lex_ordering(
                self.0.source.schema(),
                &self.0.order,
                state.execution_props(),
            )?,
        };

        Ok(Arc::new(StreamingTableExec::try_new(
            Arc::clone(self.0.source.schema()),
            vec![Arc::new(StreamRead(Arc::clone(&self.0))) as _],
            projection,
            projected_orderings,
            true,
            limit,
        )?))
    }

    async fn insert_into(
        &self,
        state: &dyn Session,
        input: Arc<dyn ExecutionPlan>,
        _insert_op: InsertOp,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let schema = self.0.source.schema();
        let orders =
            create_lex_ordering(schema, &self.0.order, state.execution_props())?;
        // It is sufficient to pass only one of the equivalent orderings:
        let ordering = orders.into_iter().next().map(Into::into);

        Ok(Arc::new(DataSinkExec::new(
            input,
            Arc::new(StreamWrite(Arc::clone(&self.0))),
            ordering,
        )))
    }
}

#[derive(Debug)]
struct StreamRead(Arc<StreamConfig>);

impl PartitionStream for StreamRead {
    fn schema(&self) -> &SchemaRef {
        self.0.source.schema()
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let config = Arc::clone(&self.0);
        let schema = Arc::clone(self.0.source.schema());
        let mut builder = RecordBatchReceiverStreamBuilder::new(schema, 2);
        let tx = builder.tx();
        // Read batches from the provider on a blocking task and forward them
        // to the output stream until the receiver is dropped
        builder.spawn_blocking(move || {
            let reader = config.reader()?;
            for b in reader {
                if tx.blocking_send(b.map_err(Into::into)).is_err() {
                    break;
                }
            }
            Ok(())
        });
        builder.build()
    }
}

#[derive(Debug)]
struct StreamWrite(Arc<StreamConfig>);

impl DisplayAs for StreamWrite {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
        self.0.source.stream_write_display(t, f)
    }
}

#[async_trait]
impl DataSink for StreamWrite {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> &SchemaRef {
        self.0.source.schema()
    }

    async fn write_all(
        &self,
        mut data: SendableRecordBatchStream,
        _context: &Arc<TaskContext>,
    ) -> Result<u64> {
        let config = Arc::clone(&self.0);
        let (sender, mut receiver) = tokio::sync::mpsc::channel::<RecordBatch>(2);
        // Note: FIFO files support polling, so this could use `AsyncFd` instead of a blocking task
        let write_task = SpawnedTask::spawn_blocking(move || {
            let mut count = 0_u64;
            let mut writer = config.writer()?;
            while let Some(batch) = receiver.blocking_recv() {
                count += batch.num_rows() as u64;
                writer.write(&batch)?;
            }
            Ok(count)
        });

        // Forward incoming batches from the async stream to the blocking writer task
        while let Some(b) = data.next().await.transpose()? {
            if sender.send(b).await.is_err() {
                break;
            }
        }
        drop(sender);
        write_task
            .join_unwind()
            .await
            .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))?
    }
}