datafusion_catalog/
streaming.rs1use std::any::Any;
21use std::sync::Arc;
22
23use crate::Session;
24use crate::TableProvider;
25
26use arrow::datatypes::SchemaRef;
27use datafusion_common::{plan_err, DFSchema, Result};
28use datafusion_expr::{Expr, SortExpr, TableType};
29use datafusion_physical_expr::{create_physical_sort_exprs, LexOrdering};
30use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
31use datafusion_physical_plan::ExecutionPlan;
32
33use async_trait::async_trait;
34use log::debug;
35
/// A [`TableProvider`] whose data comes from a fixed set of [`PartitionStream`]s.
///
/// Scanning the table builds a [`StreamingTableExec`] over the stored
/// partition streams.
#[derive(Debug)]
pub struct StreamingTable {
    /// Schema exposed by the table. Every partition's schema must be contained
    /// in it; this is checked by [`StreamingTable::try_new`].
    schema: SchemaRef,
    /// Partition streams supplying the table's data; handed to
    /// [`StreamingTableExec`] on scan.
    partitions: Vec<Arc<dyn PartitionStream>>,
    /// "Infinite" flag forwarded to [`StreamingTableExec`]. Starts as `false`
    /// and is set via [`StreamingTable::with_infinite_table`].
    infinite: bool,
    /// Logical sort expressions describing the data's ordering. Empty means no
    /// ordering is declared. Converted to physical sort expressions on scan.
    sort_order: Vec<SortExpr>,
}
44
45impl StreamingTable {
46 pub fn try_new(
48 schema: SchemaRef,
49 partitions: Vec<Arc<dyn PartitionStream>>,
50 ) -> Result<Self> {
51 for x in partitions.iter() {
52 let partition_schema = x.schema();
53 if !schema.contains(partition_schema) {
54 debug!(
55 "target schema does not contain partition schema. \
56 Target_schema: {schema:?}. Partition Schema: {partition_schema:?}"
57 );
58 return plan_err!("Mismatch between schema and batches");
59 }
60 }
61
62 Ok(Self {
63 schema,
64 partitions,
65 infinite: false,
66 sort_order: vec![],
67 })
68 }
69
70 pub fn with_infinite_table(mut self, infinite: bool) -> Self {
72 self.infinite = infinite;
73 self
74 }
75
76 pub fn with_sort_order(mut self, sort_order: Vec<SortExpr>) -> Self {
78 self.sort_order = sort_order;
79 self
80 }
81}
82
83#[async_trait]
84impl TableProvider for StreamingTable {
85 fn as_any(&self) -> &dyn Any {
86 self
87 }
88
89 fn schema(&self) -> SchemaRef {
90 Arc::clone(&self.schema)
91 }
92
93 fn table_type(&self) -> TableType {
94 TableType::View
95 }
96
97 async fn scan(
98 &self,
99 state: &dyn Session,
100 projection: Option<&Vec<usize>>,
101 _filters: &[Expr],
102 limit: Option<usize>,
103 ) -> Result<Arc<dyn ExecutionPlan>> {
104 let physical_sort = if !self.sort_order.is_empty() {
105 let df_schema = DFSchema::try_from(Arc::clone(&self.schema))?;
106 let eqp = state.execution_props();
107
108 create_physical_sort_exprs(&self.sort_order, &df_schema, eqp)?
109 } else {
110 vec![]
111 };
112
113 Ok(Arc::new(StreamingTableExec::try_new(
114 Arc::clone(&self.schema),
115 self.partitions.clone(),
116 projection,
117 LexOrdering::new(physical_sort),
118 self.infinite,
119 limit,
120 )?))
121 }
122}