datafusion/dataframe/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`DataFrame`] API for building and executing query plans.

#[cfg(feature = "parquet")]
mod parquet;

use crate::arrow::record_batch::RecordBatch;
use crate::arrow::util::pretty;
use crate::datasource::file_format::csv::CsvFormatFactory;
use crate::datasource::file_format::format_as_file_type;
use crate::datasource::file_format::json::JsonFormatFactory;
use crate::datasource::{
    provider_as_source, DefaultTableSource, MemTable, TableProvider,
};
use crate::error::Result;
use crate::execution::context::{SessionState, TaskContext};
use crate::execution::FunctionRegistry;
use crate::logical_expr::utils::find_window_exprs;
use crate::logical_expr::{
    col, ident, Expr, JoinType, LogicalPlan, LogicalPlanBuilder,
    LogicalPlanBuilderOptions, Partitioning, TableType,
};
use crate::physical_plan::{
    collect, collect_partitioned, execute_stream, execute_stream_partitioned,
    ExecutionPlan, SendableRecordBatchStream,
};
use crate::prelude::SessionContext;
use std::any::Any;
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int64Array, StringArray};
use arrow::compute::{cast, concat};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::config::{CsvOptions, JsonOptions};
use datafusion_common::{
    exec_err, internal_datafusion_err, not_impl_err, plan_datafusion_err, plan_err,
    Column, DFSchema, DataFusionError, ParamValues, ScalarValue, SchemaError,
    TableReference, UnnestOptions,
};
use datafusion_expr::select_expr::SelectExpr;
use datafusion_expr::{
    case,
    dml::InsertOp,
    expr::{Alias, ScalarFunction},
    is_null, lit,
    utils::COUNT_STAR_EXPANSION,
    ExplainOption, SortExpr, TableProviderFilterPushDown, UNNAMED_TABLE,
};
use datafusion_functions::core::coalesce;
use datafusion_functions_aggregate::expr_fn::{
    avg, count, max, median, min, stddev, sum,
};

use async_trait::async_trait;
use datafusion_catalog::Session;

/// Contains options that control how data is written out from a DataFrame.
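///
/// # Example
///
/// A minimal usage sketch (the table name `"target"` and sort column `"a"`
/// are assumed for illustration; the table must already be registered):
///
/// ```no_run
/// # use datafusion::prelude::*;
/// # use datafusion::dataframe::DataFrameWriteOptions;
/// # use datafusion::logical_expr::dml::InsertOp;
/// # #[tokio::main]
/// # async fn main() -> datafusion::error::Result<()> {
/// # let ctx = SessionContext::new();
/// # let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
/// // Overwrite the target table, producing a single sorted output file
/// let options = DataFrameWriteOptions::new()
///     .with_insert_operation(InsertOp::Overwrite)
///     .with_single_file_output(true)
///     .with_sort_by(vec![col("a").sort(true, false)]);
/// df.write_table("target", options).await?;
/// # Ok(())
/// # }
/// ```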
pub struct DataFrameWriteOptions {
    /// Controls how new data should be written to the table, determining whether
    /// to append, overwrite, or replace existing data.
    insert_op: InsertOp,
    /// Controls if all partitions should be coalesced into a single output file.
    /// Setting this to true generally results in slower performance.
    single_file_output: bool,
    /// Sets which columns should be used for hive-style partitioned writes by name.
    /// Can be set to empty vec![] for non-partitioned writes.
    partition_by: Vec<String>,
    /// Sets which columns should be used for sorting the output by name.
    /// Can be set to empty vec![] for non-sorted writes.
    sort_by: Vec<SortExpr>,
}

impl DataFrameWriteOptions {
    /// Create a new DataFrameWriteOptions with default values
    pub fn new() -> Self {
        DataFrameWriteOptions {
            insert_op: InsertOp::Append,
            single_file_output: false,
            partition_by: vec![],
            sort_by: vec![],
        }
    }

    /// Set the insert operation
    pub fn with_insert_operation(mut self, insert_op: InsertOp) -> Self {
        self.insert_op = insert_op;
        self
    }

    /// Set the single_file_output value to true or false
    pub fn with_single_file_output(mut self, single_file_output: bool) -> Self {
        self.single_file_output = single_file_output;
        self
    }

    /// Sets the partition_by columns for output partitioning
    pub fn with_partition_by(mut self, partition_by: Vec<String>) -> Self {
        self.partition_by = partition_by;
        self
    }

    /// Sets the sort_by columns for output sorting
    pub fn with_sort_by(mut self, sort_by: Vec<SortExpr>) -> Self {
        self.sort_by = sort_by;
        self
    }
}

impl Default for DataFrameWriteOptions {
    fn default() -> Self {
        Self::new()
    }
}

/// Represents a logical set of rows with the same named columns.
///
/// Similar to a [Pandas DataFrame] or [Spark DataFrame], a DataFusion DataFrame
/// represents a 2 dimensional table of rows and columns.
///
/// The typical workflow using DataFrames looks like:
///
/// 1. Create a DataFrame via methods on [SessionContext], such as [`read_csv`]
///    and [`read_parquet`].
///
/// 2. Build a desired calculation by calling methods such as [`filter`],
///    [`select`], [`aggregate`], and [`limit`]
///
/// 3. Execute into [`RecordBatch`]es by calling [`collect`]
///
/// A `DataFrame` is a wrapper around a [`LogicalPlan`] and the [`SessionState`]
/// required for execution.
///
/// DataFrames are "lazy" in the sense that most methods do not actually compute
/// anything, they just build up a plan. Calling [`collect`] executes the plan
/// using the same DataFusion planning and execution process used to execute SQL
/// and other queries.
///
/// [Pandas DataFrame]: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html
/// [Spark DataFrame]: https://spark.apache.org/docs/latest/sql-programming-guide.html
/// [`read_csv`]: SessionContext::read_csv
/// [`read_parquet`]: SessionContext::read_parquet
/// [`filter`]: DataFrame::filter
/// [`select`]: DataFrame::select
/// [`aggregate`]: DataFrame::aggregate
/// [`limit`]: DataFrame::limit
/// [`collect`]: DataFrame::collect
///
/// # Example
/// ```
/// # use std::sync::Arc;
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # use datafusion::functions_aggregate::expr_fn::min;
/// # use datafusion::arrow::array::{Int32Array, RecordBatch, StringArray};
/// # use datafusion::arrow::datatypes::{DataType, Field, Schema};
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let ctx = SessionContext::new();
/// // Read the data from a csv file
/// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
/// // create a new dataframe that computes the equivalent of
/// // `SELECT a, MIN(b) FROM df WHERE a <= b GROUP BY a LIMIT 100;`
/// let df = df.filter(col("a").lt_eq(col("b")))?
///            .aggregate(vec![col("a")], vec![min(col("b"))])?
///            .limit(0, Some(100))?;
/// // Perform the actual computation
/// let results = df.collect().await?;
///
/// // Create a new dataframe with in-memory data
/// let schema = Schema::new(vec![
///     Field::new("id", DataType::Int32, true),
///     Field::new("name", DataType::Utf8, true),
/// ]);
/// let batch = RecordBatch::try_new(
///     Arc::new(schema),
///     vec![
///         Arc::new(Int32Array::from(vec![1, 2, 3])),
///         Arc::new(StringArray::from(vec!["foo", "bar", "baz"])),
///     ],
/// )?;
/// let df = ctx.read_batch(batch)?;
/// df.show().await?;
///
/// // Create a new dataframe with in-memory data using macro
/// let df = dataframe!(
///     "id" => [1, 2, 3],
///     "name" => ["foo", "bar", "baz"]
/// )?;
/// df.show().await?;
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct DataFrame {
    // Box the (large) SessionState to reduce the size of DataFrame on the stack
    session_state: Box<SessionState>,
    plan: LogicalPlan,
    // Whether projection ops require validation. Setting this flag to false
    // enables an optimization in the `with_column` and `with_column_renamed`
    // functions, where the recursive work required to columnize and normalize
    // expressions can be skipped. Since these function calls are often chained
    // or called many times in dataframe operations, this can yield a
    // significant performance gain.
    //
    // This may only be set to false when the dataframe function call results
    // in the last operation being a
    // `LogicalPlanBuilder::from(plan).project(fields)?.build()` or
    // `LogicalPlanBuilder::from(plan).project_with_validation(fields)?.build()`
    // call. This requirement guarantees that the plan has had all columnization
    // and normalization applied to existing expressions, so only new expressions
    // will require that work. Any operation that updates the plan in any way
    // via anything other than a `project` call must set this to true.
    projection_requires_validation: bool,
}

impl DataFrame {
    /// Create a new `DataFrame` based on an existing `LogicalPlan`
    ///
    /// This is a low-level method and is not typically used by end users. See
    /// [`SessionContext::read_csv`] and other methods for creating a
    /// `DataFrame` from an existing datasource.
    pub fn new(session_state: SessionState, plan: LogicalPlan) -> Self {
        Self {
            session_state: Box::new(session_state),
            plan,
            projection_requires_validation: true,
        }
    }

    /// Creates a logical expression from a SQL query text.
    /// The expression is created and processed against the current schema.
    ///
    /// # Example: Parsing SQL queries
    /// ```
    /// # use arrow::datatypes::{DataType, Field, Schema};
    /// # use datafusion::prelude::*;
    /// # use datafusion_common::{DFSchema, Result};
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// // DataFusion parses numbers as i64 first.
    /// let sql = "a > 1 and b in (1, 10)";
    /// let expected = col("a")
    ///     .gt(lit(1 as i64))
    ///     .and(col("b").in_list(vec![lit(1 as i64), lit(10 as i64)], false));
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// let expr = df.parse_sql_expr(sql)?;
    /// assert_eq!(expected, expr);
    /// # Ok(())
    /// # }
    /// ```
    #[cfg(feature = "sql")]
    pub fn parse_sql_expr(&self, sql: &str) -> Result<Expr> {
        let df_schema = self.schema();

        self.session_state.create_logical_expr(sql, df_schema)
    }

    /// Consume the DataFrame and produce a physical plan
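    ///
    /// # Example
    ///
    /// A minimal sketch showing how to obtain the optimized physical plan:
    ///
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// let physical_plan = df.create_physical_plan().await?;
    /// # Ok(())
    /// # }
    /// ```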
    pub async fn create_physical_plan(self) -> Result<Arc<dyn ExecutionPlan>> {
        self.session_state.create_physical_plan(&self.plan).await
    }

    /// Select columns by name. Returns a new `DataFrame` containing only the
    /// specified columns.
    ///
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// let df = df.select_columns(&["a", "b"])?;
    /// let expected = vec![
    ///     "+---+---+",
    ///     "| a | b |",
    ///     "+---+---+",
    ///     "| 1 | 2 |",
    ///     "+---+---+",
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn select_columns(self, columns: &[&str]) -> Result<DataFrame> {
        let fields = columns
            .iter()
            .flat_map(|name| {
                self.plan
                    .schema()
                    .qualified_fields_with_unqualified_name(name)
            })
            .collect::<Vec<_>>();
        let expr: Vec<Expr> = fields
            .into_iter()
            .map(|(qualifier, field)| Expr::Column(Column::from((qualifier, field))))
            .collect();
        self.select(expr)
    }

    /// Project an arbitrary list of expression strings into a new `DataFrame`.
    /// The strings are parsed into logical plan expressions.
    ///
    /// The output `DataFrame` has one column for each element in `exprs`.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// let df: DataFrame = df.select_exprs(&["a * b", "c"])?;
    /// # Ok(())
    /// # }
    /// ```
    #[cfg(feature = "sql")]
    pub fn select_exprs(self, exprs: &[&str]) -> Result<DataFrame> {
        let expr_list = exprs
            .iter()
            .map(|e| self.parse_sql_expr(e))
            .collect::<Result<Vec<_>>>()?;

        self.select(expr_list)
    }

    /// Project arbitrary expressions (like SQL SELECT expressions) into a new
    /// `DataFrame`.
    ///
    /// The output `DataFrame` has one column for each element in `expr_list`.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// let df = df.select(vec![col("a"), col("b") * col("c")])?;
    /// let expected = vec![
    ///     "+---+-----------------------+",
    ///     "| a | ?table?.b * ?table?.c |",
    ///     "+---+-----------------------+",
    ///     "| 1 | 6                     |",
    ///     "+---+-----------------------+",
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn select(
        self,
        expr_list: impl IntoIterator<Item = impl Into<SelectExpr>>,
    ) -> Result<DataFrame> {
        let expr_list: Vec<SelectExpr> =
            expr_list.into_iter().map(|e| e.into()).collect::<Vec<_>>();

        let expressions = expr_list.iter().filter_map(|e| match e {
            SelectExpr::Expression(expr) => Some(expr),
            _ => None,
        });

        let window_func_exprs = find_window_exprs(expressions);
        let plan = if window_func_exprs.is_empty() {
            self.plan
        } else {
            LogicalPlanBuilder::window_plan(self.plan, window_func_exprs)?
        };

        let project_plan = LogicalPlanBuilder::from(plan).project(expr_list)?.build()?;

        Ok(DataFrame {
            session_state: self.session_state,
            plan: project_plan,
            projection_requires_validation: false,
        })
    }

    /// Returns a new DataFrame containing all columns except the specified columns.
    ///
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// // +----+----+----+
    /// // | a  | b  | c  |
    /// // +----+----+----+
    /// // | 1  | 2  | 3  |
    /// // +----+----+----+
    /// let df = df.drop_columns(&["a"])?;
    /// let expected = vec![
    ///     "+---+---+",
    ///     "| b | c |",
    ///     "+---+---+",
    ///     "| 2 | 3 |",
    ///     "+---+---+",
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn drop_columns(self, columns: &[&str]) -> Result<DataFrame> {
        let fields_to_drop = columns
            .iter()
            .flat_map(|name| {
                self.plan
                    .schema()
                    .qualified_fields_with_unqualified_name(name)
            })
            .collect::<Vec<_>>();
        let expr: Vec<Expr> = self
            .plan
            .schema()
            .fields()
            .into_iter()
            .enumerate()
            .map(|(idx, _)| self.plan.schema().qualified_field(idx))
            .filter(|(qualifier, f)| !fields_to_drop.contains(&(*qualifier, f)))
            .map(|(qualifier, field)| Expr::Column(Column::from((qualifier, field))))
            .collect();
        self.select(expr)
    }

    /// Expand multiple list/struct columns into a set of rows and new columns.
    ///
    /// See also: [`UnnestOptions`] documentation for the behavior of `unnest`
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_json("tests/data/unnest.json", NdJsonReadOptions::default()).await?;
    /// // JSON arrays expand into multiple rows; nested structures flatten into new columns
    /// let df = df.unnest_columns(&["b","c","d"])?;
    /// let expected = vec![
    ///     "+---+------+-------+-----+-----+",
    ///     "| a | b    | c     | d.e | d.f |",
    ///     "+---+------+-------+-----+-----+",
    ///     "| 1 | 2.0  | false | 1   | 2   |",
    ///     "| 1 | 1.3  | true  | 1   | 2   |",
    ///     "| 1 | -6.1 |       | 1   | 2   |",
    ///     "| 2 | 3.0  | false |     |     |",
    ///     "| 2 | 2.3  | true  |     |     |",
    ///     "| 2 | -7.1 |       |     |     |",
    ///     "+---+------+-------+-----+-----+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn unnest_columns(self, columns: &[&str]) -> Result<DataFrame> {
        self.unnest_columns_with_options(columns, UnnestOptions::new())
    }

    /// Expand multiple list columns into a set of rows, with
    /// behavior controlled by [`UnnestOptions`].
    ///
    /// Please see the documentation on [`UnnestOptions`] for more
    /// details about the meaning of unnest.
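    ///
    /// # Example
    ///
    /// A minimal sketch, assuming the same `tests/data/unnest.json` file used
    /// above; `with_preserve_nulls` controls whether empty or null lists
    /// produce a row of nulls or are dropped entirely:
    ///
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::UnnestOptions;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_json("tests/data/unnest.json", NdJsonReadOptions::default())
    ///     .await?;
    /// // Drop rows whose unnested list is empty or null
    /// let options = UnnestOptions::new().with_preserve_nulls(false);
    /// let df = df.unnest_columns_with_options(&["b"], options)?;
    /// # Ok(())
    /// # }
    /// ```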
    pub fn unnest_columns_with_options(
        self,
        columns: &[&str],
        options: UnnestOptions,
    ) -> Result<DataFrame> {
        let columns = columns.iter().map(|c| Column::from(*c)).collect();
        let plan = LogicalPlanBuilder::from(self.plan)
            .unnest_columns_with_options(columns, options)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Return a DataFrame with only rows for which `predicate` evaluates to
    /// `true`.
    ///
    /// Rows for which `predicate` evaluates to `false` or `null`
    /// are filtered out.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example_long.csv", CsvReadOptions::new())
    ///     .await?;
    /// let df = df.filter(col("a").lt_eq(col("b")))?;
    /// // all rows where a <= b are returned
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 1 | 2 | 3 |",
    ///     "| 4 | 5 | 6 |",
    ///     "| 7 | 8 | 9 |",
    ///     "+---+---+---+",
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn filter(self, predicate: Expr) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .filter(predicate)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Return a new `DataFrame` that aggregates the rows of the current
    /// `DataFrame`, first optionally grouping by the given expressions.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion::functions_aggregate::expr_fn::min;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example_long.csv", CsvReadOptions::new())
    ///     .await?;
    ///
    /// // The following use is the equivalent of "SELECT MIN(b) GROUP BY a"
    /// let df1 = df.clone().aggregate(vec![col("a")], vec![min(col("b"))])?;
    /// let expected1 = vec![
    ///     "+---+----------------+",
    ///     "| a | min(?table?.b) |",
    ///     "+---+----------------+",
    ///     "| 1 | 2              |",
    ///     "| 4 | 5              |",
    ///     "| 7 | 8              |",
    ///     "+---+----------------+",
    /// ];
    /// assert_batches_sorted_eq!(expected1, &df1.collect().await?);
    /// // The following use is the equivalent of "SELECT MIN(b)"
    /// let df2 = df.aggregate(vec![], vec![min(col("b"))])?;
    /// let expected2 = vec![
    ///     "+----------------+",
    ///     "| min(?table?.b) |",
    ///     "+----------------+",
    ///     "| 2              |",
    ///     "+----------------+",
    /// ];
    /// # assert_batches_sorted_eq!(expected2, &df2.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn aggregate(
        self,
        group_expr: Vec<Expr>,
        aggr_expr: Vec<Expr>,
    ) -> Result<DataFrame> {
        let is_grouping_set = matches!(group_expr.as_slice(), [Expr::GroupingSet(_)]);
        let aggr_expr_len = aggr_expr.len();
        let options =
            LogicalPlanBuilderOptions::new().with_add_implicit_group_by_exprs(true);
        let plan = LogicalPlanBuilder::from(self.plan)
            .with_options(options)
            .aggregate(group_expr, aggr_expr)?
            .build()?;
        let plan = if is_grouping_set {
            let grouping_id_pos = plan.schema().fields().len() - 1 - aggr_expr_len;
            // For grouping sets we do a project to not expose the internal grouping id
            let exprs = plan
                .schema()
                .columns()
                .into_iter()
                .enumerate()
                .filter(|(idx, _)| *idx != grouping_id_pos)
                .map(|(_, column)| Expr::Column(column))
                .collect::<Vec<_>>();
            LogicalPlanBuilder::from(plan).project(exprs)?.build()?
        } else {
            plan
        };
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: !is_grouping_set,
        })
    }

    /// Return a new DataFrame that adds the result of evaluating one or more
    /// window functions ([`Expr::WindowFunction`]) to the existing columns
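    ///
    /// # Example
    ///
    /// A minimal sketch, assuming the `row_number` helper from
    /// `datafusion::functions_window` (available with default features) and the
    /// [`ExprFunctionExt`](crate::logical_expr::ExprFunctionExt) builder methods:
    ///
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion::functions_window::expr_fn::row_number;
    /// # use datafusion::logical_expr::ExprFunctionExt;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example_long.csv", CsvReadOptions::new())
    ///     .await?;
    /// // Number the rows within each group of `a`, ordered by `b`
    /// let row_num = row_number()
    ///     .partition_by(vec![col("a")])
    ///     .order_by(vec![col("b").sort(true, false)])
    ///     .build()?;
    /// let df = df.window(vec![row_num])?;
    /// # Ok(())
    /// # }
    /// ```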
    pub fn window(self, window_exprs: Vec<Expr>) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .window(window_exprs)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Returns a new `DataFrame` with a limited number of rows.
    ///
    /// # Arguments
    /// `skip` - Number of rows to skip before fetching any rows
    /// `fetch` - Maximum number of rows to return, after skipping `skip` rows.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example_long.csv", CsvReadOptions::new())
    ///     .await?;
    /// let df = df.limit(1, Some(2))?;
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 4 | 5 | 6 |",
    ///     "| 7 | 8 | 9 |",
    ///     "+---+---+---+",
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn limit(self, skip: usize, fetch: Option<usize>) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .limit(skip, fetch)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: self.projection_requires_validation,
        })
    }

    /// Calculate the union of two [`DataFrame`]s, preserving duplicate rows.
    ///
    /// The two [`DataFrame`]s must have exactly the same schema
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// let d2 = df.clone();
    /// let df = df.union(d2)?;
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 1 | 2 | 3 |",
    ///     "| 1 | 2 | 3 |",
    ///     "+---+---+---+",
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn union(self, dataframe: DataFrame) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .union(dataframe.plan)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Calculate the union of two [`DataFrame`]s using column names, preserving duplicate rows.
    ///
    /// The two [`DataFrame`]s are combined using column names rather than position,
    /// filling missing columns with null.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// let d2 = df
    ///     .clone()
    ///     .select_columns(&["b", "c", "a"])?
    ///     .with_column("d", lit("77"))?;
    /// let df = df.union_by_name(d2)?;
    /// let expected = vec![
    ///     "+---+---+---+----+",
    ///     "| a | b | c | d  |",
    ///     "+---+---+---+----+",
    ///     "| 1 | 2 | 3 |    |",
    ///     "| 1 | 2 | 3 | 77 |",
    ///     "+---+---+---+----+",
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn union_by_name(self, dataframe: DataFrame) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .union_by_name(dataframe.plan)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Calculate the distinct union of two [`DataFrame`]s.
    ///
    /// The two [`DataFrame`]s must have exactly the same schema. Any duplicate
    /// rows are discarded.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// let d2 = df.clone();
    /// let df = df.union_distinct(d2)?;
    /// // d2 is a duplicate of df, so the distinct union removes the duplicate rows
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 1 | 2 | 3 |",
    ///     "+---+---+---+",
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn union_distinct(self, dataframe: DataFrame) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .union_distinct(dataframe.plan)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Calculate the union of two [`DataFrame`]s using column names with all duplicated rows removed.
    ///
    /// The two [`DataFrame`]s are combined using column names rather than position,
    /// filling missing columns with null.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// let d2 = df.clone().select_columns(&["b", "c", "a"])?;
    /// let df = df.union_by_name_distinct(d2)?;
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 1 | 2 | 3 |",
    ///     "+---+---+---+",
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn union_by_name_distinct(self, dataframe: DataFrame) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .union_by_name_distinct(dataframe.plan)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Return a new `DataFrame` with all duplicated rows removed.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// let df = df.distinct()?;
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 1 | 2 | 3 |",
    ///     "+---+---+---+",
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn distinct(self) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan).distinct()?.build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Return a new `DataFrame` that, for each distinct value of the `DISTINCT ON`
    /// expressions (`on_expr`), keeps a single row chosen according to the optional
    /// sorting expressions, projecting the `select_expr` columns.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?
    ///     // Return a single row (a, b) for each distinct value of a
    ///     .distinct_on(vec![col("a")], vec![col("a"), col("b")], None)?;
    /// let expected = vec![
    ///     "+---+---+",
    ///     "| a | b |",
    ///     "+---+---+",
    ///     "| 1 | 2 |",
    ///     "+---+---+",
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn distinct_on(
        self,
        on_expr: Vec<Expr>,
        select_expr: Vec<Expr>,
        sort_expr: Option<Vec<SortExpr>>,
    ) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .distinct_on(on_expr, select_expr, sort_expr)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Return a new `DataFrame` containing summary statistics for this `DataFrame`.
    ///
    /// Only numeric datatypes are summarized at the moment; nulls are returned
    /// for non-numeric datatypes. The output format is modeled after pandas.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use arrow::util::pretty;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/tpch-csv/customer.csv", CsvReadOptions::new()).await?;
    /// let stat = df.describe().await?;
    /// # // some output columns are ignored
    /// let expected = vec![
    ///     "+------------+--------------------+--------------------+------------------------------------+--------------------+-----------------+--------------------+--------------+----------------------------------------------------------------------------------------------------------+",
    ///     "| describe   | c_custkey          | c_name             | c_address                          | c_nationkey        | c_phone         | c_acctbal          | c_mktsegment | c_comment                                                                                                |",
    ///     "+------------+--------------------+--------------------+------------------------------------+--------------------+-----------------+--------------------+--------------+----------------------------------------------------------------------------------------------------------+",
    ///     "| count      | 9.0                | 9                  | 9                                  | 9.0                | 9               | 9.0                | 9            | 9                                                                                                        |",
    ///     "| max        | 10.0               | Customer#000000010 | xKiAFTjUsCuxfeleNqefumTrjS         | 20.0               | 30-114-968-4951 | 9561.95            | MACHINERY    | tions. even deposits boost according to the slyly bold packages. final accounts cajole requests. furious |",
    ///     "| mean       | 6.0                | null               | null                               | 9.88888888888889   | null            | 5153.2155555555555 | null         | null                                                                                                     |",
    ///     "| median     | 6.0                | null               | null                               | 8.0                | null            | 6819.74            | null         | null                                                                                                     |",
    ///     "| min        | 2.0                | Customer#000000002 | 6LrEaV6KR6PLVcgl2ArL Q3rqzLzcT1 v2 | 1.0                | 11-719-748-3364 | 121.65             | AUTOMOBILE   |  deposits eat slyly ironic, even instructions. express foxes detect slyly. blithely even accounts abov   |",
    ///     "| null_count | 0.0                | 0                  | 0                                  | 0.0                | 0               | 0.0                | 0            | 0                                                                                                        |",
    ///     "| std        | 2.7386127875258306 | null               | null                               | 7.2188026092359046 | null            | 3522.169804254585  | null         | null                                                                                                     |",
    ///     "+------------+--------------------+--------------------+------------------------------------+--------------------+-----------------+--------------------+--------------+----------------------------------------------------------------------------------------------------------+"];
    /// assert_batches_sorted_eq!(expected, &stat.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub async fn describe(self) -> Result<Self> {
        // the summary functions currently supported
        let supported_describe_functions =
            vec!["count", "null_count", "mean", "std", "min", "max", "median"];

        let original_schema_fields = self.schema().fields().iter();

        // define the `describe` column
        let mut describe_schemas = vec![Field::new("describe", DataType::Utf8, false)];
        describe_schemas.extend(original_schema_fields.clone().map(|field| {
            if field.data_type().is_numeric() {
                Field::new(field.name(), DataType::Float64, true)
            } else {
                Field::new(field.name(), DataType::Utf8, true)
            }
        }));

        // collect a RecordBatch for each summary aggregation
        let describe_record_batch = [
            // count aggregation
            self.clone().aggregate(
                vec![],
                original_schema_fields
                    .clone()
                    .map(|f| count(ident(f.name())).alias(f.name()))
                    .collect::<Vec<_>>(),
            ),
            // null_count aggregation
            self.clone().aggregate(
                vec![],
                original_schema_fields
                    .clone()
                    .map(|f| {
                        sum(case(is_null(ident(f.name())))
                            .when(lit(true), lit(1))
                            .otherwise(lit(0))
                            .unwrap())
                        .alias(f.name())
                    })
                    .collect::<Vec<_>>(),
            ),
            // mean aggregation
            self.clone().aggregate(
                vec![],
                original_schema_fields
                    .clone()
                    .filter(|f| f.data_type().is_numeric())
                    .map(|f| avg(ident(f.name())).alias(f.name()))
                    .collect::<Vec<_>>(),
            ),
            // std aggregation
            self.clone().aggregate(
                vec![],
                original_schema_fields
                    .clone()
                    .filter(|f| f.data_type().is_numeric())
                    .map(|f| stddev(ident(f.name())).alias(f.name()))
                    .collect::<Vec<_>>(),
            ),
            // min aggregation
            self.clone().aggregate(
                vec![],
                original_schema_fields
                    .clone()
                    .filter(|f| {
                        !matches!(f.data_type(), DataType::Binary | DataType::Boolean)
                    })
                    .map(|f| min(ident(f.name())).alias(f.name()))
                    .collect::<Vec<_>>(),
            ),
            // max aggregation
            self.clone().aggregate(
                vec![],
                original_schema_fields
                    .clone()
                    .filter(|f| {
                        !matches!(f.data_type(), DataType::Binary | DataType::Boolean)
                    })
                    .map(|f| max(ident(f.name())).alias(f.name()))
                    .collect::<Vec<_>>(),
            ),
            // median aggregation
            self.clone().aggregate(
                vec![],
                original_schema_fields
                    .clone()
                    .filter(|f| f.data_type().is_numeric())
                    .map(|f| median(ident(f.name())).alias(f.name()))
                    .collect::<Vec<_>>(),
            ),
        ];

        // first column with function names
        let mut array_ref_vec: Vec<ArrayRef> = vec![Arc::new(StringArray::from(
            supported_describe_functions.clone(),
        ))];
        for field in original_schema_fields {
            let mut array_datas = vec![];
            for result in describe_record_batch.iter() {
                let array_ref = match result {
                    Ok(df) => {
                        let batches = df.clone().collect().await;
                        match batches {
                            Ok(batches)
                                if batches.len() == 1
                                    && batches[0]
                                        .column_by_name(field.name())
                                        .is_some() =>
                            {
                                let column =
                                    batches[0].column_by_name(field.name()).unwrap();

                                if column.data_type().is_null() {
                                    Arc::new(StringArray::from(vec!["null"]))
                                } else if field.data_type().is_numeric() {
                                    cast(column, &DataType::Float64)?
                                } else {
                                    cast(column, &DataType::Utf8)?
                                }
                            }
                            _ => Arc::new(StringArray::from(vec!["null"])),
                        }
                    }
                    // Handle the planning error raised when an aggregation has no
                    // applicable columns (e.g. only boolean/binary columns)
                    Err(err)
                        if err.to_string().contains(
                            "Error during planning: \
                                            Aggregate requires at least one grouping \
                                            or aggregate expression",
                        ) =>
                    {
                        Arc::new(StringArray::from(vec!["null"]))
                    }
                    Err(e) => return exec_err!("{}", e),
                };
                array_datas.push(array_ref);
            }
            array_ref_vec.push(concat(
                array_datas
                    .iter()
                    .map(|af| af.as_ref())
                    .collect::<Vec<_>>()
                    .as_slice(),
            )?);
        }

        let describe_record_batch =
            RecordBatch::try_new(Arc::new(Schema::new(describe_schemas)), array_ref_vec)?;

        let provider = MemTable::try_new(
            describe_record_batch.schema(),
            vec![vec![describe_record_batch]],
        )?;

        let plan = LogicalPlanBuilder::scan(
            UNNAMED_TABLE,
            provider_as_source(Arc::new(provider)),
            None,
        )?
        .build()?;

        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: self.projection_requires_validation,
        })
    }

    /// Apply a sort to the provided expressions, using the default direction
    /// (ascending, nulls last)
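    ///
    /// # Example
    ///
    /// A minimal sketch; equivalent to calling [`sort`](Self::sort) with
    /// ascending, nulls-last sort expressions:
    ///
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example_long.csv", CsvReadOptions::new())
    ///     .await?;
    /// let df = df.sort_by(vec![col("a"), col("b")])?;
    /// # Ok(())
    /// # }
    /// ```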
    pub fn sort_by(self, expr: Vec<Expr>) -> Result<DataFrame> {
        self.sort(
            expr.into_iter()
                .map(|e| e.sort(true, false))
                .collect::<Vec<SortExpr>>(),
        )
    }

    /// Sort the DataFrame by the specified sorting expressions.
    ///
    /// Note that any expression can be turned into
    /// a sort expression by calling its [sort](Expr::sort) method.
    ///
    /// # Example
    ///
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example_long.csv", CsvReadOptions::new())
    ///     .await?;
    /// let df = df.sort(vec![
    ///     col("a").sort(false, true), // a DESC, nulls first
    ///     col("b").sort(true, false), // b ASC, nulls last
    /// ])?;
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 1 | 2 | 3 |",
    ///     "| 4 | 5 | 6 |",
    ///     "| 7 | 8 | 9 |",
    ///     "+---+---+---+",
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn sort(self, expr: Vec<SortExpr>) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan).sort(expr)?.build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: self.projection_requires_validation,
        })
    }

    /// Join this `DataFrame` with another `DataFrame` using explicitly specified
    /// columns and an optional filter expression.
    ///
    /// See [`join_on`](Self::join_on) for a more concise way to specify the
    /// join condition. Since DataFusion automatically identifies and optimizes
    /// equality predicates, there is no performance difference between this
    /// function and `join_on`.
    ///
    /// `left_cols` and `right_cols` are used to form "equijoin" predicates (see
    /// example below), which are then combined with the optional `filter`
    /// expression. If `left_cols` and `right_cols` contain ambiguous column
    /// references, they will be disambiguated by prioritizing the left relation
    /// for `left_cols` and the right relation for `right_cols`.
    ///
    /// Note that for outer joins, the `filter` is applied only to matched rows.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let left = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// let right = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?
    ///     .select(vec![
    ///         col("a").alias("a2"),
    ///         col("b").alias("b2"),
    ///         col("c").alias("c2"),
    ///     ])?;
    /// // Perform the equivalent of `left INNER JOIN right ON (a = a2 AND b = b2)`
    /// // finding all pairs of rows from `left` and `right` where `a = a2` and `b = b2`.
    /// let join = left.join(right, JoinType::Inner, &["a", "b"], &["a2", "b2"], None)?;
    /// let expected = vec![
    ///     "+---+---+---+----+----+----+",
    ///     "| a | b | c | a2 | b2 | c2 |",
    ///     "+---+---+---+----+----+----+",
    ///     "| 1 | 2 | 3 | 1  | 2  | 3  |",
    ///     "+---+---+---+----+----+----+",
    /// ];
    /// assert_batches_sorted_eq!(expected, &join.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn join(
        self,
        right: DataFrame,
        join_type: JoinType,
        left_cols: &[&str],
        right_cols: &[&str],
        filter: Option<Expr>,
    ) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .join(
                right.plan,
                join_type,
                (left_cols.to_vec(), right_cols.to_vec()),
                filter,
            )?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Join this `DataFrame` with another `DataFrame` using the specified
    /// expressions.
    ///
    /// Note that DataFusion automatically optimizes joins, including
    /// identifying and optimizing equality predicates.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let left = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// let right = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?
    ///     .select(vec![
    ///         col("a").alias("a2"),
    ///         col("b").alias("b2"),
    ///         col("c").alias("c2"),
    ///     ])?;
    ///
    /// // Perform the equivalent of `left INNER JOIN right ON (a != a2 AND b != b2)`
    /// // finding all pairs of rows from `left` and `right`
    /// // where `a != a2` and `b != b2`.
    /// let join_on = left.join_on(
    ///     right,
    ///     JoinType::Inner,
    ///     [col("a").not_eq(col("a2")), col("b").not_eq(col("b2"))],
    /// )?;
    /// let expected = vec![
    ///     "+---+---+---+----+----+----+",
    ///     "| a | b | c | a2 | b2 | c2 |",
    ///     "+---+---+---+----+----+----+",
    ///     "+---+---+---+----+----+----+",
    /// ];
    /// # assert_batches_sorted_eq!(expected, &join_on.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn join_on(
        self,
        right: DataFrame,
        join_type: JoinType,
        on_exprs: impl IntoIterator<Item = Expr>,
    ) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .join_on(right.plan, join_type, on_exprs)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Repartition a DataFrame based on a logical partitioning scheme.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example_long.csv", CsvReadOptions::new())
    ///     .await?;
    /// let df1 = df.repartition(Partitioning::RoundRobinBatch(4))?;
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 1 | 2 | 3 |",
    ///     "| 4 | 5 | 6 |",
    ///     "| 7 | 8 | 9 |",
    ///     "+---+---+---+",
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df1.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn repartition(self, partitioning_scheme: Partitioning) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .repartition(partitioning_scheme)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Return the total number of rows in this `DataFrame`.
    ///
    /// Note that this method will actually run a plan to calculate the count,
    /// which may be slow for large or complicated DataFrames.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// let count = df.count().await?; // 1
    /// # assert_eq!(count, 1);
    /// # Ok(())
    /// # }
    /// ```
    pub async fn count(self) -> Result<usize> {
        let rows = self
            .aggregate(
                vec![],
                vec![count(Expr::Literal(COUNT_STAR_EXPANSION, None))],
            )?
            .collect()
            .await?;
        let len = *rows
            .first()
            .and_then(|r| r.columns().first())
            .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
            .and_then(|a| a.values().first())
            .ok_or_else(|| {
                internal_datafusion_err!("Unexpected output when collecting for count()")
            })? as usize;
        Ok(len)
    }

1397    /// Execute this `DataFrame` and buffer all resulting `RecordBatch`es into memory.
1398    ///
1399    /// Prior to calling `collect`, modifying a DataFrame simply updates a plan
1400    /// (no actual computation is performed). `collect` triggers the computation.
1401    ///
1402    /// See [`Self::execute_stream`] to execute a DataFrame without buffering.
1403    ///
1404    /// # Example
1405    /// ```
1406    /// # use datafusion::prelude::*;
1407    /// # use datafusion::error::Result;
1408    /// # #[tokio::main]
1409    /// # async fn main() -> Result<()> {
1410    /// let ctx = SessionContext::new();
1411    /// let df = ctx
1412    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
1413    ///     .await?;
1414    /// let batches = df.collect().await?;
1415    /// # Ok(())
1416    /// # }
1417    /// ```
1418    pub async fn collect(self) -> Result<Vec<RecordBatch>> {
1419        let task_ctx = Arc::new(self.task_ctx());
1420        let plan = self.create_physical_plan().await?;
1421        collect(plan, task_ctx).await
1422    }
1423
1424    /// Execute the `DataFrame` and print the results to the console.
1425    ///
1426    /// # Example
1427    /// ```
1428    /// # use datafusion::prelude::*;
1429    /// # use datafusion::error::Result;
1430    /// # #[tokio::main]
1431    /// # async fn main() -> Result<()> {
1432    /// let ctx = SessionContext::new();
1433    /// let df = ctx
1434    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
1435    ///     .await?;
1436    /// df.show().await?;
1437    /// # Ok(())
1438    /// # }
1439    /// ```
1440    pub async fn show(self) -> Result<()> {
1441        println!("{}", self.to_string().await?);
1442        Ok(())
1443    }
1444
1445    /// Execute the `DataFrame` and return a string representation of the results.
1446    ///
1447    /// # Example
1448    /// ```
1449    /// # use datafusion::prelude::*;
1450    /// # use datafusion::error::Result;
1451    /// # use datafusion::execution::SessionStateBuilder;
1452    ///
1453    /// # #[tokio::main]
1454    /// # async fn main() -> Result<()> {
1455    /// let cfg = SessionConfig::new()
1456    ///     .set_str("datafusion.format.null", "no-value");
1457    /// let session_state = SessionStateBuilder::new()
1458    ///     .with_config(cfg)
1459    ///     .with_default_features()
1460    ///     .build();
1461    /// let ctx = SessionContext::new_with_state(session_state);
1462    /// let df = ctx.sql("select null as 'null-column'").await?;
1463    /// let result = df.to_string().await?;
1464    /// assert_eq!(result,
1465    /// "+-------------+
1466    /// | null-column |
1467    /// +-------------+
1468    /// | no-value    |
1469    /// +-------------+"
1470    /// );
1471    /// # Ok(())
1472    /// # }
    /// ```
1473    pub async fn to_string(self) -> Result<String> {
1474        let options = self.session_state.config().options().format.clone();
1475        let arrow_options: arrow::util::display::FormatOptions = (&options).try_into()?;
1476
1477        let results = self.collect().await?;
1478        Ok(
1479            pretty::pretty_format_batches_with_options(&results, &arrow_options)?
1480                .to_string(),
1481        )
1482    }
1483
1484    /// Execute the `DataFrame` and print only the first `num` rows of the
1485    /// result to the console.
1486    ///
1487    /// # Example
1488    /// ```
1489    /// # use datafusion::prelude::*;
1490    /// # use datafusion::error::Result;
1491    /// # #[tokio::main]
1492    /// # async fn main() -> Result<()> {
1493    /// let ctx = SessionContext::new();
1494    /// let df = ctx
1495    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
1496    ///     .await?;
1497    /// df.show_limit(10).await?;
1498    /// # Ok(())
1499    /// # }
1500    /// ```
1501    pub async fn show_limit(self, num: usize) -> Result<()> {
1502        let results = self.limit(0, Some(num))?.collect().await?;
1503        Ok(pretty::print_batches(&results)?)
1504    }
1505
1506    /// Return a new [`TaskContext`] that can be used to execute this DataFrame.
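    ///
    /// A sketch of manual execution using the task context, mirroring what
    /// [`Self::collect`] does internally:
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use std::sync::Arc;
    /// use datafusion::physical_plan::collect;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// // Take the task context before `create_physical_plan` consumes the DataFrame
    /// let task_ctx = Arc::new(df.task_ctx());
    /// let plan = df.create_physical_plan().await?;
    /// let batches = collect(plan, task_ctx).await?;
    /// # Ok(())
    /// # }
    /// ```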
1507    pub fn task_ctx(&self) -> TaskContext {
1508        TaskContext::from(self.session_state.as_ref())
1509    }
1510
1511    /// Executes this DataFrame and returns a stream over a single partition
1512    ///
1513    /// See [Self::collect] to buffer the `RecordBatch`es in memory.
1514    ///
1515    /// # Example
1516    /// ```
1517    /// # use datafusion::prelude::*;
1518    /// # use datafusion::error::Result;
1519    /// # #[tokio::main]
1520    /// # async fn main() -> Result<()> {
1521    /// let ctx = SessionContext::new();
1522    /// let df = ctx
1523    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
1524    ///     .await?;
1525    /// let stream = df.execute_stream().await?;
1526    /// # Ok(())
1527    /// # }
1528    /// ```
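    ///
    /// A sketch of consuming the stream with `futures::StreamExt` (this
    /// assumes the `futures` crate is available):
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// use futures::StreamExt;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// let mut stream = df.execute_stream().await?;
    /// // Each item is a `Result<RecordBatch>`
    /// while let Some(batch) = stream.next().await {
    ///     println!("{} rows", batch?.num_rows());
    /// }
    /// # Ok(())
    /// # }
    /// ```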
1529    ///
1530    /// # Aborting Execution
1531    ///
1532    /// Dropping the stream will abort the execution of the query, and free up
1533    /// any allocated resources.
1534    pub async fn execute_stream(self) -> Result<SendableRecordBatchStream> {
1535        let task_ctx = Arc::new(self.task_ctx());
1536        let plan = self.create_physical_plan().await?;
1537        execute_stream(plan, task_ctx)
1538    }
1539
1540    /// Executes this DataFrame and collects all results into a vector of vectors of `RecordBatch`,
1541    /// maintaining the input partitioning.
1542    ///
1543    /// # Example
1544    /// ```
1545    /// # use datafusion::prelude::*;
1546    /// # use datafusion::error::Result;
1547    /// # #[tokio::main]
1548    /// # async fn main() -> Result<()> {
1549    /// let ctx = SessionContext::new();
1550    /// let df = ctx
1551    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
1552    ///     .await?;
1553    /// let batches = df.collect_partitioned().await?;
1554    /// # Ok(())
1555    /// # }
1556    /// ```
1557    pub async fn collect_partitioned(self) -> Result<Vec<Vec<RecordBatch>>> {
1558        let task_ctx = Arc::new(self.task_ctx());
1559        let plan = self.create_physical_plan().await?;
1560        collect_partitioned(plan, task_ctx).await
1561    }
1562
1563    /// Executes this DataFrame and returns one stream per partition.
1564    ///
1565    /// # Example
1566    /// ```
1567    /// # use datafusion::prelude::*;
1568    /// # use datafusion::error::Result;
1569    /// # #[tokio::main]
1570    /// # async fn main() -> Result<()> {
1571    /// let ctx = SessionContext::new();
1572    /// let df = ctx
1573    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
1574    ///     .await?;
1575    /// let batches = df.execute_stream_partitioned().await?;
1576    /// # Ok(())
1577    /// # }
1578    /// ```
    ///
1579    /// # Aborting Execution
1580    ///
1581    /// Dropping the stream will abort the execution of the query, and free up
1582    /// any allocated resources.
1583    pub async fn execute_stream_partitioned(
1584        self,
1585    ) -> Result<Vec<SendableRecordBatchStream>> {
1586        let task_ctx = Arc::new(self.task_ctx());
1587        let plan = self.create_physical_plan().await?;
1588        execute_stream_partitioned(plan, task_ctx)
1589    }
1590
1591    /// Returns the `DFSchema` describing the output of this DataFrame.
1592    ///
1593    /// The output `DFSchema` contains information on the name, data type, and
1594    /// nullability for each column.
1595    ///
1596    /// # Example
1597    /// ```
1598    /// # use datafusion::prelude::*;
1599    /// # use datafusion::error::Result;
1600    /// # #[tokio::main]
1601    /// # async fn main() -> Result<()> {
1602    /// let ctx = SessionContext::new();
1603    /// let df = ctx
1604    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
1605    ///     .await?;
1606    /// let schema = df.schema();
1607    /// # Ok(())
1608    /// # }
1609    /// ```
1610    pub fn schema(&self) -> &DFSchema {
1611        self.plan.schema()
1612    }
1613
1614    /// Return a reference to the unoptimized [`LogicalPlan`] that comprises
1615    /// this DataFrame.
1616    ///
1617    /// See [`Self::into_unoptimized_plan`] for more details.
1618    pub fn logical_plan(&self) -> &LogicalPlan {
1619        &self.plan
1620    }
1621
1622    /// Returns both the [`LogicalPlan`] and [`SessionState`] that comprise this [`DataFrame`]
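    ///
    /// A sketch of round-tripping the parts; this assumes [`DataFrame::new`]
    /// as the inverse constructor:
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// let (state, plan) = df.into_parts();
    /// // Reassemble an equivalent DataFrame from the parts
    /// let df = DataFrame::new(state, plan);
    /// # Ok(())
    /// # }
    /// ```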
1623    pub fn into_parts(self) -> (SessionState, LogicalPlan) {
1624        (*self.session_state, self.plan)
1625    }
1626
1627    /// Return the [`LogicalPlan`] represented by this DataFrame without running
1628    /// any optimizers
1629    ///
1630    /// Note: This method should not be used outside testing, as it loses the
1631    /// snapshot of the [`SessionState`] attached to this [`DataFrame`] and
1632    /// consequently subsequent operations may take place against a different
1633    /// state (e.g. a different value of `now()`)
1634    ///
1635    /// See [`Self::into_parts`] to retrieve the owned [`LogicalPlan`] and
1636    /// corresponding [`SessionState`].
1637    pub fn into_unoptimized_plan(self) -> LogicalPlan {
1638        self.plan
1639    }
1640
1641    /// Return the optimized [`LogicalPlan`] represented by this DataFrame.
1642    ///
1643    /// Note: This method should not be used outside testing -- see
1644    /// [`Self::into_unoptimized_plan`] for more details.
1645    pub fn into_optimized_plan(self) -> Result<LogicalPlan> {
1646        // Optimize the plan first for better UX
1647        self.session_state.optimize(&self.plan)
1648    }
1649
1650    /// Converts this [`DataFrame`] into a [`TableProvider`] that can be registered
1651    /// as a table view using [`SessionContext::register_table`].
1652    ///
1653    /// Note: This discards the [`SessionState`] associated with this
1654    /// [`DataFrame`] in favour of the one passed to [`TableProvider::scan`]
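    ///
    /// # Example
    /// A minimal sketch; the view name "example_view" is arbitrary:
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// // Register the DataFrame as a named view and query it with SQL
    /// ctx.register_table("example_view", df.into_view())?;
    /// let results = ctx.sql("SELECT * FROM example_view").await?.collect().await?;
    /// # Ok(())
    /// # }
    /// ```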
1655    pub fn into_view(self) -> Arc<dyn TableProvider> {
1656        Arc::new(DataFrameTableProvider {
1657            plan: self.plan,
1658            table_type: TableType::View,
1659        })
1660    }
1661
1662    /// See [`Self::into_view`]. The returned [`TableProvider`] will
1663    /// create a transient table.
1664    pub fn into_temporary_view(self) -> Arc<dyn TableProvider> {
1665        Arc::new(DataFrameTableProvider {
1666            plan: self.plan,
1667            table_type: TableType::Temporary,
1668        })
1669    }
1670
1671    /// Return a DataFrame with the explanation of its plan so far.
1672    ///
1673    /// If `analyze` is true, runs the plan and reports metrics.
1674    /// If `verbose` is true, prints out additional details.
1675    /// The default output format is the `indent` format.
1676    ///
1677    /// ```
1678    /// # use datafusion::prelude::*;
1679    /// # use datafusion::error::Result;
1680    /// # #[tokio::main]
1681    /// # async fn main() -> Result<()> {
1682    /// let ctx = SessionContext::new();
1683    /// let df = ctx
1684    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
1685    ///     .await?;
1686    /// let batches = df
1687    ///     .limit(0, Some(100))?
1688    ///     .explain(false, false)?
1689    ///     .collect()
1690    ///     .await?;
1691    /// # Ok(())
1692    /// # }
1693    /// ```
1694    pub fn explain(self, verbose: bool, analyze: bool) -> Result<DataFrame> {
1695        // Set the default format to Indent to keep the previous behavior
1696        let opts = ExplainOption::default()
1697            .with_verbose(verbose)
1698            .with_analyze(analyze);
1699        self.explain_with_options(opts)
1700    }
1701
1702    /// Return a DataFrame with the explanation of its plan so far.
1703    ///
1704    /// `explain_option` is used to specify the options for the explain operation.
1705    /// Details of the options can be found in [`ExplainOption`].
1706    /// ```
1707    /// # use datafusion::prelude::*;
1708    /// # use datafusion::error::Result;
1709    /// # #[tokio::main]
1710    /// # async fn main() -> Result<()> {
1711    /// use datafusion_expr::{Explain, ExplainOption};
1712    /// let ctx = SessionContext::new();
1713    /// let df = ctx
1714    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
1715    ///     .await?;
1716    /// let batches = df
1717    ///     .limit(0, Some(100))?
1718    ///     .explain_with_options(
1719    ///         ExplainOption::default()
1720    ///             .with_verbose(false)
1721    ///             .with_analyze(false),
1722    ///     )?
1723    ///     .collect()
1724    ///     .await?;
1725    /// # Ok(())
1726    /// # }
1727    /// ```
1728    pub fn explain_with_options(
1729        self,
1730        explain_option: ExplainOption,
1731    ) -> Result<DataFrame> {
1732        if matches!(self.plan, LogicalPlan::Explain(_)) {
1733            return plan_err!("Nested EXPLAINs are not supported");
1734        }
1735        let plan = LogicalPlanBuilder::from(self.plan)
1736            .explain_option_format(explain_option)?
1737            .build()?;
1738        Ok(DataFrame {
1739            session_state: self.session_state,
1740            plan,
1741            projection_requires_validation: self.projection_requires_validation,
1742        })
1743    }
1744
1745    /// Return a `FunctionRegistry` used to plan UDF calls.
1746    ///
1747    /// # Example
1748    /// ```
1749    /// # use datafusion::prelude::*;
1750    /// # use datafusion::error::Result;
1751    /// # #[tokio::main]
1752    /// # async fn main() -> Result<()> {
1753    /// let ctx = SessionContext::new();
1754    /// let df = ctx
1755    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
1756    ///     .await?;
1757    /// let f = df.registry();
1758    /// // use f.udf("name", vec![...]) to use the udf
1759    /// # Ok(())
1760    /// # }
1761    /// ```
1762    pub fn registry(&self) -> &dyn FunctionRegistry {
1763        self.session_state.as_ref()
1764    }
1765
1766    /// Calculate the intersection of two [`DataFrame`]s, preserving duplicate rows
    /// (`INTERSECT ALL` semantics). The two [`DataFrame`]s must have exactly the same schema.
1767    ///
1768    /// ```
1769    /// # use datafusion::prelude::*;
1770    /// # use datafusion::error::Result;
1771    /// # use datafusion_common::assert_batches_sorted_eq;
1772    /// # #[tokio::main]
1773    /// # async fn main() -> Result<()> {
1774    /// let ctx = SessionContext::new();
1775    /// let df = ctx
1776    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
1777    ///     .await?;
1778    /// let d2 = ctx
1779    ///     .read_csv("tests/data/example_long.csv", CsvReadOptions::new())
1780    ///     .await?;
1781    /// let df = df.intersect(d2)?;
1782    /// let expected = vec![
1783    ///     "+---+---+---+",
1784    ///     "| a | b | c |",
1785    ///     "+---+---+---+",
1786    ///     "| 1 | 2 | 3 |",
1787    ///     "+---+---+---+",
1788    /// ];
1789    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
1790    /// # Ok(())
1791    /// # }
1792    /// ```
1793    pub fn intersect(self, dataframe: DataFrame) -> Result<DataFrame> {
1794        let left_plan = self.plan;
1795        let right_plan = dataframe.plan;
1796        let plan = LogicalPlanBuilder::intersect(left_plan, right_plan, true)?;
1797        Ok(DataFrame {
1798            session_state: self.session_state,
1799            plan,
1800            projection_requires_validation: true,
1801        })
1802    }
1803
1804    /// Calculate the distinct intersection (`INTERSECT DISTINCT`) of two [`DataFrame`]s.
    /// The two [`DataFrame`]s must have exactly the same schema.
1805    ///
1806    /// ```
1807    /// # use datafusion::prelude::*;
1808    /// # use datafusion::error::Result;
1809    /// # use datafusion_common::assert_batches_sorted_eq;
1810    /// # #[tokio::main]
1811    /// # async fn main() -> Result<()> {
1812    /// let ctx = SessionContext::new();
1813    /// let df = ctx
1814    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
1815    ///     .await?;
1816    /// let d2 = ctx
1817    ///     .read_csv("tests/data/example_long.csv", CsvReadOptions::new())
1818    ///     .await?;
1819    /// let df = df.intersect_distinct(d2)?;
1820    /// let expected = vec![
1821    ///     "+---+---+---+",
1822    ///     "| a | b | c |",
1823    ///     "+---+---+---+",
1824    ///     "| 1 | 2 | 3 |",
1825    ///     "+---+---+---+",
1826    /// ];
1827    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
1828    /// # Ok(())
1829    /// # }
1830    /// ```
1831    pub fn intersect_distinct(self, dataframe: DataFrame) -> Result<DataFrame> {
1832        let left_plan = self.plan;
1833        let right_plan = dataframe.plan;
1834        let plan = LogicalPlanBuilder::intersect(left_plan, right_plan, false)?;
1835        Ok(DataFrame {
1836            session_state: self.session_state,
1837            plan,
1838            projection_requires_validation: true,
1839        })
1840    }
1841
1842    /// Calculate the set difference (`EXCEPT ALL`) of two [`DataFrame`]s.
    /// The two [`DataFrame`]s must have exactly the same schema.
1843    ///
1844    /// ```
1845    /// # use datafusion::prelude::*;
1846    /// # use datafusion::error::Result;
1847    /// # use datafusion_common::assert_batches_sorted_eq;
1848    /// # #[tokio::main]
1849    /// # async fn main() -> Result<()> {
1850    /// let ctx = SessionContext::new();
1851    /// let df = ctx
1852    ///     .read_csv("tests/data/example_long.csv", CsvReadOptions::new())
1853    ///     .await?;
1854    /// let d2 = ctx
1855    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
1856    ///     .await?;
1857    /// let result = df.except(d2)?;
1858    /// // these rows are in example_long.csv, but not in example.csv
1859    /// let expected = vec![
1860    ///     "+---+---+---+",
1861    ///     "| a | b | c |",
1862    ///     "+---+---+---+",
1863    ///     "| 4 | 5 | 6 |",
1864    ///     "| 7 | 8 | 9 |",
1865    ///     "+---+---+---+",
1866    /// ];
1867    /// # assert_batches_sorted_eq!(expected, &result.collect().await?);
1868    /// # Ok(())
1869    /// # }
1870    /// ```
1871    pub fn except(self, dataframe: DataFrame) -> Result<DataFrame> {
1872        let left_plan = self.plan;
1873        let right_plan = dataframe.plan;
1874        let plan = LogicalPlanBuilder::except(left_plan, right_plan, true)?;
1875        Ok(DataFrame {
1876            session_state: self.session_state,
1877            plan,
1878            projection_requires_validation: true,
1879        })
1880    }
1881
1882    /// Calculate the distinct set difference (`EXCEPT DISTINCT`) of two [`DataFrame`]s.
    /// The two [`DataFrame`]s must have exactly the same schema.
1883    ///
1884    /// ```
1885    /// # use datafusion::prelude::*;
1886    /// # use datafusion::error::Result;
1887    /// # use datafusion_common::assert_batches_sorted_eq;
1888    /// # #[tokio::main]
1889    /// # async fn main() -> Result<()> {
1890    /// let ctx = SessionContext::new();
1891    /// let df = ctx
1892    ///     .read_csv("tests/data/example_long.csv", CsvReadOptions::new())
1893    ///     .await?;
1894    /// let d2 = ctx
1895    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
1896    ///     .await?;
1897    /// let result = df.except_distinct(d2)?;
1898    /// // these rows are in example_long.csv, but not in example.csv
1899    /// let expected = vec![
1900    ///     "+---+---+---+",
1901    ///     "| a | b | c |",
1902    ///     "+---+---+---+",
1903    ///     "| 4 | 5 | 6 |",
1904    ///     "| 7 | 8 | 9 |",
1905    ///     "+---+---+---+",
1906    /// ];
1907    /// # assert_batches_sorted_eq!(expected, &result.collect().await?);
1908    /// # Ok(())
1909    /// # }
1910    /// ```
1911    pub fn except_distinct(self, dataframe: DataFrame) -> Result<DataFrame> {
1912        let left_plan = self.plan;
1913        let right_plan = dataframe.plan;
1914        let plan = LogicalPlanBuilder::except(left_plan, right_plan, false)?;
1915        Ok(DataFrame {
1916            session_state: self.session_state,
1917            plan,
1918            projection_requires_validation: true,
1919        })
1920    }
1921
1922    /// Execute this `DataFrame` and write the results to `table_name`.
1923    ///
1924    /// Returns a single [`RecordBatch`] containing a single column and
1925    /// row representing the total number of rows written.
1926    ///
1927    /// Unlike most other `DataFrame` methods, this method executes eagerly.
1928    /// Data is written to the table using the [`TableProvider::insert_into`]
1929    /// method. This is the same underlying implementation used by SQL `INSERT
1930    /// INTO` statements.
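    ///
    /// # Example
    /// A minimal sketch, assuming a table named "my_table" has already been
    /// registered with the session (the name is hypothetical):
    /// ```no_run
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion::dataframe::DataFrameWriteOptions;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// // Appends the rows of `df` to the existing table "my_table"
    /// df.write_table("my_table", DataFrameWriteOptions::new()).await?;
    /// # Ok(())
    /// # }
    /// ```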
1931    pub async fn write_table(
1932        self,
1933        table_name: &str,
1934        write_options: DataFrameWriteOptions,
1935    ) -> Result<Vec<RecordBatch>, DataFusionError> {
1936        let plan = if write_options.sort_by.is_empty() {
1937            self.plan
1938        } else {
1939            LogicalPlanBuilder::from(self.plan)
1940                .sort(write_options.sort_by)?
1941                .build()?
1942        };
1943
1944        let table_ref: TableReference = table_name.into();
1945        let table_schema = self.session_state.schema_for_ref(table_ref.clone())?;
1946        let target = match table_schema.table(table_ref.table()).await? {
1947            Some(ref provider) => Ok(Arc::clone(provider)),
1948            _ => plan_err!("No table named '{table_name}'"),
1949        }?;
1950
1951        let target = Arc::new(DefaultTableSource::new(target));
1952
1953        let plan = LogicalPlanBuilder::insert_into(
1954            plan,
1955            table_ref,
1956            target,
1957            write_options.insert_op,
1958        )?
1959        .build()?;
1960
1961        DataFrame {
1962            session_state: self.session_state,
1963            plan,
1964            projection_requires_validation: self.projection_requires_validation,
1965        }
1966        .collect()
1967        .await
1968    }
1969
1970    /// Execute the `DataFrame` and write the results to CSV file(s).
1971    ///
1972    /// # Example
1973    /// ```
1974    /// # use datafusion::prelude::*;
1975    /// # use datafusion::error::Result;
1976    /// # use std::fs;
1977    /// # #[tokio::main]
1978    /// # async fn main() -> Result<()> {
1979    /// use datafusion::dataframe::DataFrameWriteOptions;
1980    /// let ctx = SessionContext::new();
1981    /// // Sort the data by column "b" and write it to a new location
1982    /// ctx.read_csv("tests/data/example.csv", CsvReadOptions::new())
1983    ///     .await?
1984    ///     .sort(vec![col("b").sort(true, true)])? // sort by b asc, nulls first
1985    ///     .write_csv(
1986    ///         "output.csv",
1987    ///         DataFrameWriteOptions::new(),
1988    ///         None, // can also specify CSV writing options here
1989    ///     )
1990    ///     .await?;
1991    /// # fs::remove_file("output.csv")?;
1992    /// # Ok(())
1993    /// # }
1994    /// ```
1995    pub async fn write_csv(
1996        self,
1997        path: &str,
1998        options: DataFrameWriteOptions,
1999        writer_options: Option<CsvOptions>,
2000    ) -> Result<Vec<RecordBatch>, DataFusionError> {
2001        if options.insert_op != InsertOp::Append {
2002            return not_impl_err!(
2003                "{} is not implemented for DataFrame::write_csv.",
2004                options.insert_op
2005            );
2006        }
2007
2008        let format = if let Some(csv_opts) = writer_options {
2009            Arc::new(CsvFormatFactory::new_with_options(csv_opts))
2010        } else {
2011            Arc::new(CsvFormatFactory::new())
2012        };
2013
2014        let file_type = format_as_file_type(format);
2015
2016        let plan = if options.sort_by.is_empty() {
2017            self.plan
2018        } else {
2019            LogicalPlanBuilder::from(self.plan)
2020                .sort(options.sort_by)?
2021                .build()?
2022        };
2023
2024        let plan = LogicalPlanBuilder::copy_to(
2025            plan,
2026            path.into(),
2027            file_type,
2028            HashMap::new(),
2029            options.partition_by,
2030        )?
2031        .build()?;
2032
2033        DataFrame {
2034            session_state: self.session_state,
2035            plan,
2036            projection_requires_validation: self.projection_requires_validation,
2037        }
2038        .collect()
2039        .await
2040    }
2041
2042    /// Execute the `DataFrame` and write the results to JSON file(s).
2043    ///
2044    /// # Example
2045    /// ```
2046    /// # use datafusion::prelude::*;
2047    /// # use datafusion::error::Result;
2048    /// # use std::fs;
2049    /// # #[tokio::main]
2050    /// # async fn main() -> Result<()> {
2051    /// use datafusion::dataframe::DataFrameWriteOptions;
2052    /// let ctx = SessionContext::new();
2053    /// // Sort the data by column "b" and write it to a new location
2054    /// ctx.read_csv("tests/data/example.csv", CsvReadOptions::new())
2055    ///     .await?
2056    ///     .sort(vec![col("b").sort(true, true)])? // sort by b asc, nulls first
2057    ///     .write_json("output.json", DataFrameWriteOptions::new(), None)
2058    ///     .await?;
2059    /// # fs::remove_file("output.json")?;
2060    /// # Ok(())
2061    /// # }
2062    /// ```
2063    pub async fn write_json(
2064        self,
2065        path: &str,
2066        options: DataFrameWriteOptions,
2067        writer_options: Option<JsonOptions>,
2068    ) -> Result<Vec<RecordBatch>, DataFusionError> {
2069        if options.insert_op != InsertOp::Append {
2070            return not_impl_err!(
2071                "{} is not implemented for DataFrame::write_json.",
2072                options.insert_op
2073            );
2074        }
2075
2076        let format = if let Some(json_opts) = writer_options {
2077            Arc::new(JsonFormatFactory::new_with_options(json_opts))
2078        } else {
2079            Arc::new(JsonFormatFactory::new())
2080        };
2081
2082        let file_type = format_as_file_type(format);
2083
2084        let plan = if options.sort_by.is_empty() {
2085            self.plan
2086        } else {
2087            LogicalPlanBuilder::from(self.plan)
2088                .sort(options.sort_by)?
2089                .build()?
2090        };
2091
2092        let plan = LogicalPlanBuilder::copy_to(
2093            plan,
2094            path.into(),
2095            file_type,
2096            Default::default(),
2097            options.partition_by,
2098        )?
2099        .build()?;
2100
2101        DataFrame {
2102            session_state: self.session_state,
2103            plan,
2104            projection_requires_validation: self.projection_requires_validation,
2105        }
2106        .collect()
2107        .await
2108    }
2109
2110    /// Add or replace a column in the DataFrame.
2111    ///
2112    /// # Example
2113    /// ```
2114    /// # use datafusion::prelude::*;
2115    /// # use datafusion::error::Result;
2116    /// # #[tokio::main]
2117    /// # async fn main() -> Result<()> {
2118    /// let ctx = SessionContext::new();
2119    /// let df = ctx
2120    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
2121    ///     .await?;
2122    /// let df = df.with_column("ab_sum", col("a") + col("b"))?;
2123    /// # Ok(())
2124    /// # }
2125    /// ```
2126    pub fn with_column(self, name: &str, expr: Expr) -> Result<DataFrame> {
2127        let window_func_exprs = find_window_exprs([&expr]);
2128
2129        let original_names: HashSet<String> = self
2130            .plan
2131            .schema()
2132            .iter()
2133            .map(|(_, f)| f.name().clone())
2134            .collect();
2135
2136        // Maybe build window plan
2137        let plan = if window_func_exprs.is_empty() {
2138            self.plan
2139        } else {
2140            LogicalPlanBuilder::window_plan(self.plan, window_func_exprs)?
2141        };
2142
2143        let new_column = expr.alias(name);
2144        let mut col_exists = false;
2145
2146        let mut fields: Vec<(Expr, bool)> = plan
2147            .schema()
2148            .iter()
2149            .filter_map(|(qualifier, field)| {
2150                // Skip new fields introduced by window_plan
2151                if !original_names.contains(field.name()) {
2152                    return None;
2153                }
2154
2155                if field.name() == name {
2156                    col_exists = true;
2157                    Some((new_column.clone(), true))
2158                } else {
2159                    let e = col(Column::from((qualifier, field)));
2160                    Some((e, self.projection_requires_validation))
2161                }
2162            })
2163            .collect();
2164
2165        if !col_exists {
2166            fields.push((new_column, true));
2167        }
2168
2169        let project_plan = LogicalPlanBuilder::from(plan)
2170            .project_with_validation(fields)?
2171            .build()?;
2172
2173        Ok(DataFrame {
2174            session_state: self.session_state,
2175            plan: project_plan,
2176            projection_requires_validation: false,
2177        })
2178    }
2179
2180    /// Rename one column by applying a new projection. This is a no-op if the column to be
2181    /// renamed does not exist.
2182    ///
2183    /// The method supports case-sensitive renaming by wrapping the column name in one of the following quote characters: `"`, `'`, or `` ` ``.
2184    ///
2185    /// Alternatively, setting the DataFusion configuration parameter `datafusion.sql_parser.enable_ident_normalization` to `false`
2186    /// enables case-sensitive renames without the need to quote the column name.
2187    ///
2188    /// # Example
2189    /// ```
2190    /// # use datafusion::prelude::*;
2191    /// # use datafusion::error::Result;
2192    /// # #[tokio::main]
2193    /// # async fn main() -> Result<()> {
2194    /// let ctx = SessionContext::new();
2195    /// let df = ctx
2196    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
2197    ///     .await?;
2198    /// let df = df.with_column_renamed("ab_sum", "total")?;
2199    ///
2200    /// # Ok(())
2201    /// # }
2202    /// ```
2203    pub fn with_column_renamed(
2204        self,
2205        old_name: impl Into<String>,
2206        new_name: &str,
2207    ) -> Result<DataFrame> {
2208        let ident_opts = self
2209            .session_state
2210            .config_options()
2211            .sql_parser
2212            .enable_ident_normalization;
2213        let old_column: Column = if ident_opts {
2214            Column::from_qualified_name(old_name)
2215        } else {
2216            Column::from_qualified_name_ignore_case(old_name)
2217        };
2218
2219        let (qualifier_rename, field_rename) =
2220            match self.plan.schema().qualified_field_from_column(&old_column) {
2221                Ok(qualifier_and_field) => qualifier_and_field,
2222                // no-op if field not found
2223                Err(DataFusionError::SchemaError(e, _))
2224                    if matches!(*e, SchemaError::FieldNotFound { .. }) =>
2225                {
2226                    return Ok(self);
2227                }
2228                Err(err) => return Err(err),
2229            };
2230        let projection = self
2231            .plan
2232            .schema()
2233            .iter()
2234            .map(|(qualifier, field)| {
2235                if qualifier.eq(&qualifier_rename) && field.as_ref() == field_rename {
2236                    (
2237                        col(Column::from((qualifier, field)))
2238                            .alias_qualified(qualifier.cloned(), new_name),
2239                        false,
2240                    )
2241                } else {
2242                    (col(Column::from((qualifier, field))), false)
2243                }
2244            })
2245            .collect::<Vec<_>>();
2246        let project_plan = LogicalPlanBuilder::from(self.plan)
2247            .project_with_validation(projection)?
2248            .build()?;
2249        Ok(DataFrame {
2250            session_state: self.session_state,
2251            plan: project_plan,
2252            projection_requires_validation: false,
2253        })
2254    }
2255
2256    /// Replace all parameters in the logical plan with the specified
2257    /// values, in preparation for execution.
2258    ///
2259    /// # Example
2260    ///
2261    /// ```
2262    /// use datafusion::prelude::*;
2263    /// # use datafusion::{error::Result, assert_batches_eq};
2264    /// # #[tokio::main]
2265    /// # async fn main() -> Result<()> {
2266    /// # use datafusion_common::ScalarValue;
2267    /// let ctx = SessionContext::new();
2268    /// # ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;
2269    /// let results = ctx
2270    ///   .sql("SELECT a FROM example WHERE b = $1")
2271    ///   .await?
2272    ///    // replace $1 with value 2
2273    ///   .with_param_values(vec![
2274    ///      // value at index 0 --> $1
2275    ///      ScalarValue::from(2i64)
2276    ///    ])?
2277    ///   .collect()
2278    ///   .await?;
2279    /// assert_batches_eq!(
2280    ///  &[
2281    ///    "+---+",
2282    ///    "| a |",
2283    ///    "+---+",
2284    ///    "| 1 |",
2285    ///    "+---+",
2286    ///  ],
2287    ///  &results
2288    /// );
2289    /// // Note you can also provide named parameters
2290    /// let results = ctx
2291    ///   .sql("SELECT a FROM example WHERE b = $my_param")
2292    ///   .await?
2293    ///    // replace $my_param with value 2
2294    ///    // Note you can also use a HashMap as well
2295    ///   .with_param_values(vec![
2296    ///       ("my_param", ScalarValue::from(2i64))
2297    ///    ])?
2298    ///   .collect()
2299    ///   .await?;
2300    /// assert_batches_eq!(
2301    ///  &[
2302    ///    "+---+",
2303    ///    "| a |",
2304    ///    "+---+",
2305    ///    "| 1 |",
2306    ///    "+---+",
2307    ///  ],
2308    ///  &results
2309    /// );
2310    /// # Ok(())
2311    /// # }
2312    /// ```
2313    pub fn with_param_values(self, query_values: impl Into<ParamValues>) -> Result<Self> {
2314        let plan = self.plan.with_param_values(query_values)?;
2315        Ok(DataFrame {
2316            session_state: self.session_state,
2317            plan,
2318            projection_requires_validation: self.projection_requires_validation,
2319        })
2320    }
2321
2322    /// Cache the DataFrame as a memory table.
2323    ///
2324    /// ```
2325    /// # use datafusion::prelude::*;
2326    /// # use datafusion::error::Result;
2327    /// # #[tokio::main]
2328    /// # async fn main() -> Result<()> {
2329    /// let ctx = SessionContext::new();
2330    /// let df = ctx
2331    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
2332    ///     .await?;
2333    /// let df = df.cache().await?;
2334    /// # Ok(())
2335    /// # }
2336    /// ```
2337    pub async fn cache(self) -> Result<DataFrame> {
2338        let context = SessionContext::new_with_state((*self.session_state).clone());
2339        // Use the physical plan's schema, which matches the collected output batches
2340        let plan = self.clone().create_physical_plan().await?;
2341        let schema = plan.schema();
2342        let task_ctx = Arc::new(self.task_ctx());
2343        let partitions = collect_partitioned(plan, task_ctx).await?;
2344        let mem_table = MemTable::try_new(schema, partitions)?;
2345        context.read_table(Arc::new(mem_table))
2346    }
2347
2348    /// Apply an alias to the DataFrame.
2349    ///
2350    /// This method replaces the qualifiers of output columns with the given alias.
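    ///
    /// A minimal sketch; after aliasing, columns can be referenced with the
    /// qualifier `t`:
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// let df = df.alias("t")?;
    /// // Columns are now qualified by the alias, e.g. `t.a`
    /// let df = df.select(vec![col("t.a")])?;
    /// # Ok(())
    /// # }
    /// ```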
2351    pub fn alias(self, alias: &str) -> Result<DataFrame> {
2352        let plan = LogicalPlanBuilder::from(self.plan).alias(alias)?.build()?;
2353        Ok(DataFrame {
2354            session_state: self.session_state,
2355            plan,
2356            projection_requires_validation: self.projection_requires_validation,
2357        })
2358    }
2359
2360    /// Fill null values in the specified columns with a given value.
2361    /// If no columns are specified (empty vector), this applies to all columns.
2362    /// A column is only filled if the value can be cast to the column's type.
2363    ///
2364    /// # Arguments
2365    /// * `value` - Value to fill nulls with
2366    /// * `columns` - List of column names to fill. If empty, fills all columns.
2367    ///
2368    /// # Example
2369    /// ```
2370    /// # use datafusion::prelude::*;
2371    /// # use datafusion::error::Result;
2372    /// # use datafusion_common::ScalarValue;
2373    /// # #[tokio::main]
2374    /// # async fn main() -> Result<()> {
2375    /// let ctx = SessionContext::new();
2376    /// let df = ctx
2377    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
2378    ///     .await?;
2379    /// // Fill nulls in only columns "a" and "c":
2380    /// let df = df.fill_null(ScalarValue::from(0), vec!["a".to_owned(), "c".to_owned()])?;
2381    /// // Fill nulls across all columns:
2382    /// let df = df.fill_null(ScalarValue::from(0), vec![])?;
2383    /// # Ok(())
2384    /// # }
2385    /// ```
2386    pub fn fill_null(
2387        &self,
2388        value: ScalarValue,
2389        columns: Vec<String>,
2390    ) -> Result<DataFrame> {
2391        let cols = if columns.is_empty() {
2392            self.logical_plan()
2393                .schema()
2394                .fields()
2395                .iter()
2396                .map(|f| f.as_ref().clone())
2397                .collect()
2398        } else {
2399            self.find_columns(&columns)?
2400        };
2401
2402        // Create projections for each column
2403        let projections = self
2404            .logical_plan()
2405            .schema()
2406            .fields()
2407            .iter()
2408            .map(|field| {
2409                if cols.contains(field) {
2410                    // Try to cast fill value to column type. If the cast fails, fallback to the original column.
2411                    match value.clone().cast_to(field.data_type()) {
2412                        Ok(fill_value) => Expr::Alias(Alias {
2413                            expr: Box::new(Expr::ScalarFunction(ScalarFunction {
2414                                func: coalesce(),
2415                                args: vec![col(field.name()), lit(fill_value)],
2416                            })),
2417                            relation: None,
2418                            name: field.name().to_string(),
2419                            metadata: None,
2420                        }),
2421                        Err(_) => col(field.name()),
2422                    }
2423                } else {
2424                    col(field.name())
2425                }
2426            })
2427            .collect::<Vec<_>>();
2428
2429        self.clone().select(projections)
2430    }
2431
2432    // Helper to find columns from names
2433    fn find_columns(&self, names: &[String]) -> Result<Vec<Field>> {
2434        let schema = self.logical_plan().schema();
2435        names
2436            .iter()
2437            .map(|name| {
2438                schema
2439                    .field_with_name(None, name)
2440                    .cloned()
2441                    .map_err(|_| plan_datafusion_err!("Column '{}' not found", name))
2442            })
2443            .collect()
2444    }
2445
2446    /// Helper for creating a `DataFrame` from named columns.
2447    /// # Example
2448    /// ```
2449    /// use arrow::array::{ArrayRef, Int32Array, StringArray};
2450    /// use datafusion::prelude::DataFrame;
2451    /// use std::sync::Arc;
2452    /// let id: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
2453    /// let name: ArrayRef = Arc::new(StringArray::from(vec!["foo", "bar", "baz"]));
2454    /// let df = DataFrame::from_columns(vec![("id", id), ("name", name)]).unwrap();
2455    /// // +----+------+
2456    /// // | id | name |
2457    /// // +----+------+
2458    /// // | 1  | foo  |
2459    /// // | 2  | bar  |
2460    /// // | 3  | baz  |
2461    /// // +----+------+
2462    /// ```
2463    pub fn from_columns(columns: Vec<(&str, ArrayRef)>) -> Result<Self> {
2464        let fields = columns
2465            .iter()
2466            .map(|(name, array)| Field::new(*name, array.data_type().clone(), true))
2467            .collect::<Vec<_>>();
2468
2469        let arrays = columns
2470            .into_iter()
2471            .map(|(_, array)| array)
2472            .collect::<Vec<_>>();
2473
2474        let schema = Arc::new(Schema::new(fields));
2475        let batch = RecordBatch::try_new(schema, arrays)?;
2476        let ctx = SessionContext::new();
2477        let df = ctx.read_batch(batch)?;
2478        Ok(df)
2479    }
2480}
2481
2482/// Macro for creating a `DataFrame`.
2483/// # Example
2484/// ```
2485/// use datafusion::prelude::dataframe;
2486/// # use datafusion::error::Result;
2487/// # #[tokio::main]
2488/// # async fn main() -> Result<()> {
2489/// let df = dataframe!(
2490///    "id" => [1, 2, 3],
2491///    "name" => ["foo", "bar", "baz"]
2492///  )?;
2493/// df.show().await?;
2494/// // +----+------+
2495/// // | id | name |
2496/// // +----+------+
2497/// // | 1  | foo  |
2498/// // | 2  | bar  |
2499/// // | 3  | baz  |
2500/// // +----+------+
2501/// let df_empty = dataframe!()?; // empty DataFrame
2502/// assert_eq!(df_empty.schema().fields().len(), 0);
2503/// assert_eq!(df_empty.count().await?, 0);
2504/// # Ok(())
2505/// # }
2506/// ```
2507#[macro_export]
2508macro_rules! dataframe {
2509    () => {{
2510        use std::sync::Arc;
2511
2512        use datafusion::prelude::SessionContext;
2513        use datafusion::arrow::array::RecordBatch;
2514        use datafusion::arrow::datatypes::Schema;
2515
2516        let ctx = SessionContext::new();
2517        let batch = RecordBatch::new_empty(Arc::new(Schema::empty()));
2518        ctx.read_batch(batch)
2519    }};
2520
2521    ($($name:expr => $data:expr),+ $(,)?) => {{
2522        use datafusion::prelude::DataFrame;
2523        use datafusion::common::test_util::IntoArrayRef;
2524
2525        let columns = vec![
2526            $(
2527                ($name, $data.into_array_ref()),
2528            )+
2529        ];
2530
2531        DataFrame::from_columns(columns)
2532    }};
2533}
2534
2535#[derive(Debug)]
2536struct DataFrameTableProvider {
2537    plan: LogicalPlan,
2538    table_type: TableType,
2539}
2540
2541#[async_trait]
2542impl TableProvider for DataFrameTableProvider {
2543    fn as_any(&self) -> &dyn Any {
2544        self
2545    }
2546
2547    fn get_logical_plan(&self) -> Option<Cow<'_, LogicalPlan>> {
2548        Some(Cow::Borrowed(&self.plan))
2549    }
2550
2551    fn supports_filters_pushdown(
2552        &self,
2553        filters: &[&Expr],
2554    ) -> Result<Vec<TableProviderFilterPushDown>> {
2555        // Filters are pushed down exactly: `scan` adds them to the logical plan below
2556        Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
2557    }
2558
2559    fn schema(&self) -> SchemaRef {
2560        Arc::clone(self.plan.schema().inner())
2561    }
2562
2563    fn table_type(&self) -> TableType {
2564        self.table_type
2565    }
2566
2567    async fn scan(
2568        &self,
2569        state: &dyn Session,
2570        projection: Option<&Vec<usize>>,
2571        filters: &[Expr],
2572        limit: Option<usize>,
2573    ) -> Result<Arc<dyn ExecutionPlan>> {
2574        let mut expr = LogicalPlanBuilder::from(self.plan.clone());
2575        // Add filter when given
2576        let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
2577        if let Some(filter) = filter {
2578            expr = expr.filter(filter)?
2579        }
2580
2581        if let Some(p) = projection {
2582            expr = expr.select(p.iter().copied())?
2583        }
2584
2585        // Add a limit when given
2586        if let Some(l) = limit {
2587            expr = expr.limit(0, Some(l))?
2588        }
2589        let plan = expr.build()?;
2590        state.create_physical_plan(&plan).await
2591    }
2592}
2593
2594// see tests in datafusion/core/tests/dataframe/mod.rs:2816