datafusion_catalog/
view.rs1use std::{any::Any, borrow::Cow, sync::Arc};
21
22use crate::Session;
23use crate::TableProvider;
24
25use arrow::datatypes::SchemaRef;
26use async_trait::async_trait;
27use datafusion_common::error::Result;
28use datafusion_common::Column;
29use datafusion_expr::TableType;
30use datafusion_expr::{Expr, LogicalPlan};
31use datafusion_expr::{LogicalPlanBuilder, TableProviderFilterPushDown};
32use datafusion_physical_plan::ExecutionPlan;
33
/// A table backed by a stored [`LogicalPlan`] rather than by physical data
/// (reported as [`TableType::View`] by its `TableProvider` implementation).
#[derive(Debug)]
pub struct ViewTable {
    /// The logical plan that produces the view's rows.
    logical_plan: LogicalPlan,
    /// Arrow schema captured from `logical_plan` when the view is constructed.
    table_schema: SchemaRef,
    /// Optional textual definition of the view; `None` when none was supplied
    /// at construction (presumably the SQL used to create it — confirm with callers).
    definition: Option<String>,
}
44
45impl ViewTable {
46 pub fn new(logical_plan: LogicalPlan, definition: Option<String>) -> Self {
54 let table_schema = Arc::clone(logical_plan.schema().inner());
55 Self {
56 logical_plan,
57 table_schema,
58 definition,
59 }
60 }
61
62 #[deprecated(
63 since = "47.0.0",
64 note = "Use `ViewTable::new` instead and apply TypeCoercion to the logical plan if needed"
65 )]
66 pub fn try_new(
67 logical_plan: LogicalPlan,
68 definition: Option<String>,
69 ) -> Result<Self> {
70 Ok(Self::new(logical_plan, definition))
71 }
72
73 pub fn definition(&self) -> Option<&String> {
75 self.definition.as_ref()
76 }
77
78 pub fn logical_plan(&self) -> &LogicalPlan {
80 &self.logical_plan
81 }
82}
83
84#[async_trait]
85impl TableProvider for ViewTable {
86 fn as_any(&self) -> &dyn Any {
87 self
88 }
89
90 fn get_logical_plan(&'_ self) -> Option<Cow<'_, LogicalPlan>> {
91 Some(Cow::Borrowed(&self.logical_plan))
92 }
93
94 fn schema(&self) -> SchemaRef {
95 Arc::clone(&self.table_schema)
96 }
97
98 fn table_type(&self) -> TableType {
99 TableType::View
100 }
101
102 fn get_table_definition(&self) -> Option<&str> {
103 self.definition.as_deref()
104 }
105 fn supports_filters_pushdown(
106 &self,
107 filters: &[&Expr],
108 ) -> Result<Vec<TableProviderFilterPushDown>> {
109 Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
111 }
112
113 async fn scan(
114 &self,
115 state: &dyn Session,
116 projection: Option<&Vec<usize>>,
117 filters: &[Expr],
118 limit: Option<usize>,
119 ) -> Result<Arc<dyn ExecutionPlan>> {
120 let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
121 let plan = self.logical_plan().clone();
122 let mut plan = LogicalPlanBuilder::from(plan);
123
124 if let Some(filter) = filter {
125 plan = plan.filter(filter)?;
126 }
127
128 let mut plan = if let Some(projection) = projection {
129 let current_projection =
131 (0..plan.schema().fields().len()).collect::<Vec<usize>>();
132 if projection == ¤t_projection {
133 plan
134 } else {
135 let fields: Vec<Expr> = projection
136 .iter()
137 .map(|i| {
138 Expr::Column(Column::from(
139 self.logical_plan.schema().qualified_field(*i),
140 ))
141 })
142 .collect();
143 plan.project(fields)?
144 }
145 } else {
146 plan
147 };
148
149 if let Some(limit) = limit {
150 plan = plan.limit(0, Some(limit))?;
151 }
152
153 state.create_physical_plan(&plan.build()?).await
154 }
155}