datafusion/datasource/
view_test.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! View data source which uses a LogicalPlan as it's input.
19
20#[cfg(test)]
21mod tests {
22    use crate::error::Result;
23    use crate::execution::context::SessionConfig;
24    use crate::execution::options::ParquetReadOptions;
25    use crate::prelude::SessionContext;
26    use crate::test_util::parquet_test_data;
27    use datafusion_common::test_util::batches_to_string;
28    use datafusion_expr::{col, lit};
29
30    #[tokio::test]
31    async fn issue_3242() -> Result<()> {
32        // regression test for https://github.com/apache/datafusion/pull/3242
33        let session_ctx = SessionContext::new_with_config(
34            SessionConfig::new().with_information_schema(true),
35        );
36
37        session_ctx
38            .sql("create view v as select 1 as a, 2 as b, 3 as c")
39            .await?
40            .collect()
41            .await?;
42
43        let results = session_ctx
44            .sql("select * from (select b from v)")
45            .await?
46            .collect()
47            .await?;
48
49        insta::assert_snapshot!(batches_to_string(&results),@r###"
50        +---+
51        | b |
52        +---+
53        | 2 |
54        +---+
55        "###);
56
57        Ok(())
58    }
59
60    #[tokio::test]
61    async fn create_view_return_empty_dataframe() -> Result<()> {
62        let session_ctx = SessionContext::new();
63
64        let df = session_ctx
65            .sql("CREATE VIEW xyz AS SELECT 1")
66            .await?
67            .collect()
68            .await?;
69
70        assert!(df.is_empty());
71
72        Ok(())
73    }
74
75    #[tokio::test]
76    async fn query_view() -> Result<()> {
77        let session_ctx = SessionContext::new_with_config(
78            SessionConfig::new().with_information_schema(true),
79        );
80
81        session_ctx
82            .sql("CREATE TABLE abc AS VALUES (1,2,3), (4,5,6)")
83            .await?
84            .collect()
85            .await?;
86
87        let view_sql = "CREATE VIEW xyz AS SELECT * FROM abc";
88        session_ctx.sql(view_sql).await?.collect().await?;
89
90        let results = session_ctx.sql("SELECT * FROM information_schema.tables WHERE table_type='VIEW' AND table_name = 'xyz'").await?.collect().await?;
91        assert_eq!(results[0].num_rows(), 1);
92
93        let results = session_ctx
94            .sql("SELECT * FROM xyz")
95            .await?
96            .collect()
97            .await?;
98
99        insta::assert_snapshot!(batches_to_string(&results),@r###"
100        +---------+---------+---------+
101        | column1 | column2 | column3 |
102        +---------+---------+---------+
103        | 1       | 2       | 3       |
104        | 4       | 5       | 6       |
105        +---------+---------+---------+
106        "###);
107
108        let view_sql =
109            "CREATE VIEW replace_xyz AS SELECT * REPLACE (column1*2 as column1) FROM xyz";
110        session_ctx.sql(view_sql).await?.collect().await?;
111
112        let results = session_ctx
113            .sql("SELECT * FROM replace_xyz")
114            .await?
115            .collect()
116            .await?;
117
118        insta::assert_snapshot!(batches_to_string(&results),@r###"
119        +---------+---------+---------+
120        | column1 | column2 | column3 |
121        +---------+---------+---------+
122        | 2       | 2       | 3       |
123        | 8       | 5       | 6       |
124        +---------+---------+---------+
125        "###);
126
127        Ok(())
128    }
129
130    #[tokio::test]
131    async fn query_view_with_alias() -> Result<()> {
132        let session_ctx = SessionContext::new_with_config(SessionConfig::new());
133
134        session_ctx
135            .sql("CREATE TABLE abc AS VALUES (1,2,3), (4,5,6)")
136            .await?
137            .collect()
138            .await?;
139
140        let view_sql = "CREATE VIEW xyz AS SELECT column1 AS column1_alias, column2 AS column2_alias FROM abc";
141        session_ctx.sql(view_sql).await?.collect().await?;
142
143        let results = session_ctx
144            .sql("SELECT column1_alias FROM xyz")
145            .await?
146            .collect()
147            .await?;
148
149        insta::assert_snapshot!(batches_to_string(&results),@r###"
150        +---------------+
151        | column1_alias |
152        +---------------+
153        | 1             |
154        | 4             |
155        +---------------+
156        "###);
157
158        Ok(())
159    }
160
161    #[tokio::test]
162    async fn query_view_with_inline_alias() -> Result<()> {
163        let session_ctx = SessionContext::new_with_config(SessionConfig::new());
164
165        session_ctx
166            .sql("CREATE TABLE abc AS VALUES (1,2,3), (4,5,6)")
167            .await?
168            .collect()
169            .await?;
170
171        let view_sql = "CREATE VIEW xyz (column1_alias, column2_alias) AS SELECT column1, column2 FROM abc";
172        session_ctx.sql(view_sql).await?.collect().await?;
173
174        let results = session_ctx
175            .sql("SELECT column2_alias, column1_alias FROM xyz")
176            .await?
177            .collect()
178            .await?;
179
180        insta::assert_snapshot!(batches_to_string(&results),@r###"
181        +---------------+---------------+
182        | column2_alias | column1_alias |
183        +---------------+---------------+
184        | 2             | 1             |
185        | 5             | 4             |
186        +---------------+---------------+
187        "###);
188
189        Ok(())
190    }
191
192    #[tokio::test]
193    async fn query_view_with_projection() -> Result<()> {
194        let session_ctx = SessionContext::new_with_config(
195            SessionConfig::new().with_information_schema(true),
196        );
197
198        session_ctx
199            .sql("CREATE TABLE abc AS VALUES (1,2,3), (4,5,6)")
200            .await?
201            .collect()
202            .await?;
203
204        let view_sql = "CREATE VIEW xyz AS SELECT column1, column2 FROM abc";
205        session_ctx.sql(view_sql).await?.collect().await?;
206
207        let results = session_ctx.sql("SELECT * FROM information_schema.tables WHERE table_type='VIEW' AND table_name = 'xyz'").await?.collect().await?;
208        assert_eq!(results[0].num_rows(), 1);
209
210        let results = session_ctx
211            .sql("SELECT column1 FROM xyz")
212            .await?
213            .collect()
214            .await?;
215
216        insta::assert_snapshot!(batches_to_string(&results),@r###"
217        +---------+
218        | column1 |
219        +---------+
220        | 1       |
221        | 4       |
222        +---------+
223        "###);
224
225        Ok(())
226    }
227
228    #[tokio::test]
229    async fn query_view_with_filter() -> Result<()> {
230        let session_ctx = SessionContext::new_with_config(
231            SessionConfig::new().with_information_schema(true),
232        );
233
234        session_ctx
235            .sql("CREATE TABLE abc AS VALUES (1,2,3), (4,5,6)")
236            .await?
237            .collect()
238            .await?;
239
240        let view_sql = "CREATE VIEW xyz AS SELECT column1, column2 FROM abc";
241        session_ctx.sql(view_sql).await?.collect().await?;
242
243        let results = session_ctx.sql("SELECT * FROM information_schema.tables WHERE table_type='VIEW' AND table_name = 'xyz'").await?.collect().await?;
244        assert_eq!(results[0].num_rows(), 1);
245
246        let results = session_ctx
247            .sql("SELECT column1 FROM xyz WHERE column2 = 5")
248            .await?
249            .collect()
250            .await?;
251
252        insta::assert_snapshot!(batches_to_string(&results),@r###"
253        +---------+
254        | column1 |
255        +---------+
256        | 4       |
257        +---------+
258        "###);
259
260        Ok(())
261    }
262
263    #[tokio::test]
264    async fn query_join_views() -> Result<()> {
265        let session_ctx = SessionContext::new_with_config(
266            SessionConfig::new().with_information_schema(true),
267        );
268
269        session_ctx
270            .sql("CREATE TABLE abc AS VALUES (1,2,3), (4,5,6)")
271            .await?
272            .collect()
273            .await?;
274
275        let view_sql = "CREATE VIEW xyz AS SELECT column1, column2 FROM abc";
276        session_ctx.sql(view_sql).await?.collect().await?;
277
278        let view_sql = "CREATE VIEW lmn AS SELECT column1, column3 FROM abc";
279        session_ctx.sql(view_sql).await?.collect().await?;
280
281        let results = session_ctx.sql("SELECT * FROM information_schema.tables WHERE table_type='VIEW' AND (table_name = 'xyz' OR table_name = 'lmn')").await?.collect().await?;
282        assert_eq!(results[0].num_rows(), 2);
283
284        let results = session_ctx
285            .sql("SELECT * FROM xyz JOIN lmn USING (column1) ORDER BY column2")
286            .await?
287            .collect()
288            .await?;
289
290        insta::assert_snapshot!(batches_to_string(&results),@r###"
291        +---------+---------+---------+
292        | column2 | column1 | column3 |
293        +---------+---------+---------+
294        | 2       | 1       | 3       |
295        | 5       | 4       | 6       |
296        +---------+---------+---------+
297        "###);
298
299        Ok(())
300    }
301
302    #[tokio::test]
303    async fn filter_pushdown_view() -> Result<()> {
304        let ctx = SessionContext::new();
305
306        ctx.register_parquet(
307            "test",
308            &format!("{}/alltypes_plain.snappy.parquet", parquet_test_data()),
309            ParquetReadOptions::default(),
310        )
311        .await?;
312
313        ctx.register_table("t1", ctx.table("test").await?.into_view())?;
314
315        ctx.sql("CREATE VIEW t2 as SELECT * FROM t1").await?;
316
317        let df = ctx
318            .table("t2")
319            .await?
320            .filter(col("id").eq(lit(1)))?
321            .select_columns(&["bool_col", "int_col"])?;
322
323        let plan = df.explain(false, false)?.collect().await?;
324
325        // Filters all the way to Parquet
326        let formatted = arrow::util::pretty::pretty_format_batches(&plan)
327            .unwrap()
328            .to_string();
329        assert!(formatted.contains("FilterExec: id@0 = 1"));
330        Ok(())
331    }
332
333    #[tokio::test]
334    async fn limit_pushdown_view() -> Result<()> {
335        let ctx = SessionContext::new();
336
337        ctx.register_parquet(
338            "test",
339            &format!("{}/alltypes_plain.snappy.parquet", parquet_test_data()),
340            ParquetReadOptions::default(),
341        )
342        .await?;
343
344        ctx.register_table("t1", ctx.table("test").await?.into_view())?;
345
346        ctx.sql("CREATE VIEW t2 as SELECT * FROM t1").await?;
347
348        let df = ctx
349            .table("t2")
350            .await?
351            .limit(0, Some(10))?
352            .select_columns(&["bool_col", "int_col"])?;
353
354        let plan = df.explain(false, false)?.collect().await?;
355        // Limit is included in DataSourceExec
356        let formatted = arrow::util::pretty::pretty_format_batches(&plan)
357            .unwrap()
358            .to_string();
359        assert!(formatted.contains("DataSourceExec: "));
360        assert!(formatted.contains("file_type=parquet"));
361        assert!(formatted.contains("projection=[bool_col, int_col], limit=10"));
362        Ok(())
363    }
364
365    #[tokio::test]
366    async fn create_view_plan() -> Result<()> {
367        let session_ctx = SessionContext::new_with_config(
368            SessionConfig::new().with_information_schema(true),
369        );
370
371        session_ctx
372            .sql("CREATE TABLE abc AS VALUES (1,2,3), (4,5,6)")
373            .await?
374            .collect()
375            .await?;
376
377        let view_sql = "CREATE VIEW xyz AS SELECT * FROM abc";
378        session_ctx.sql(view_sql).await?.collect().await?;
379
380        let dataframe = session_ctx
381            .sql("EXPLAIN CREATE VIEW xyz AS SELECT * FROM abc")
382            .await?;
383        let plan = dataframe.into_optimized_plan()?;
384        let actual = format!("{}", plan.display_indent());
385        let expected = "\
386        Explain\
387        \n  CreateView: Bare { table: \"xyz\" }\
388        \n    TableScan: abc projection=[column1, column2, column3]";
389        assert_eq!(expected, actual);
390
391        let dataframe = session_ctx
392            .sql("EXPLAIN CREATE VIEW xyz AS SELECT * FROM abc WHERE column2 = 5")
393            .await?;
394        let plan = dataframe.into_optimized_plan()?;
395        let actual = format!("{}", plan.display_indent());
396        let expected = "\
397        Explain\
398        \n  CreateView: Bare { table: \"xyz\" }\
399        \n    Filter: abc.column2 = Int64(5)\
400        \n      TableScan: abc projection=[column1, column2, column3]";
401        assert_eq!(expected, actual);
402
403        let dataframe = session_ctx
404            .sql("EXPLAIN CREATE VIEW xyz AS SELECT column1, column2 FROM abc WHERE column2 = 5")
405            .await?;
406        let plan = dataframe.into_optimized_plan()?;
407        let actual = format!("{}", plan.display_indent());
408        let expected = "\
409        Explain\
410        \n  CreateView: Bare { table: \"xyz\" }\
411        \n    Filter: abc.column2 = Int64(5)\
412        \n      TableScan: abc projection=[column1, column2]";
413        assert_eq!(expected, actual);
414
415        Ok(())
416    }
417
418    #[tokio::test]
419    async fn create_or_replace_view() -> Result<()> {
420        let session_ctx = SessionContext::new_with_config(
421            SessionConfig::new().with_information_schema(true),
422        );
423
424        session_ctx
425            .sql("CREATE TABLE abc AS VALUES (1,2,3), (4,5,6)")
426            .await?
427            .collect()
428            .await?;
429
430        let view_sql = "CREATE VIEW xyz AS SELECT * FROM abc";
431        session_ctx.sql(view_sql).await?.collect().await?;
432
433        let view_sql = "CREATE OR REPLACE VIEW xyz AS SELECT column1 FROM abc";
434        session_ctx.sql(view_sql).await?.collect().await?;
435
436        let results = session_ctx.sql("SELECT * FROM information_schema.tables WHERE table_type='VIEW' AND table_name = 'xyz'").await?.collect().await?;
437        assert_eq!(results[0].num_rows(), 1);
438
439        let results = session_ctx
440            .sql("SELECT * FROM xyz")
441            .await?
442            .collect()
443            .await?;
444
445        insta::assert_snapshot!(batches_to_string(&results),@r###"
446        +---------+
447        | column1 |
448        +---------+
449        | 1       |
450        | 4       |
451        +---------+
452        "###);
453
454        Ok(())
455    }
456}