datafusion_catalog/
view.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! View data source which uses a LogicalPlan as its input.
19
20use std::{any::Any, borrow::Cow, sync::Arc};
21
22use crate::Session;
23use crate::TableProvider;
24
25use arrow::datatypes::SchemaRef;
26use async_trait::async_trait;
27use datafusion_common::error::Result;
28use datafusion_common::Column;
29use datafusion_expr::TableType;
30use datafusion_expr::{Expr, LogicalPlan};
31use datafusion_expr::{LogicalPlanBuilder, TableProviderFilterPushDown};
32use datafusion_physical_plan::ExecutionPlan;
33
34/// An implementation of `TableProvider` that uses another logical plan.
35#[derive(Debug)]
36pub struct ViewTable {
37    /// LogicalPlan of the view
38    logical_plan: LogicalPlan,
39    /// File fields + partition columns
40    table_schema: SchemaRef,
41    /// SQL used to create the view, if available
42    definition: Option<String>,
43}
44
45impl ViewTable {
46    /// Create new view that is executed at query runtime.
47    ///
48    /// Takes a `LogicalPlan` and optionally the SQL text of the `CREATE`
49    /// statement.
50    ///
51    /// Notes: the `LogicalPlan` is not validated or type coerced. If this is
52    /// needed it should be done after calling this function.
53    pub fn new(logical_plan: LogicalPlan, definition: Option<String>) -> Self {
54        let table_schema = Arc::clone(logical_plan.schema().inner());
55        Self {
56            logical_plan,
57            table_schema,
58            definition,
59        }
60    }
61
62    #[deprecated(
63        since = "47.0.0",
64        note = "Use `ViewTable::new` instead and apply TypeCoercion to the logical plan if needed"
65    )]
66    pub fn try_new(
67        logical_plan: LogicalPlan,
68        definition: Option<String>,
69    ) -> Result<Self> {
70        Ok(Self::new(logical_plan, definition))
71    }
72
73    /// Get definition ref
74    pub fn definition(&self) -> Option<&String> {
75        self.definition.as_ref()
76    }
77
78    /// Get logical_plan ref
79    pub fn logical_plan(&self) -> &LogicalPlan {
80        &self.logical_plan
81    }
82}
83
84#[async_trait]
85impl TableProvider for ViewTable {
86    fn as_any(&self) -> &dyn Any {
87        self
88    }
89
90    fn get_logical_plan(&'_ self) -> Option<Cow<'_, LogicalPlan>> {
91        Some(Cow::Borrowed(&self.logical_plan))
92    }
93
94    fn schema(&self) -> SchemaRef {
95        Arc::clone(&self.table_schema)
96    }
97
98    fn table_type(&self) -> TableType {
99        TableType::View
100    }
101
102    fn get_table_definition(&self) -> Option<&str> {
103        self.definition.as_deref()
104    }
105    fn supports_filters_pushdown(
106        &self,
107        filters: &[&Expr],
108    ) -> Result<Vec<TableProviderFilterPushDown>> {
109        // A filter is added on the View when given
110        Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
111    }
112
113    async fn scan(
114        &self,
115        state: &dyn Session,
116        projection: Option<&Vec<usize>>,
117        filters: &[Expr],
118        limit: Option<usize>,
119    ) -> Result<Arc<dyn ExecutionPlan>> {
120        let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
121        let plan = self.logical_plan().clone();
122        let mut plan = LogicalPlanBuilder::from(plan);
123
124        if let Some(filter) = filter {
125            plan = plan.filter(filter)?;
126        }
127
128        let mut plan = if let Some(projection) = projection {
129            // avoiding adding a redundant projection (e.g. SELECT * FROM view)
130            let current_projection =
131                (0..plan.schema().fields().len()).collect::<Vec<usize>>();
132            if projection == &current_projection {
133                plan
134            } else {
135                let fields: Vec<Expr> = projection
136                    .iter()
137                    .map(|i| {
138                        Expr::Column(Column::from(
139                            self.logical_plan.schema().qualified_field(*i),
140                        ))
141                    })
142                    .collect();
143                plan.project(fields)?
144            }
145        } else {
146            plan
147        };
148
149        if let Some(limit) = limit {
150            plan = plan.limit(0, Some(limit))?;
151        }
152
153        state.create_physical_plan(&plan.build()?).await
154    }
155}