datafusion/dataframe/mod.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`DataFrame`] API for building and executing query plans.

#[cfg(feature = "parquet")]
mod parquet;

use crate::arrow::record_batch::RecordBatch;
use crate::arrow::util::pretty;
use crate::datasource::file_format::csv::CsvFormatFactory;
use crate::datasource::file_format::format_as_file_type;
use crate::datasource::file_format::json::JsonFormatFactory;
use crate::datasource::{
    provider_as_source, DefaultTableSource, MemTable, TableProvider,
};
use crate::error::Result;
use crate::execution::context::{SessionState, TaskContext};
use crate::execution::FunctionRegistry;
use crate::logical_expr::utils::find_window_exprs;
use crate::logical_expr::{
    col, ident, Expr, JoinType, LogicalPlan, LogicalPlanBuilder,
    LogicalPlanBuilderOptions, Partitioning, TableType,
};
use crate::physical_plan::{
    collect, collect_partitioned, execute_stream, execute_stream_partitioned,
    ExecutionPlan, SendableRecordBatchStream,
};
use crate::prelude::SessionContext;
use std::any::Any;
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int64Array, StringArray};
use arrow::compute::{cast, concat};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::config::{CsvOptions, JsonOptions};
use datafusion_common::{
    exec_err, internal_datafusion_err, not_impl_err, plan_datafusion_err, plan_err,
    Column, DFSchema, DataFusionError, ParamValues, ScalarValue, SchemaError,
    TableReference, UnnestOptions,
};
use datafusion_expr::select_expr::SelectExpr;
use datafusion_expr::{
    case,
    dml::InsertOp,
    expr::{Alias, ScalarFunction},
    is_null, lit,
    utils::COUNT_STAR_EXPANSION,
    ExplainOption, SortExpr, TableProviderFilterPushDown, UNNAMED_TABLE,
};
use datafusion_functions::core::coalesce;
use datafusion_functions_aggregate::expr_fn::{
    avg, count, max, median, min, stddev, sum,
};

use async_trait::async_trait;
use datafusion_catalog::Session;

/// Options that control how data is written out from a [`DataFrame`]
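///
/// # Example
/// A minimal sketch of building write options; the partition column name
/// `"a"` is illustrative:
/// ```
/// # use datafusion::dataframe::DataFrameWriteOptions;
/// let options = DataFrameWriteOptions::new()
///     .with_single_file_output(true)
///     .with_partition_by(vec!["a".to_string()]);
/// ```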
pub struct DataFrameWriteOptions {
    /// Controls how new data should be written to the table, determining whether
    /// to append, overwrite, or replace existing data.
    insert_op: InsertOp,
    /// Controls whether all partitions should be coalesced into a single output
    /// file. Writing a single file is generally slower.
    single_file_output: bool,
    /// Names of the columns to use for hive-style partitioned writes.
    /// Leave empty (`vec![]`) for non-partitioned writes.
    partition_by: Vec<String>,
    /// Sort expressions used to order the written output.
    /// Leave empty (`vec![]`) for unsorted writes.
    sort_by: Vec<SortExpr>,
}

impl DataFrameWriteOptions {
    /// Create a new DataFrameWriteOptions with default values
    pub fn new() -> Self {
        DataFrameWriteOptions {
            insert_op: InsertOp::Append,
            single_file_output: false,
            partition_by: vec![],
            sort_by: vec![],
        }
    }

    /// Set the insert operation
    pub fn with_insert_operation(mut self, insert_op: InsertOp) -> Self {
        self.insert_op = insert_op;
        self
    }

    /// Set whether all partitions are coalesced into a single output file
    pub fn with_single_file_output(mut self, single_file_output: bool) -> Self {
        self.single_file_output = single_file_output;
        self
    }

    /// Sets the partition_by columns for output partitioning
    pub fn with_partition_by(mut self, partition_by: Vec<String>) -> Self {
        self.partition_by = partition_by;
        self
    }

    /// Sets the sort_by expressions for output sorting
    pub fn with_sort_by(mut self, sort_by: Vec<SortExpr>) -> Self {
        self.sort_by = sort_by;
        self
    }
}

impl Default for DataFrameWriteOptions {
    fn default() -> Self {
        Self::new()
    }
}

/// Represents a logical set of rows with the same named columns.
///
/// Similar to a [Pandas DataFrame] or [Spark DataFrame], a DataFusion DataFrame
/// represents a 2 dimensional table of rows and columns.
///
/// The typical workflow using DataFrames looks like
///
/// 1. Create a DataFrame via methods on [SessionContext], such as [`read_csv`]
///    and [`read_parquet`].
///
/// 2. Build a desired calculation by calling methods such as [`filter`],
///    [`select`], [`aggregate`], and [`limit`]
///
/// 3. Execute into [`RecordBatch`]es by calling [`collect`]
///
/// A `DataFrame` is a wrapper around a [`LogicalPlan`] and the [`SessionState`]
/// required for execution.
///
/// DataFrames are "lazy" in the sense that most methods do not actually compute
/// anything, they just build up a plan. Calling [`collect`] executes the plan
/// using the same DataFusion planning and execution process used to execute SQL
/// and other queries.
///
/// [Pandas DataFrame]: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html
/// [Spark DataFrame]: https://spark.apache.org/docs/latest/sql-programming-guide.html
/// [`read_csv`]: SessionContext::read_csv
/// [`read_parquet`]: SessionContext::read_parquet
/// [`filter`]: DataFrame::filter
/// [`select`]: DataFrame::select
/// [`aggregate`]: DataFrame::aggregate
/// [`limit`]: DataFrame::limit
/// [`collect`]: DataFrame::collect
///
/// # Example
/// ```
/// # use std::sync::Arc;
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # use datafusion::functions_aggregate::expr_fn::min;
/// # use datafusion::arrow::array::{Int32Array, RecordBatch, StringArray};
/// # use datafusion::arrow::datatypes::{DataType, Field, Schema};
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let ctx = SessionContext::new();
/// // Read the data from a csv file
/// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
/// // create a new dataframe that computes the equivalent of
/// // `SELECT a, MIN(b) FROM df WHERE a <= b GROUP BY a LIMIT 100;`
/// let df = df.filter(col("a").lt_eq(col("b")))?
///     .aggregate(vec![col("a")], vec![min(col("b"))])?
///     .limit(0, Some(100))?;
/// // Perform the actual computation
/// let results = df.collect().await?;
///
/// // Create a new dataframe with in-memory data
/// let schema = Schema::new(vec![
///     Field::new("id", DataType::Int32, true),
///     Field::new("name", DataType::Utf8, true),
/// ]);
/// let batch = RecordBatch::try_new(
///     Arc::new(schema),
///     vec![
///         Arc::new(Int32Array::from(vec![1, 2, 3])),
///         Arc::new(StringArray::from(vec!["foo", "bar", "baz"])),
///     ],
/// )?;
/// let df = ctx.read_batch(batch)?;
/// df.show().await?;
///
/// // Create a new dataframe with in-memory data using macro
/// let df = dataframe!(
///     "id" => [1, 2, 3],
///     "name" => ["foo", "bar", "baz"]
/// )?;
/// df.show().await?;
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct DataFrame {
    // Box the (large) SessionState to reduce the size of DataFrame on the stack
    session_state: Box<SessionState>,
    plan: LogicalPlan,
    // Whether projection operations need to validate expressions. When set to
    // `false`, `with_column` and `with_column_renamed` can skip the recursive
    // work required to columnize and normalize expressions. Since these
    // functions are often chained or called many times in dataframe
    // operations, skipping that work can be a significant performance gain.
    //
    // This can be set to `false` only when the dataframe function call results
    // in the last operation being a
    // `LogicalPlanBuilder::from(plan).project(fields)?.build()` or
    // `LogicalPlanBuilder::from(plan).project_with_validation(fields)?.build()`
    // call. This requirement guarantees that the plan has had all columnization
    // and normalization applied to existing expressions, so only new expressions
    // will require that work. Any operation that updates the plan through
    // anything other than a `project` call must set this back to `true`.
    projection_requires_validation: bool,
}

impl DataFrame {
    /// Create a new `DataFrame` based on an existing `LogicalPlan`
    ///
    /// This is a low-level method and is not typically used by end users. See
    /// [`SessionContext::read_csv`] and other methods for creating a
    /// `DataFrame` from an existing datasource.
    pub fn new(session_state: SessionState, plan: LogicalPlan) -> Self {
        Self {
            session_state: Box::new(session_state),
            plan,
            projection_requires_validation: true,
        }
    }

    /// Creates a logical expression from a SQL query text.
    /// The expression is created and processed against the current schema.
    ///
    /// # Example: Parsing SQL queries
    /// ```
    /// # use arrow::datatypes::{DataType, Field, Schema};
    /// # use datafusion::prelude::*;
    /// # use datafusion_common::{DFSchema, Result};
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// // DataFusion parses numbers as i64 first, so the literals are i64
    /// let sql = "a > 1 and b in (1, 10)";
    /// let expected = col("a")
    ///     .gt(lit(1 as i64))
    ///     .and(col("b").in_list(vec![lit(1 as i64), lit(10 as i64)], false));
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// let expr = df.parse_sql_expr(sql)?;
    /// assert_eq!(expected, expr);
    /// # Ok(())
    /// # }
    /// ```
    #[cfg(feature = "sql")]
    pub fn parse_sql_expr(&self, sql: &str) -> Result<Expr> {
        let df_schema = self.schema();

        self.session_state.create_logical_expr(sql, df_schema)
    }

    /// Consume the DataFrame and produce a physical plan
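    ///
    /// # Example
    /// A minimal sketch:
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// // the resulting plan can be executed with e.g. `datafusion::physical_plan::collect`
    /// let physical_plan = df.create_physical_plan().await?;
    /// # Ok(())
    /// # }
    /// ```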
    pub async fn create_physical_plan(self) -> Result<Arc<dyn ExecutionPlan>> {
        self.session_state.create_physical_plan(&self.plan).await
    }

    /// Return a new `DataFrame` containing only the specified columns.
    ///
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// let df = df.select_columns(&["a", "b"])?;
    /// let expected = vec![
    ///     "+---+---+",
    ///     "| a | b |",
    ///     "+---+---+",
    ///     "| 1 | 2 |",
    ///     "+---+---+",
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn select_columns(self, columns: &[&str]) -> Result<DataFrame> {
        let fields = columns
            .iter()
            .flat_map(|name| {
                self.plan
                    .schema()
                    .qualified_fields_with_unqualified_name(name)
            })
            .collect::<Vec<_>>();
        let expr: Vec<Expr> = fields
            .into_iter()
            .map(|(qualifier, field)| Expr::Column(Column::from((qualifier, field))))
            .collect();
        self.select(expr)
    }

    /// Project an arbitrary list of expression strings into a new `DataFrame`.
    /// Each string is parsed into a logical plan expression.
    ///
    /// The output `DataFrame` has one column for each element in `exprs`.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// let df: DataFrame = df.select_exprs(&["a * b", "c"])?;
    /// # Ok(())
    /// # }
    /// ```
    #[cfg(feature = "sql")]
    pub fn select_exprs(self, exprs: &[&str]) -> Result<DataFrame> {
        let expr_list = exprs
            .iter()
            .map(|e| self.parse_sql_expr(e))
            .collect::<Result<Vec<_>>>()?;

        self.select(expr_list)
    }

    /// Project arbitrary expressions (like SQL SELECT expressions) into a new
    /// `DataFrame`.
    ///
    /// The output `DataFrame` has one column for each element in `expr_list`.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// let df = df.select(vec![col("a"), col("b") * col("c")])?;
    /// let expected = vec![
    ///     "+---+-----------------------+",
    ///     "| a | ?table?.b * ?table?.c |",
    ///     "+---+-----------------------+",
    ///     "| 1 | 6                     |",
    ///     "+---+-----------------------+",
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn select(
        self,
        expr_list: impl IntoIterator<Item = impl Into<SelectExpr>>,
    ) -> Result<DataFrame> {
        let expr_list: Vec<SelectExpr> =
            expr_list.into_iter().map(|e| e.into()).collect::<Vec<_>>();

        let expressions = expr_list.iter().filter_map(|e| match e {
            SelectExpr::Expression(expr) => Some(expr),
            _ => None,
        });

        let window_func_exprs = find_window_exprs(expressions);
        let plan = if window_func_exprs.is_empty() {
            self.plan
        } else {
            LogicalPlanBuilder::window_plan(self.plan, window_func_exprs)?
        };

        let project_plan = LogicalPlanBuilder::from(plan).project(expr_list)?.build()?;

        Ok(DataFrame {
            session_state: self.session_state,
            plan: project_plan,
            projection_requires_validation: false,
        })
    }

    /// Returns a new DataFrame containing all columns except the specified columns.
    ///
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// // +----+----+----+
    /// // | a  | b  | c  |
    /// // +----+----+----+
    /// // | 1  | 2  | 3  |
    /// // +----+----+----+
    /// let df = df.drop_columns(&["a"])?;
    /// let expected = vec![
    ///     "+---+---+",
    ///     "| b | c |",
    ///     "+---+---+",
    ///     "| 2 | 3 |",
    ///     "+---+---+",
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn drop_columns(self, columns: &[&str]) -> Result<DataFrame> {
        let fields_to_drop = columns
            .iter()
            .flat_map(|name| {
                self.plan
                    .schema()
                    .qualified_fields_with_unqualified_name(name)
            })
            .collect::<Vec<_>>();
        let expr: Vec<Expr> = self
            .plan
            .schema()
            .fields()
            .into_iter()
            .enumerate()
            .map(|(idx, _)| self.plan.schema().qualified_field(idx))
            .filter(|(qualifier, f)| !fields_to_drop.contains(&(*qualifier, f)))
            .map(|(qualifier, field)| Expr::Column(Column::from((qualifier, field))))
            .collect();
        self.select(expr)
    }

    /// Expand multiple list/struct columns into a set of rows and new columns.
    ///
    /// See also: [`UnnestOptions`] documentation for the behavior of `unnest`
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_json("tests/data/unnest.json", NdJsonReadOptions::default()).await?;
    /// // expand a JSON array into multiple rows, and flatten the field names
    /// // of a nested structure into new columns
    /// let df = df.unnest_columns(&["b", "c", "d"])?;
    /// let expected = vec![
    ///     "+---+------+-------+-----+-----+",
    ///     "| a | b    | c     | d.e | d.f |",
    ///     "+---+------+-------+-----+-----+",
    ///     "| 1 | 2.0  | false | 1   | 2   |",
    ///     "| 1 | 1.3  | true  | 1   | 2   |",
    ///     "| 1 | -6.1 |       | 1   | 2   |",
    ///     "| 2 | 3.0  | false |     |     |",
    ///     "| 2 | 2.3  | true  |     |     |",
    ///     "| 2 | -7.1 |       |     |     |",
    ///     "+---+------+-------+-----+-----+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn unnest_columns(self, columns: &[&str]) -> Result<DataFrame> {
        self.unnest_columns_with_options(columns, UnnestOptions::new())
    }

    /// Expand multiple list columns into a set of rows, with
    /// behavior controlled by [`UnnestOptions`].
    ///
    /// Please see the documentation on [`UnnestOptions`] for more
    /// details about the meaning of unnest.
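    ///
    /// # Example
    /// A minimal sketch, reusing the JSON file from [`Self::unnest_columns`]
    /// and assuming [`UnnestOptions::with_preserve_nulls`] is available:
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::UnnestOptions;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_json("tests/data/unnest.json", NdJsonReadOptions::default()).await?;
    /// // drop rows whose unnested value would be null instead of keeping them
    /// let options = UnnestOptions::new().with_preserve_nulls(false);
    /// let df = df.unnest_columns_with_options(&["b"], options)?;
    /// # Ok(())
    /// # }
    /// ```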
    pub fn unnest_columns_with_options(
        self,
        columns: &[&str],
        options: UnnestOptions,
    ) -> Result<DataFrame> {
        let columns = columns.iter().map(|c| Column::from(*c)).collect();
        let plan = LogicalPlanBuilder::from(self.plan)
            .unnest_columns_with_options(columns, options)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Return a DataFrame with only rows for which `predicate` evaluates to
    /// `true`.
    ///
    /// Rows for which `predicate` evaluates to `false` or `null`
    /// are filtered out.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example_long.csv", CsvReadOptions::new())
    ///     .await?;
    /// let df = df.filter(col("a").lt_eq(col("b")))?;
    /// // all rows where a <= b are returned
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 1 | 2 | 3 |",
    ///     "| 4 | 5 | 6 |",
    ///     "| 7 | 8 | 9 |",
    ///     "+---+---+---+",
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn filter(self, predicate: Expr) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .filter(predicate)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Return a new `DataFrame` that aggregates the rows of the current
    /// `DataFrame`, first optionally grouping by the given expressions.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion::functions_aggregate::expr_fn::min;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example_long.csv", CsvReadOptions::new())
    ///     .await?;
    ///
    /// // The following use is the equivalent of "SELECT MIN(b) GROUP BY a"
    /// let df1 = df.clone().aggregate(vec![col("a")], vec![min(col("b"))])?;
    /// let expected1 = vec![
    ///     "+---+----------------+",
    ///     "| a | min(?table?.b) |",
    ///     "+---+----------------+",
    ///     "| 1 | 2              |",
    ///     "| 4 | 5              |",
    ///     "| 7 | 8              |",
    ///     "+---+----------------+",
    /// ];
    /// assert_batches_sorted_eq!(expected1, &df1.collect().await?);
    /// // The following use is the equivalent of "SELECT MIN(b)"
    /// let df2 = df.aggregate(vec![], vec![min(col("b"))])?;
    /// let expected2 = vec![
    ///     "+----------------+",
    ///     "| min(?table?.b) |",
    ///     "+----------------+",
    ///     "| 2              |",
    ///     "+----------------+",
    /// ];
    /// # assert_batches_sorted_eq!(expected2, &df2.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn aggregate(
        self,
        group_expr: Vec<Expr>,
        aggr_expr: Vec<Expr>,
    ) -> Result<DataFrame> {
        let is_grouping_set = matches!(group_expr.as_slice(), [Expr::GroupingSet(_)]);
        let aggr_expr_len = aggr_expr.len();
        let options =
            LogicalPlanBuilderOptions::new().with_add_implicit_group_by_exprs(true);
        let plan = LogicalPlanBuilder::from(self.plan)
            .with_options(options)
            .aggregate(group_expr, aggr_expr)?
            .build()?;
        let plan = if is_grouping_set {
            let grouping_id_pos = plan.schema().fields().len() - 1 - aggr_expr_len;
            // For grouping sets we do a project to not expose the internal grouping id
            let exprs = plan
                .schema()
                .columns()
                .into_iter()
                .enumerate()
                .filter(|(idx, _)| *idx != grouping_id_pos)
                .map(|(_, column)| Expr::Column(column))
                .collect::<Vec<_>>();
            LogicalPlanBuilder::from(plan).project(exprs)?.build()?
        } else {
            plan
        };
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: !is_grouping_set,
        })
    }

    /// Return a new DataFrame that adds the result of evaluating one or more
    /// window functions ([`Expr::WindowFunction`]) to the existing columns
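    ///
    /// # Example
    /// A minimal sketch, assuming the `row_number` window function from
    /// `datafusion::functions_window::expr_fn` is available:
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion::functions_window::expr_fn::row_number;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// // add a `row_number() OVER ()` column to the existing columns a, b, c
    /// let df = df.window(vec![row_number()])?;
    /// # Ok(())
    /// # }
    /// ```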
    pub fn window(self, window_exprs: Vec<Expr>) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .window(window_exprs)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Returns a new `DataFrame` with a limited number of rows.
    ///
    /// # Arguments
    /// * `skip` - Number of rows to skip before fetching any rows
    /// * `fetch` - Maximum number of rows to return, after skipping `skip` rows
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example_long.csv", CsvReadOptions::new())
    ///     .await?;
    /// let df = df.limit(1, Some(2))?;
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 4 | 5 | 6 |",
    ///     "| 7 | 8 | 9 |",
    ///     "+---+---+---+",
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn limit(self, skip: usize, fetch: Option<usize>) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .limit(skip, fetch)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: self.projection_requires_validation,
        })
    }

    /// Calculate the union of two [`DataFrame`]s, preserving duplicate rows.
    ///
    /// The two [`DataFrame`]s must have exactly the same schema
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// let d2 = df.clone();
    /// let df = df.union(d2)?;
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 1 | 2 | 3 |",
    ///     "| 1 | 2 | 3 |",
    ///     "+---+---+---+",
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn union(self, dataframe: DataFrame) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .union(dataframe.plan)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Calculate the union of two [`DataFrame`]s using column names, preserving duplicate rows.
    ///
    /// The two [`DataFrame`]s are combined using column names rather than position,
    /// filling missing columns with null.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// let d2 = df
    ///     .clone()
    ///     .select_columns(&["b", "c", "a"])?
    ///     .with_column("d", lit("77"))?;
    /// let df = df.union_by_name(d2)?;
    /// let expected = vec![
    ///     "+---+---+---+----+",
    ///     "| a | b | c | d  |",
    ///     "+---+---+---+----+",
    ///     "| 1 | 2 | 3 |    |",
    ///     "| 1 | 2 | 3 | 77 |",
    ///     "+---+---+---+----+",
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn union_by_name(self, dataframe: DataFrame) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .union_by_name(dataframe.plan)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Calculate the distinct union of two [`DataFrame`]s.
    ///
    /// The two [`DataFrame`]s must have exactly the same schema. Any duplicate
    /// rows are discarded.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// let d2 = df.clone();
    /// let df = df.union_distinct(d2)?;
    /// // d2 is a duplicate of df, so the union is deduplicated to a single row
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 1 | 2 | 3 |",
    ///     "+---+---+---+",
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn union_distinct(self, dataframe: DataFrame) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .union_distinct(dataframe.plan)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Calculate the union of two [`DataFrame`]s using column names with all duplicated rows removed.
    ///
    /// The two [`DataFrame`]s are combined using column names rather than position,
    /// filling missing columns with null.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// let d2 = df.clone().select_columns(&["b", "c", "a"])?;
    /// let df = df.union_by_name_distinct(d2)?;
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 1 | 2 | 3 |",
    ///     "+---+---+---+",
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn union_by_name_distinct(self, dataframe: DataFrame) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .union_by_name_distinct(dataframe.plan)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Return a new `DataFrame` with all duplicated rows removed.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// let df = df.distinct()?;
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 1 | 2 | 3 |",
    ///     "+---+---+---+",
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn distinct(self) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan).distinct()?.build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Return a new `DataFrame` implementing `DISTINCT ON`: for each distinct
    /// value of the `on_expr` expressions, keep the first row (as ordered by
    /// the optional `sort_expr`) and project it to `select_expr`.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?
    ///     // Return a single row (a, b) for each distinct value of a
    ///     .distinct_on(vec![col("a")], vec![col("a"), col("b")], None)?;
    /// let expected = vec![
    ///     "+---+---+",
    ///     "| a | b |",
    ///     "+---+---+",
    ///     "| 1 | 2 |",
    ///     "+---+---+",
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn distinct_on(
        self,
        on_expr: Vec<Expr>,
        select_expr: Vec<Expr>,
        sort_expr: Option<Vec<SortExpr>>,
    ) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .distinct_on(on_expr, select_expr, sort_expr)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Return a new `DataFrame` containing summary statistics for this
    /// DataFrame.
    ///
    /// Only numeric datatypes are summarized at the moment; nulls are returned
    /// for non-numeric datatypes. The output format is modeled after pandas.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use arrow::util::pretty;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/tpch-csv/customer.csv", CsvReadOptions::new()).await?;
    /// let stat = df.describe().await?;
    /// # // some output columns are ignored
    /// let expected = vec![
    ///     "+------------+--------------------+--------------------+------------------------------------+--------------------+-----------------+--------------------+--------------+----------------------------------------------------------------------------------------------------------+",
    ///     "| describe   | c_custkey          | c_name             | c_address                          | c_nationkey        | c_phone         | c_acctbal          | c_mktsegment | c_comment                                                                                                |",
    ///     "+------------+--------------------+--------------------+------------------------------------+--------------------+-----------------+--------------------+--------------+----------------------------------------------------------------------------------------------------------+",
    ///     "| count      | 9.0                | 9                  | 9                                  | 9.0                | 9               | 9.0                | 9            | 9                                                                                                        |",
    ///     "| max        | 10.0               | Customer#000000010 | xKiAFTjUsCuxfeleNqefumTrjS         | 20.0               | 30-114-968-4951 | 9561.95            | MACHINERY    | tions. even deposits boost according to the slyly bold packages. final accounts cajole requests. furious |",
    ///     "| mean       | 6.0                | null               | null                               | 9.88888888888889   | null            | 5153.2155555555555 | null         | null                                                                                                     |",
    ///     "| median     | 6.0                | null               | null                               | 8.0                | null            | 6819.74            | null         | null                                                                                                     |",
    ///     "| min        | 2.0                | Customer#000000002 | 6LrEaV6KR6PLVcgl2ArL Q3rqzLzcT1 v2 | 1.0                | 11-719-748-3364 | 121.65             | AUTOMOBILE   | deposits eat slyly ironic, even instructions. express foxes detect slyly. blithely even accounts abov    |",
    ///     "| null_count | 0.0                | 0                  | 0                                  | 0.0                | 0               | 0.0                | 0            | 0                                                                                                        |",
    ///     "| std        | 2.7386127875258306 | null               | null                               | 7.2188026092359046 | null            | 3522.169804254585  | null         | null                                                                                                     |",
    ///     "+------------+--------------------+--------------------+------------------------------------+--------------------+-----------------+--------------------+--------------+----------------------------------------------------------------------------------------------------------+"];
    /// assert_batches_sorted_eq!(expected, &stat.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub async fn describe(self) -> Result<Self> {
        // the currently supported describe functions
        let supported_describe_functions =
            vec!["count", "null_count", "mean", "std", "min", "max", "median"];

        let original_schema_fields = self.schema().fields().iter();

        // define the describe column
        let mut describe_schemas = vec![Field::new("describe", DataType::Utf8, false)];
        describe_schemas.extend(original_schema_fields.clone().map(|field| {
            if field.data_type().is_numeric() {
                Field::new(field.name(), DataType::Float64, true)
            } else {
                Field::new(field.name(), DataType::Utf8, true)
            }
        }));

        // collect the record batches, one aggregation per describe function
        let describe_record_batch = [
            // count aggregation
            self.clone().aggregate(
                vec![],
                original_schema_fields
                    .clone()
                    .map(|f| count(ident(f.name())).alias(f.name()))
                    .collect::<Vec<_>>(),
            ),
            // null_count aggregation
            self.clone().aggregate(
                vec![],
                original_schema_fields
                    .clone()
                    .map(|f| {
                        sum(case(is_null(ident(f.name())))
                            .when(lit(true), lit(1))
                            .otherwise(lit(0))
                            .unwrap())
                        .alias(f.name())
                    })
                    .collect::<Vec<_>>(),
            ),
            // mean aggregation
            self.clone().aggregate(
                vec![],
                original_schema_fields
                    .clone()
                    .filter(|f| f.data_type().is_numeric())
                    .map(|f| avg(ident(f.name())).alias(f.name()))
                    .collect::<Vec<_>>(),
            ),
            // std aggregation
            self.clone().aggregate(
                vec![],
                original_schema_fields
                    .clone()
                    .filter(|f| f.data_type().is_numeric())
                    .map(|f| stddev(ident(f.name())).alias(f.name()))
                    .collect::<Vec<_>>(),
            ),
            // min aggregation
            self.clone().aggregate(
                vec![],
                original_schema_fields
                    .clone()
                    .filter(|f| {
                        !matches!(f.data_type(), DataType::Binary | DataType::Boolean)
                    })
                    .map(|f| min(ident(f.name())).alias(f.name()))
                    .collect::<Vec<_>>(),
            ),
            // max aggregation
            self.clone().aggregate(
                vec![],
                original_schema_fields
                    .clone()
                    .filter(|f| {
                        !matches!(f.data_type(), DataType::Binary | DataType::Boolean)
                    })
                    .map(|f| max(ident(f.name())).alias(f.name()))
                    .collect::<Vec<_>>(),
            ),
            // median aggregation
            self.clone().aggregate(
                vec![],
                original_schema_fields
                    .clone()
                    .filter(|f| f.data_type().is_numeric())
                    .map(|f| median(ident(f.name())).alias(f.name()))
                    .collect::<Vec<_>>(),
            ),
        ];

        // first column with function names
        let mut array_ref_vec: Vec<ArrayRef> = vec![Arc::new(StringArray::from(
            supported_describe_functions.clone(),
        ))];
        for field in original_schema_fields {
            let mut array_datas = vec![];
            for result in describe_record_batch.iter() {
                let array_ref = match result {
                    Ok(df) => {
                        let batches = df.clone().collect().await;
                        match batches {
                            Ok(batches)
                                if batches.len() == 1
                                    && batches[0]
                                        .column_by_name(field.name())
                                        .is_some() =>
                            {
                                let column =
                                    batches[0].column_by_name(field.name()).unwrap();

                                if column.data_type().is_null() {
                                    Arc::new(StringArray::from(vec!["null"]))
                                } else if field.data_type().is_numeric() {
                                    cast(column, &DataType::Float64)?
                                } else {
                                    cast(column, &DataType::Utf8)?
                                }
                            }
                            _ => Arc::new(StringArray::from(vec!["null"])),
                        }
                    }
                    // Handle the planning error that occurs when an aggregation
                    // has no expressions, e.g. for a DataFrame with only
                    // boolean/binary columns
                    Err(err)
                        if err.to_string().contains(
                            "Error during planning: \
                            Aggregate requires at least one grouping \
                            or aggregate expression",
                        ) =>
                    {
                        Arc::new(StringArray::from(vec!["null"]))
                    }
                    Err(e) => return exec_err!("{}", e),
                };
                array_datas.push(array_ref);
            }
            array_ref_vec.push(concat(
                array_datas
                    .iter()
                    .map(|af| af.as_ref())
                    .collect::<Vec<_>>()
                    .as_slice(),
            )?);
        }

        let describe_record_batch =
            RecordBatch::try_new(Arc::new(Schema::new(describe_schemas)), array_ref_vec)?;

        let provider = MemTable::try_new(
            describe_record_batch.schema(),
            vec![vec![describe_record_batch]],
        )?;

        let plan = LogicalPlanBuilder::scan(
            UNNAMED_TABLE,
            provider_as_source(Arc::new(provider)),
            None,
        )?
        .build()?;

        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: self.projection_requires_validation,
        })
    }

    /// Apply a sort to the provided expressions, all in the default direction
    /// (ascending, nulls last)
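    ///
    /// # Example
    /// A minimal sketch, equivalent to `ORDER BY a ASC NULLS LAST`:
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example_long.csv", CsvReadOptions::new())
    ///     .await?;
    /// let df = df.sort_by(vec![col("a")])?;
    /// # Ok(())
    /// # }
    /// ```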
    pub fn sort_by(self, expr: Vec<Expr>) -> Result<DataFrame> {
        self.sort(
            expr.into_iter()
                .map(|e| e.sort(true, false))
                .collect::<Vec<SortExpr>>(),
        )
    }

    /// Sort the DataFrame by the specified sorting expressions.
    ///
    /// Note that any expression can be turned into
    /// a sort expression by calling its [sort](Expr::sort) method.
    ///
    /// # Example
    ///
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example_long.csv", CsvReadOptions::new())
    ///     .await?;
    /// let df = df.sort(vec![
    ///     col("a").sort(false, true), // a DESC, nulls first
    ///     col("b").sort(true, false), // b ASC, nulls last
    /// ])?;
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 1 | 2 | 3 |",
    ///     "| 4 | 5 | 6 |",
    ///     "| 7 | 8 | 9 |",
    ///     "+---+---+---+",
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn sort(self, expr: Vec<SortExpr>) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan).sort(expr)?.build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: self.projection_requires_validation,
        })
    }

    /// Join this `DataFrame` with another `DataFrame` using explicitly specified
    /// columns and an optional filter expression.
    ///
    /// See [`join_on`](Self::join_on) for a more concise way to specify the
    /// join condition. Since DataFusion will automatically identify and
    /// optimize equality predicates, there is no performance difference between
    /// this function and `join_on`.
    ///
    /// `left_cols` and `right_cols` are used to form "equijoin" predicates (see
    /// example below), which are then combined with the optional `filter`
    /// expression. If `left_cols` and `right_cols` contain ambiguous column
    /// references, they will be disambiguated by prioritizing the left relation
    /// for `left_cols` and the right relation for `right_cols`.
    ///
    /// Note that in case of outer join, the `filter` is applied to only matched rows.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let left = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// let right = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?
    ///     .select(vec![
    ///         col("a").alias("a2"),
    ///         col("b").alias("b2"),
    ///         col("c").alias("c2"),
    ///     ])?;
    /// // Perform the equivalent of `left INNER JOIN right ON (a = a2 AND b = b2)`
    /// // finding all pairs of rows from `left` and `right` where `a = a2` and `b = b2`.
    /// let join = left.join(right, JoinType::Inner, &["a", "b"], &["a2", "b2"], None)?;
    /// let expected = vec![
    ///     "+---+---+---+----+----+----+",
    ///     "| a | b | c | a2 | b2 | c2 |",
    ///     "+---+---+---+----+----+----+",
    ///     "| 1 | 2 | 3 | 1  | 2  | 3  |",
    ///     "+---+---+---+----+----+----+",
    /// ];
    /// assert_batches_sorted_eq!(expected, &join.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn join(
        self,
        right: DataFrame,
        join_type: JoinType,
        left_cols: &[&str],
        right_cols: &[&str],
        filter: Option<Expr>,
    ) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .join(
                right.plan,
                join_type,
                (left_cols.to_vec(), right_cols.to_vec()),
                filter,
            )?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Join this `DataFrame` with another `DataFrame` using the specified
    /// expressions.
    ///
    /// Note that DataFusion automatically optimizes joins, including
    /// identifying and optimizing equality predicates.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let left = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// let right = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?
    ///     .select(vec![
    ///         col("a").alias("a2"),
    ///         col("b").alias("b2"),
    ///         col("c").alias("c2"),
    ///     ])?;
    ///
    /// // Perform the equivalent of `left INNER JOIN right ON (a != a2 AND b != b2)`
    /// // finding all pairs of rows from `left` and `right`
    /// // where `a != a2` and `b != b2`.
    /// let join_on = left.join_on(
    ///     right,
    ///     JoinType::Inner,
    ///     [col("a").not_eq(col("a2")), col("b").not_eq(col("b2"))],
    /// )?;
    /// let expected = vec![
    ///     "+---+---+---+----+----+----+",
    ///     "| a | b | c | a2 | b2 | c2 |",
    ///     "+---+---+---+----+----+----+",
    ///     "+---+---+---+----+----+----+",
    /// ];
    /// # assert_batches_sorted_eq!(expected, &join_on.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn join_on(
        self,
        right: DataFrame,
        join_type: JoinType,
        on_exprs: impl IntoIterator<Item = Expr>,
    ) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .join_on(right.plan, join_type, on_exprs)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Repartition a DataFrame based on a logical partitioning scheme.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example_long.csv", CsvReadOptions::new())
    ///     .await?;
    /// let df1 = df.repartition(Partitioning::RoundRobinBatch(4))?;
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 1 | 2 | 3 |",
    ///     "| 4 | 5 | 6 |",
    ///     "| 7 | 8 | 9 |",
    ///     "+---+---+---+",
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df1.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn repartition(self, partitioning_scheme: Partitioning) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .repartition(partitioning_scheme)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Return the total number of rows in this `DataFrame`.
    ///
    /// Note that this method will actually run a plan to calculate the count,
    /// which may be slow for large or complicated DataFrames.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// let count = df.count().await?; // 1
    /// # assert_eq!(count, 1);
    /// # Ok(())
    /// # }
    /// ```
    pub async fn count(self) -> Result<usize> {
        let rows = self
            .aggregate(
                vec![],
                vec![count(Expr::Literal(COUNT_STAR_EXPANSION, None))],
            )?
            .collect()
            .await?;
        let len = *rows
            .first()
            .and_then(|r| r.columns().first())
            .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
            .and_then(|a| a.values().first())
            .ok_or_else(|| {
                internal_datafusion_err!("Unexpected output when collecting for count()")
            })? as usize;
        Ok(len)
    }

    /// Execute this `DataFrame` and buffer all resulting `RecordBatch`es into memory.
    ///
    /// Prior to calling `collect`, modifying a DataFrame simply updates a plan
    /// (no actual computation is performed). `collect` triggers the computation.
    ///
    /// See [`Self::execute_stream`] to execute a DataFrame without buffering.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// let batches = df.collect().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn collect(self) -> Result<Vec<RecordBatch>> {
        let task_ctx = Arc::new(self.task_ctx());
        let plan = self.create_physical_plan().await?;
        collect(plan, task_ctx).await
    }

    /// Execute the `DataFrame` and print the results to the console.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// df.show().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn show(self) -> Result<()> {
        println!("{}", self.to_string().await?);
        Ok(())
    }

    /// Execute the `DataFrame` and return a string representation of the results.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion::execution::SessionStateBuilder;
    ///
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let cfg = SessionConfig::new()
    ///     .set_str("datafusion.format.null", "no-value");
    /// let session_state = SessionStateBuilder::new()
    ///     .with_config(cfg)
    ///     .with_default_features()
    ///     .build();
    /// let ctx = SessionContext::new_with_state(session_state);
    /// let df = ctx.sql("select null as 'null-column'").await?;
    /// let result = df.to_string().await?;
    /// assert_eq!(result,
    /// "+-------------+
    /// | null-column |
    /// +-------------+
    /// | no-value    |
    /// +-------------+"
    /// );
    /// # Ok(())
    /// # }
    /// ```
    pub async fn to_string(self) -> Result<String> {
        let options = self.session_state.config().options().format.clone();
        let arrow_options: arrow::util::display::FormatOptions = (&options).try_into()?;

        let results = self.collect().await?;
        Ok(
            pretty::pretty_format_batches_with_options(&results, &arrow_options)?
                .to_string(),
        )
    }

    /// Execute the `DataFrame` and print only the first `num` rows of the
    /// result to the console.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// df.show_limit(10).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn show_limit(self, num: usize) -> Result<()> {
        let results = self.limit(0, Some(num))?.collect().await?;
        Ok(pretty::print_batches(&results)?)
    }

    /// Return a new [`TaskContext`] that can be used to execute this DataFrame
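    ///
    /// # Example
    /// A minimal sketch; the context can be passed to functions such as
    /// `datafusion::physical_plan::collect`:
    /// ```
    /// # use std::sync::Arc;
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// let task_ctx = Arc::new(df.task_ctx());
    /// # Ok(())
    /// # }
    /// ```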
    pub fn task_ctx(&self) -> TaskContext {
        TaskContext::from(self.session_state.as_ref())
    }

    /// Executes this DataFrame and returns a stream over a single partition
    ///
    /// See [Self::collect] to buffer the `RecordBatch`es in memory.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// let stream = df.execute_stream().await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Aborting Execution
    ///
    /// Dropping the stream will abort the execution of the query, and free up
    /// any allocated resources
    pub async fn execute_stream(self) -> Result<SendableRecordBatchStream> {
        let task_ctx = Arc::new(self.task_ctx());
        let plan = self.create_physical_plan().await?;
        execute_stream(plan, task_ctx)
    }

    /// Executes this DataFrame and collects all results into a vector of
    /// vectors of `RecordBatch`es, maintaining the input partitioning.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// let batches = df.collect_partitioned().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn collect_partitioned(self) -> Result<Vec<Vec<RecordBatch>>> {
        let task_ctx = Arc::new(self.task_ctx());
        let plan = self.create_physical_plan().await?;
        collect_partitioned(plan, task_ctx).await
    }

    /// Executes this DataFrame and returns one stream per partition.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// let streams = df.execute_stream_partitioned().await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Aborting Execution
    ///
    /// Dropping the stream will abort the execution of the query, and free up
    /// any allocated resources
    pub async fn execute_stream_partitioned(
        self,
    ) -> Result<Vec<SendableRecordBatchStream>> {
        let task_ctx = Arc::new(self.task_ctx());
        let plan = self.create_physical_plan().await?;
        execute_stream_partitioned(plan, task_ctx)
    }

    /// Returns the `DFSchema` describing the output of this DataFrame.
    ///
    /// The output `DFSchema` contains information on the name, data type, and
    /// nullability for each column.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// let schema = df.schema();
    /// # Ok(())
    /// # }
    /// ```
    pub fn schema(&self) -> &DFSchema {
        self.plan.schema()
    }

    /// Return a reference to the unoptimized [`LogicalPlan`] that comprises
    /// this DataFrame.
    ///
    /// See [`Self::into_unoptimized_plan`] for more details.
    pub fn logical_plan(&self) -> &LogicalPlan {
        &self.plan
    }

    /// Returns both the [`LogicalPlan`] and [`SessionState`] that comprise this [`DataFrame`]
    pub fn into_parts(self) -> (SessionState, LogicalPlan) {
        (*self.session_state, self.plan)
    }

    /// Return the [`LogicalPlan`] represented by this DataFrame without running
    /// any optimizers
    ///
    /// Note: This method should not be used outside testing, as it loses the
    /// snapshot of the [`SessionState`] attached to this [`DataFrame`] and
    /// consequently subsequent operations may take place against a different
    /// state (e.g. a different value of `now()`)
    ///
    /// See [`Self::into_parts`] to retrieve the owned [`LogicalPlan`] and
    /// corresponding [`SessionState`].
    pub fn into_unoptimized_plan(self) -> LogicalPlan {
        self.plan
    }

    /// Return the optimized [`LogicalPlan`] represented by this DataFrame.
    ///
    /// Note: This method should not be used outside testing -- see
    /// [`Self::into_unoptimized_plan`] for more details.
    pub fn into_optimized_plan(self) -> Result<LogicalPlan> {
        // Optimize the plan first for better UX
        self.session_state.optimize(&self.plan)
    }

1649
    /// Converts this [`DataFrame`] into a [`TableProvider`] that can be registered
    /// as a table view using [`SessionContext::register_table`].
    ///
    /// Note: This discards the [`SessionState`] associated with this
    /// [`DataFrame`] in favour of the one passed to [`TableProvider::scan`]
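    ///
    /// # Example
    /// A minimal sketch; the view name `"my_view"` is illustrative:
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// ctx.register_table("my_view", df.into_view())?;
    /// let df = ctx.sql("SELECT * FROM my_view").await?;
    /// # Ok(())
    /// # }
    /// ```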
1655 pub fn into_view(self) -> Arc<dyn TableProvider> {
1656 Arc::new(DataFrameTableProvider {
1657 plan: self.plan,
table_type: TableType::View,
1659 })
1660 }
1661
/// Like [`Self::into_view`], but the returned [`TableProvider`] reports
/// itself as a transient table ([`TableType::Temporary`]).
1664 pub fn into_temporary_view(self) -> Arc<dyn TableProvider> {
1665 Arc::new(DataFrameTableProvider {
1666 plan: self.plan,
1667 table_type: TableType::Temporary,
1668 })
1669 }
1670
1671 /// Return a DataFrame with the explanation of its plan so far.
1672 ///
/// If `analyze` is specified, the plan is executed and metrics are reported;
/// if `verbose` is true, additional details are printed.
/// The default output format is `Indent`.
1676 ///
1677 /// ```
1678 /// # use datafusion::prelude::*;
1679 /// # use datafusion::error::Result;
1680 /// # #[tokio::main]
1681 /// # async fn main() -> Result<()> {
1682 /// let ctx = SessionContext::new();
1683 /// let df = ctx
1684 /// .read_csv("tests/data/example.csv", CsvReadOptions::new())
1685 /// .await?;
1686 /// let batches = df
1687 /// .limit(0, Some(100))?
1688 /// .explain(false, false)?
1689 /// .collect()
1690 /// .await?;
1691 /// # Ok(())
1692 /// # }
1693 /// ```
1694 pub fn explain(self, verbose: bool, analyze: bool) -> Result<DataFrame> {
1695 // Set the default format to Indent to keep the previous behavior
1696 let opts = ExplainOption::default()
1697 .with_verbose(verbose)
1698 .with_analyze(analyze);
1699 self.explain_with_options(opts)
1700 }
1701
1702 /// Return a DataFrame with the explanation of its plan so far.
1703 ///
1704 /// `opt` is used to specify the options for the explain operation.
1705 /// Details of the options can be found in [`ExplainOption`].
1706 /// ```
1707 /// # use datafusion::prelude::*;
1708 /// # use datafusion::error::Result;
1709 /// # #[tokio::main]
1710 /// # async fn main() -> Result<()> {
/// use datafusion_expr::ExplainOption;
1712 /// let ctx = SessionContext::new();
1713 /// let df = ctx
1714 /// .read_csv("tests/data/example.csv", CsvReadOptions::new())
1715 /// .await?;
1716 /// let batches = df
1717 /// .limit(0, Some(100))?
1718 /// .explain_with_options(
1719 /// ExplainOption::default()
1720 /// .with_verbose(false)
1721 /// .with_analyze(false),
1722 /// )?
1723 /// .collect()
1724 /// .await?;
1725 /// # Ok(())
1726 /// # }
1727 /// ```
1728 pub fn explain_with_options(
1729 self,
1730 explain_option: ExplainOption,
1731 ) -> Result<DataFrame> {
1732 if matches!(self.plan, LogicalPlan::Explain(_)) {
1733 return plan_err!("Nested EXPLAINs are not supported");
1734 }
1735 let plan = LogicalPlanBuilder::from(self.plan)
1736 .explain_option_format(explain_option)?
1737 .build()?;
1738 Ok(DataFrame {
1739 session_state: self.session_state,
1740 plan,
1741 projection_requires_validation: self.projection_requires_validation,
1742 })
1743 }
1744
/// Return a `FunctionRegistry` used to plan UDF calls
1746 ///
1747 /// # Example
1748 /// ```
1749 /// # use datafusion::prelude::*;
1750 /// # use datafusion::error::Result;
1751 /// # #[tokio::main]
1752 /// # async fn main() -> Result<()> {
1753 /// let ctx = SessionContext::new();
1754 /// let df = ctx
1755 /// .read_csv("tests/data/example.csv", CsvReadOptions::new())
1756 /// .await?;
1757 /// let f = df.registry();
/// // use f.udf("name")? to look up a UDF, then `.call(vec![...])` to invoke it
1759 /// # Ok(())
1760 /// # }
1761 /// ```
1762 pub fn registry(&self) -> &dyn FunctionRegistry {
1763 self.session_state.as_ref()
1764 }
1765
/// Calculate the intersection of two [`DataFrame`]s (`INTERSECT ALL` semantics); the two [`DataFrame`]s must have exactly the same schema.
1767 ///
1768 /// ```
1769 /// # use datafusion::prelude::*;
1770 /// # use datafusion::error::Result;
1771 /// # use datafusion_common::assert_batches_sorted_eq;
1772 /// # #[tokio::main]
1773 /// # async fn main() -> Result<()> {
1774 /// let ctx = SessionContext::new();
1775 /// let df = ctx
1776 /// .read_csv("tests/data/example.csv", CsvReadOptions::new())
1777 /// .await?;
1778 /// let d2 = ctx
1779 /// .read_csv("tests/data/example_long.csv", CsvReadOptions::new())
1780 /// .await?;
1781 /// let df = df.intersect(d2)?;
1782 /// let expected = vec![
1783 /// "+---+---+---+",
1784 /// "| a | b | c |",
1785 /// "+---+---+---+",
1786 /// "| 1 | 2 | 3 |",
1787 /// "+---+---+---+",
1788 /// ];
1789 /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
1790 /// # Ok(())
1791 /// # }
1792 /// ```
1793 pub fn intersect(self, dataframe: DataFrame) -> Result<DataFrame> {
1794 let left_plan = self.plan;
1795 let right_plan = dataframe.plan;
1796 let plan = LogicalPlanBuilder::intersect(left_plan, right_plan, true)?;
1797 Ok(DataFrame {
1798 session_state: self.session_state,
1799 plan,
1800 projection_requires_validation: true,
1801 })
1802 }
1803
/// Calculate the distinct intersection of two [`DataFrame`]s (`INTERSECT DISTINCT` semantics); the two [`DataFrame`]s must have exactly the same schema.
1805 ///
1806 /// ```
1807 /// # use datafusion::prelude::*;
1808 /// # use datafusion::error::Result;
1809 /// # use datafusion_common::assert_batches_sorted_eq;
1810 /// # #[tokio::main]
1811 /// # async fn main() -> Result<()> {
1812 /// let ctx = SessionContext::new();
1813 /// let df = ctx
1814 /// .read_csv("tests/data/example.csv", CsvReadOptions::new())
1815 /// .await?;
1816 /// let d2 = ctx
1817 /// .read_csv("tests/data/example_long.csv", CsvReadOptions::new())
1818 /// .await?;
1819 /// let df = df.intersect_distinct(d2)?;
1820 /// let expected = vec![
1821 /// "+---+---+---+",
1822 /// "| a | b | c |",
1823 /// "+---+---+---+",
1824 /// "| 1 | 2 | 3 |",
1825 /// "+---+---+---+",
1826 /// ];
1827 /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
1828 /// # Ok(())
1829 /// # }
1830 /// ```
1831 pub fn intersect_distinct(self, dataframe: DataFrame) -> Result<DataFrame> {
1832 let left_plan = self.plan;
1833 let right_plan = dataframe.plan;
1834 let plan = LogicalPlanBuilder::intersect(left_plan, right_plan, false)?;
1835 Ok(DataFrame {
1836 session_state: self.session_state,
1837 plan,
1838 projection_requires_validation: true,
1839 })
1840 }
1841
/// Calculate the set difference of two [`DataFrame`]s (`EXCEPT ALL` semantics); the two [`DataFrame`]s must have exactly the same schema.
1843 ///
1844 /// ```
1845 /// # use datafusion::prelude::*;
1846 /// # use datafusion::error::Result;
1847 /// # use datafusion_common::assert_batches_sorted_eq;
1848 /// # #[tokio::main]
1849 /// # async fn main() -> Result<()> {
1850 /// let ctx = SessionContext::new();
1851 /// let df = ctx
1852 /// .read_csv("tests/data/example_long.csv", CsvReadOptions::new())
1853 /// .await?;
1854 /// let d2 = ctx
1855 /// .read_csv("tests/data/example.csv", CsvReadOptions::new())
1856 /// .await?;
1857 /// let result = df.except(d2)?;
/// // these rows are in example_long.csv, but not in example.csv
1859 /// let expected = vec![
1860 /// "+---+---+---+",
1861 /// "| a | b | c |",
1862 /// "+---+---+---+",
1863 /// "| 4 | 5 | 6 |",
1864 /// "| 7 | 8 | 9 |",
1865 /// "+---+---+---+",
1866 /// ];
1867 /// # assert_batches_sorted_eq!(expected, &result.collect().await?);
1868 /// # Ok(())
1869 /// # }
1870 /// ```
1871 pub fn except(self, dataframe: DataFrame) -> Result<DataFrame> {
1872 let left_plan = self.plan;
1873 let right_plan = dataframe.plan;
1874 let plan = LogicalPlanBuilder::except(left_plan, right_plan, true)?;
1875 Ok(DataFrame {
1876 session_state: self.session_state,
1877 plan,
1878 projection_requires_validation: true,
1879 })
1880 }
1881
/// Calculate the distinct set difference of two [`DataFrame`]s (`EXCEPT DISTINCT` semantics); the two [`DataFrame`]s must have exactly the same schema.
1883 ///
1884 /// ```
1885 /// # use datafusion::prelude::*;
1886 /// # use datafusion::error::Result;
1887 /// # use datafusion_common::assert_batches_sorted_eq;
1888 /// # #[tokio::main]
1889 /// # async fn main() -> Result<()> {
1890 /// let ctx = SessionContext::new();
1891 /// let df = ctx
1892 /// .read_csv("tests/data/example_long.csv", CsvReadOptions::new())
1893 /// .await?;
1894 /// let d2 = ctx
1895 /// .read_csv("tests/data/example.csv", CsvReadOptions::new())
1896 /// .await?;
1897 /// let result = df.except_distinct(d2)?;
/// // these rows are in example_long.csv, but not in example.csv
1899 /// let expected = vec![
1900 /// "+---+---+---+",
1901 /// "| a | b | c |",
1902 /// "+---+---+---+",
1903 /// "| 4 | 5 | 6 |",
1904 /// "| 7 | 8 | 9 |",
1905 /// "+---+---+---+",
1906 /// ];
1907 /// # assert_batches_sorted_eq!(expected, &result.collect().await?);
1908 /// # Ok(())
1909 /// # }
1910 /// ```
1911 pub fn except_distinct(self, dataframe: DataFrame) -> Result<DataFrame> {
1912 let left_plan = self.plan;
1913 let right_plan = dataframe.plan;
1914 let plan = LogicalPlanBuilder::except(left_plan, right_plan, false)?;
1915 Ok(DataFrame {
1916 session_state: self.session_state,
1917 plan,
1918 projection_requires_validation: true,
1919 })
1920 }
1921
1922 /// Execute this `DataFrame` and write the results to `table_name`.
1923 ///
1924 /// Returns a single [RecordBatch] containing a single column and
1925 /// row representing the count of total rows written.
1926 ///
1927 /// Unlike most other `DataFrame` methods, this method executes eagerly.
1928 /// Data is written to the table using the [`TableProvider::insert_into`]
1929 /// method. This is the same underlying implementation used by SQL `INSERT
1930 /// INTO` statements.
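///
/// # Example
///
/// A minimal sketch: the target table (here a [`MemTable`] registered under
/// the hypothetical name "t") must already exist and support inserts:
/// ```
/// # use std::sync::Arc;
/// # use datafusion::datasource::MemTable;
/// # use datafusion::dataframe::DataFrameWriteOptions;
/// # use datafusion::error::Result;
/// # use datafusion::prelude::*;
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let ctx = SessionContext::new();
/// let df = ctx
///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
///     .await?;
/// // register an empty, insertable table with a matching schema
/// let schema = Arc::new(df.schema().as_arrow().clone());
/// ctx.register_table("t", Arc::new(MemTable::try_new(schema, vec![vec![]])?))?;
/// df.write_table("t", DataFrameWriteOptions::new()).await?;
/// # Ok(())
/// # }
/// ```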
1931 pub async fn write_table(
1932 self,
1933 table_name: &str,
1934 write_options: DataFrameWriteOptions,
1935 ) -> Result<Vec<RecordBatch>, DataFusionError> {
1936 let plan = if write_options.sort_by.is_empty() {
1937 self.plan
1938 } else {
1939 LogicalPlanBuilder::from(self.plan)
1940 .sort(write_options.sort_by)?
1941 .build()?
1942 };
1943
1944 let table_ref: TableReference = table_name.into();
1945 let table_schema = self.session_state.schema_for_ref(table_ref.clone())?;
1946 let target = match table_schema.table(table_ref.table()).await? {
1947 Some(ref provider) => Ok(Arc::clone(provider)),
1948 _ => plan_err!("No table named '{table_name}'"),
1949 }?;
1950
1951 let target = Arc::new(DefaultTableSource::new(target));
1952
1953 let plan = LogicalPlanBuilder::insert_into(
1954 plan,
1955 table_ref,
1956 target,
1957 write_options.insert_op,
1958 )?
1959 .build()?;
1960
1961 DataFrame {
1962 session_state: self.session_state,
1963 plan,
1964 projection_requires_validation: self.projection_requires_validation,
1965 }
1966 .collect()
1967 .await
1968 }
1969
1970 /// Execute the `DataFrame` and write the results to CSV file(s).
1971 ///
1972 /// # Example
1973 /// ```
1974 /// # use datafusion::prelude::*;
1975 /// # use datafusion::error::Result;
1976 /// # use std::fs;
1977 /// # #[tokio::main]
1978 /// # async fn main() -> Result<()> {
1979 /// use datafusion::dataframe::DataFrameWriteOptions;
1980 /// let ctx = SessionContext::new();
1981 /// // Sort the data by column "b" and write it to a new location
1982 /// ctx.read_csv("tests/data/example.csv", CsvReadOptions::new())
1983 /// .await?
1984 /// .sort(vec![col("b").sort(true, true)])? // sort by b asc, nulls first
1985 /// .write_csv(
1986 /// "output.csv",
1987 /// DataFrameWriteOptions::new(),
1988 /// None, // can also specify CSV writing options here
1989 /// )
1990 /// .await?;
1991 /// # fs::remove_file("output.csv")?;
1992 /// # Ok(())
1993 /// # }
1994 /// ```
1995 pub async fn write_csv(
1996 self,
1997 path: &str,
1998 options: DataFrameWriteOptions,
1999 writer_options: Option<CsvOptions>,
2000 ) -> Result<Vec<RecordBatch>, DataFusionError> {
2001 if options.insert_op != InsertOp::Append {
2002 return not_impl_err!(
2003 "{} is not implemented for DataFrame::write_csv.",
2004 options.insert_op
2005 );
2006 }
2007
2008 let format = if let Some(csv_opts) = writer_options {
2009 Arc::new(CsvFormatFactory::new_with_options(csv_opts))
2010 } else {
2011 Arc::new(CsvFormatFactory::new())
2012 };
2013
2014 let file_type = format_as_file_type(format);
2015
2016 let plan = if options.sort_by.is_empty() {
2017 self.plan
2018 } else {
2019 LogicalPlanBuilder::from(self.plan)
2020 .sort(options.sort_by)?
2021 .build()?
2022 };
2023
2024 let plan = LogicalPlanBuilder::copy_to(
2025 plan,
2026 path.into(),
2027 file_type,
2028 HashMap::new(),
2029 options.partition_by,
2030 )?
2031 .build()?;
2032
2033 DataFrame {
2034 session_state: self.session_state,
2035 plan,
2036 projection_requires_validation: self.projection_requires_validation,
2037 }
2038 .collect()
2039 .await
2040 }
2041
2042 /// Execute the `DataFrame` and write the results to JSON file(s).
2043 ///
2044 /// # Example
2045 /// ```
2046 /// # use datafusion::prelude::*;
2047 /// # use datafusion::error::Result;
2048 /// # use std::fs;
2049 /// # #[tokio::main]
2050 /// # async fn main() -> Result<()> {
2051 /// use datafusion::dataframe::DataFrameWriteOptions;
2052 /// let ctx = SessionContext::new();
2053 /// // Sort the data by column "b" and write it to a new location
2054 /// ctx.read_csv("tests/data/example.csv", CsvReadOptions::new())
2055 /// .await?
2056 /// .sort(vec![col("b").sort(true, true)])? // sort by b asc, nulls first
2057 /// .write_json("output.json", DataFrameWriteOptions::new(), None)
2058 /// .await?;
2059 /// # fs::remove_file("output.json")?;
2060 /// # Ok(())
2061 /// # }
2062 /// ```
2063 pub async fn write_json(
2064 self,
2065 path: &str,
2066 options: DataFrameWriteOptions,
2067 writer_options: Option<JsonOptions>,
2068 ) -> Result<Vec<RecordBatch>, DataFusionError> {
2069 if options.insert_op != InsertOp::Append {
2070 return not_impl_err!(
2071 "{} is not implemented for DataFrame::write_json.",
2072 options.insert_op
2073 );
2074 }
2075
2076 let format = if let Some(json_opts) = writer_options {
2077 Arc::new(JsonFormatFactory::new_with_options(json_opts))
2078 } else {
2079 Arc::new(JsonFormatFactory::new())
2080 };
2081
2082 let file_type = format_as_file_type(format);
2083
2084 let plan = if options.sort_by.is_empty() {
2085 self.plan
2086 } else {
2087 LogicalPlanBuilder::from(self.plan)
2088 .sort(options.sort_by)?
2089 .build()?
2090 };
2091
2092 let plan = LogicalPlanBuilder::copy_to(
2093 plan,
2094 path.into(),
2095 file_type,
2096 Default::default(),
2097 options.partition_by,
2098 )?
2099 .build()?;
2100
2101 DataFrame {
2102 session_state: self.session_state,
2103 plan,
2104 projection_requires_validation: self.projection_requires_validation,
2105 }
2106 .collect()
2107 .await
2108 }
2109
2110 /// Add or replace a column in the DataFrame.
2111 ///
2112 /// # Example
2113 /// ```
2114 /// # use datafusion::prelude::*;
2115 /// # use datafusion::error::Result;
2116 /// # #[tokio::main]
2117 /// # async fn main() -> Result<()> {
2118 /// let ctx = SessionContext::new();
2119 /// let df = ctx
2120 /// .read_csv("tests/data/example.csv", CsvReadOptions::new())
2121 /// .await?;
2122 /// let df = df.with_column("ab_sum", col("a") + col("b"))?;
2123 /// # Ok(())
2124 /// # }
2125 /// ```
2126 pub fn with_column(self, name: &str, expr: Expr) -> Result<DataFrame> {
2127 let window_func_exprs = find_window_exprs([&expr]);
2128
2129 let original_names: HashSet<String> = self
2130 .plan
2131 .schema()
2132 .iter()
2133 .map(|(_, f)| f.name().clone())
2134 .collect();
2135
2136 // Maybe build window plan
2137 let plan = if window_func_exprs.is_empty() {
2138 self.plan
2139 } else {
2140 LogicalPlanBuilder::window_plan(self.plan, window_func_exprs)?
2141 };
2142
2143 let new_column = expr.alias(name);
2144 let mut col_exists = false;
2145
2146 let mut fields: Vec<(Expr, bool)> = plan
2147 .schema()
2148 .iter()
2149 .filter_map(|(qualifier, field)| {
2150 // Skip new fields introduced by window_plan
2151 if !original_names.contains(field.name()) {
2152 return None;
2153 }
2154
2155 if field.name() == name {
2156 col_exists = true;
2157 Some((new_column.clone(), true))
2158 } else {
2159 let e = col(Column::from((qualifier, field)));
2160 Some((e, self.projection_requires_validation))
2161 }
2162 })
2163 .collect();
2164
2165 if !col_exists {
2166 fields.push((new_column, true));
2167 }
2168
2169 let project_plan = LogicalPlanBuilder::from(plan)
2170 .project_with_validation(fields)?
2171 .build()?;
2172
2173 Ok(DataFrame {
2174 session_state: self.session_state,
2175 plan: project_plan,
2176 projection_requires_validation: false,
2177 })
2178 }
2179
2180 /// Rename one column by applying a new projection. This is a no-op if the column to be
2181 /// renamed does not exist.
2182 ///
/// The method supports case-sensitive renaming by wrapping the column name in one of the following symbols: `"`, `'`, or `` ` ``.
///
/// Alternatively, setting the DataFusion configuration parameter `datafusion.sql_parser.enable_ident_normalization` to `false`
/// enables case-sensitive renaming without the need to wrap the column name in special symbols.
2187 ///
2188 /// # Example
2189 /// ```
2190 /// # use datafusion::prelude::*;
2191 /// # use datafusion::error::Result;
2192 /// # #[tokio::main]
2193 /// # async fn main() -> Result<()> {
2194 /// let ctx = SessionContext::new();
2195 /// let df = ctx
2196 /// .read_csv("tests/data/example.csv", CsvReadOptions::new())
2197 /// .await?;
2198 /// let df = df.with_column_renamed("ab_sum", "total")?;
2199 ///
2200 /// # Ok(())
2201 /// # }
2202 /// ```
2203 pub fn with_column_renamed(
2204 self,
2205 old_name: impl Into<String>,
2206 new_name: &str,
2207 ) -> Result<DataFrame> {
2208 let ident_opts = self
2209 .session_state
2210 .config_options()
2211 .sql_parser
2212 .enable_ident_normalization;
2213 let old_column: Column = if ident_opts {
2214 Column::from_qualified_name(old_name)
2215 } else {
2216 Column::from_qualified_name_ignore_case(old_name)
2217 };
2218
2219 let (qualifier_rename, field_rename) =
2220 match self.plan.schema().qualified_field_from_column(&old_column) {
2221 Ok(qualifier_and_field) => qualifier_and_field,
2222 // no-op if field not found
2223 Err(DataFusionError::SchemaError(e, _))
2224 if matches!(*e, SchemaError::FieldNotFound { .. }) =>
2225 {
2226 return Ok(self);
2227 }
2228 Err(err) => return Err(err),
2229 };
2230 let projection = self
2231 .plan
2232 .schema()
2233 .iter()
2234 .map(|(qualifier, field)| {
2235 if qualifier.eq(&qualifier_rename) && field.as_ref() == field_rename {
2236 (
2237 col(Column::from((qualifier, field)))
2238 .alias_qualified(qualifier.cloned(), new_name),
2239 false,
2240 )
2241 } else {
2242 (col(Column::from((qualifier, field))), false)
2243 }
2244 })
2245 .collect::<Vec<_>>();
2246 let project_plan = LogicalPlanBuilder::from(self.plan)
2247 .project_with_validation(projection)?
2248 .build()?;
2249 Ok(DataFrame {
2250 session_state: self.session_state,
2251 plan: project_plan,
2252 projection_requires_validation: false,
2253 })
2254 }
2255
2256 /// Replace all parameters in logical plan with the specified
2257 /// values, in preparation for execution.
2258 ///
2259 /// # Example
2260 ///
2261 /// ```
2262 /// use datafusion::prelude::*;
2263 /// # use datafusion::{error::Result, assert_batches_eq};
2264 /// # #[tokio::main]
2265 /// # async fn main() -> Result<()> {
2266 /// # use datafusion_common::ScalarValue;
2267 /// let ctx = SessionContext::new();
2268 /// # ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;
2269 /// let results = ctx
2270 /// .sql("SELECT a FROM example WHERE b = $1")
2271 /// .await?
2272 /// // replace $1 with value 2
2273 /// .with_param_values(vec![
2274 /// // value at index 0 --> $1
2275 /// ScalarValue::from(2i64)
2276 /// ])?
2277 /// .collect()
2278 /// .await?;
2279 /// assert_batches_eq!(
2280 /// &[
2281 /// "+---+",
2282 /// "| a |",
2283 /// "+---+",
2284 /// "| 1 |",
2285 /// "+---+",
2286 /// ],
2287 /// &results
2288 /// );
2289 /// // Note you can also provide named parameters
2290 /// let results = ctx
2291 /// .sql("SELECT a FROM example WHERE b = $my_param")
2292 /// .await?
2293 /// // replace $my_param with value 2
2294 /// // Note you can also use a HashMap as well
2295 /// .with_param_values(vec![
2296 /// ("my_param", ScalarValue::from(2i64))
2297 /// ])?
2298 /// .collect()
2299 /// .await?;
2300 /// assert_batches_eq!(
2301 /// &[
2302 /// "+---+",
2303 /// "| a |",
2304 /// "+---+",
2305 /// "| 1 |",
2306 /// "+---+",
2307 /// ],
2308 /// &results
2309 /// );
2310 /// # Ok(())
2311 /// # }
2312 /// ```
2313 pub fn with_param_values(self, query_values: impl Into<ParamValues>) -> Result<Self> {
2314 let plan = self.plan.with_param_values(query_values)?;
2315 Ok(DataFrame {
2316 session_state: self.session_state,
2317 plan,
2318 projection_requires_validation: self.projection_requires_validation,
2319 })
2320 }
2321
2322 /// Cache DataFrame as a memory table.
2323 ///
2324 /// ```
2325 /// # use datafusion::prelude::*;
2326 /// # use datafusion::error::Result;
2327 /// # #[tokio::main]
2328 /// # async fn main() -> Result<()> {
2329 /// let ctx = SessionContext::new();
2330 /// let df = ctx
2331 /// .read_csv("tests/data/example.csv", CsvReadOptions::new())
2332 /// .await?;
2333 /// let df = df.cache().await?;
2334 /// # Ok(())
2335 /// # }
2336 /// ```
2337 pub async fn cache(self) -> Result<DataFrame> {
2338 let context = SessionContext::new_with_state((*self.session_state).clone());
// Use the physical plan's schema, which is consistent with the output batches
2340 let plan = self.clone().create_physical_plan().await?;
2341 let schema = plan.schema();
2342 let task_ctx = Arc::new(self.task_ctx());
2343 let partitions = collect_partitioned(plan, task_ctx).await?;
2344 let mem_table = MemTable::try_new(schema, partitions)?;
2345 context.read_table(Arc::new(mem_table))
2346 }
2347
2348 /// Apply an alias to the DataFrame.
2349 ///
2350 /// This method replaces the qualifiers of output columns with the given alias.
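///
/// # Example
///
/// A minimal sketch: qualify the columns with a hypothetical alias
/// "my_table" so they can be referenced unambiguously, e.g. in a join:
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let ctx = SessionContext::new();
/// let df = ctx
///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
///     .await?;
/// let df = df.alias("my_table")?;
/// // columns are now addressable as my_table.a, my_table.b, ...
/// let df = df.select(vec![col("my_table.a")])?;
/// # Ok(())
/// # }
/// ```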
2351 pub fn alias(self, alias: &str) -> Result<DataFrame> {
2352 let plan = LogicalPlanBuilder::from(self.plan).alias(alias)?.build()?;
2353 Ok(DataFrame {
2354 session_state: self.session_state,
2355 plan,
2356 projection_requires_validation: self.projection_requires_validation,
2357 })
2358 }
2359
/// Fill null values in the specified columns with a given value.
/// If no columns are specified (empty vector), the fill applies to all columns.
/// A column is only filled if the value can be cast to its type.
2363 ///
2364 /// # Arguments
2365 /// * `value` - Value to fill nulls with
2366 /// * `columns` - List of column names to fill. If empty, fills all columns.
2367 ///
2368 /// # Example
2369 /// ```
2370 /// # use datafusion::prelude::*;
2371 /// # use datafusion::error::Result;
2372 /// # use datafusion_common::ScalarValue;
2373 /// # #[tokio::main]
2374 /// # async fn main() -> Result<()> {
2375 /// let ctx = SessionContext::new();
2376 /// let df = ctx
2377 /// .read_csv("tests/data/example.csv", CsvReadOptions::new())
2378 /// .await?;
2379 /// // Fill nulls in only columns "a" and "c":
2380 /// let df = df.fill_null(ScalarValue::from(0), vec!["a".to_owned(), "c".to_owned()])?;
2381 /// // Fill nulls across all columns:
2382 /// let df = df.fill_null(ScalarValue::from(0), vec![])?;
2383 /// # Ok(())
2384 /// # }
2385 /// ```
2386 pub fn fill_null(
2387 &self,
2388 value: ScalarValue,
2389 columns: Vec<String>,
2390 ) -> Result<DataFrame> {
2391 let cols = if columns.is_empty() {
2392 self.logical_plan()
2393 .schema()
2394 .fields()
2395 .iter()
2396 .map(|f| f.as_ref().clone())
2397 .collect()
2398 } else {
2399 self.find_columns(&columns)?
2400 };
2401
2402 // Create projections for each column
2403 let projections = self
2404 .logical_plan()
2405 .schema()
2406 .fields()
2407 .iter()
2408 .map(|field| {
2409 if cols.contains(field) {
// Try to cast the fill value to the column type. If the cast fails, fall back to the original column.
2411 match value.clone().cast_to(field.data_type()) {
2412 Ok(fill_value) => Expr::Alias(Alias {
2413 expr: Box::new(Expr::ScalarFunction(ScalarFunction {
2414 func: coalesce(),
2415 args: vec![col(field.name()), lit(fill_value)],
2416 })),
2417 relation: None,
2418 name: field.name().to_string(),
2419 metadata: None,
2420 }),
2421 Err(_) => col(field.name()),
2422 }
2423 } else {
2424 col(field.name())
2425 }
2426 })
2427 .collect::<Vec<_>>();
2428
2429 self.clone().select(projections)
2430 }
2431
2432 // Helper to find columns from names
2433 fn find_columns(&self, names: &[String]) -> Result<Vec<Field>> {
2434 let schema = self.logical_plan().schema();
2435 names
2436 .iter()
2437 .map(|name| {
2438 schema
2439 .field_with_name(None, name)
2440 .cloned()
2441 .map_err(|_| plan_datafusion_err!("Column '{}' not found", name))
2442 })
2443 .collect()
2444 }
2445
/// Helper for creating a DataFrame from named columns.
2447 /// # Example
2448 /// ```
2449 /// use arrow::array::{ArrayRef, Int32Array, StringArray};
2450 /// use datafusion::prelude::DataFrame;
2451 /// use std::sync::Arc;
2452 /// let id: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
2453 /// let name: ArrayRef = Arc::new(StringArray::from(vec!["foo", "bar", "baz"]));
2454 /// let df = DataFrame::from_columns(vec![("id", id), ("name", name)]).unwrap();
/// // +----+------+
/// // | id | name |
/// // +----+------+
/// // | 1  | foo  |
/// // | 2  | bar  |
/// // | 3  | baz  |
/// // +----+------+
2462 /// ```
2463 pub fn from_columns(columns: Vec<(&str, ArrayRef)>) -> Result<Self> {
2464 let fields = columns
2465 .iter()
2466 .map(|(name, array)| Field::new(*name, array.data_type().clone(), true))
2467 .collect::<Vec<_>>();
2468
2469 let arrays = columns
2470 .into_iter()
2471 .map(|(_, array)| array)
2472 .collect::<Vec<_>>();
2473
2474 let schema = Arc::new(Schema::new(fields));
2475 let batch = RecordBatch::try_new(schema, arrays)?;
2476 let ctx = SessionContext::new();
2477 let df = ctx.read_batch(batch)?;
2478 Ok(df)
2479 }
2480}
2481
/// Macro for creating a DataFrame.
2483/// # Example
2484/// ```
2485/// use datafusion::prelude::dataframe;
2486/// # use datafusion::error::Result;
2487/// # #[tokio::main]
2488/// # async fn main() -> Result<()> {
2489/// let df = dataframe!(
2490/// "id" => [1, 2, 3],
2491/// "name" => ["foo", "bar", "baz"]
2492/// )?;
2493/// df.show().await?;
/// // +----+------+
/// // | id | name |
/// // +----+------+
/// // | 1  | foo  |
/// // | 2  | bar  |
/// // | 3  | baz  |
/// // +----+------+
2501/// let df_empty = dataframe!()?; // empty DataFrame
2502/// assert_eq!(df_empty.schema().fields().len(), 0);
2503/// assert_eq!(df_empty.count().await?, 0);
2504/// # Ok(())
2505/// # }
2506/// ```
2507#[macro_export]
2508macro_rules! dataframe {
2509 () => {{
2510 use std::sync::Arc;
2511
2512 use datafusion::prelude::SessionContext;
2513 use datafusion::arrow::array::RecordBatch;
2514 use datafusion::arrow::datatypes::Schema;
2515
2516 let ctx = SessionContext::new();
2517 let batch = RecordBatch::new_empty(Arc::new(Schema::empty()));
2518 ctx.read_batch(batch)
2519 }};
2520
2521 ($($name:expr => $data:expr),+ $(,)?) => {{
2522 use datafusion::prelude::DataFrame;
2523 use datafusion::common::test_util::IntoArrayRef;
2524
2525 let columns = vec![
2526 $(
2527 ($name, $data.into_array_ref()),
2528 )+
2529 ];
2530
2531 DataFrame::from_columns(columns)
2532 }};
2533}
2534
2535#[derive(Debug)]
2536struct DataFrameTableProvider {
2537 plan: LogicalPlan,
2538 table_type: TableType,
2539}
2540
2541#[async_trait]
2542impl TableProvider for DataFrameTableProvider {
2543 fn as_any(&self) -> &dyn Any {
2544 self
2545 }
2546
2547 fn get_logical_plan(&self) -> Option<Cow<'_, LogicalPlan>> {
2548 Some(Cow::Borrowed(&self.plan))
2549 }
2550
2551 fn supports_filters_pushdown(
2552 &self,
2553 filters: &[&Expr],
2554 ) -> Result<Vec<TableProviderFilterPushDown>> {
2555 // A filter is added on the DataFrame when given
2556 Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
2557 }
2558
2559 fn schema(&self) -> SchemaRef {
2560 Arc::clone(self.plan.schema().inner())
2561 }
2562
2563 fn table_type(&self) -> TableType {
2564 self.table_type
2565 }
2566
2567 async fn scan(
2568 &self,
2569 state: &dyn Session,
2570 projection: Option<&Vec<usize>>,
2571 filters: &[Expr],
2572 limit: Option<usize>,
2573 ) -> Result<Arc<dyn ExecutionPlan>> {
2574 let mut expr = LogicalPlanBuilder::from(self.plan.clone());
2575 // Add filter when given
2576 let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
2577 if let Some(filter) = filter {
2578 expr = expr.filter(filter)?
2579 }
2580
2581 if let Some(p) = projection {
2582 expr = expr.select(p.iter().copied())?
2583 }
2584
2585 // add a limit if given
2586 if let Some(l) = limit {
2587 expr = expr.limit(0, Some(l))?
2588 }
2589 let plan = expr.build()?;
2590 state.create_physical_plan(&plan).await
2591 }
2592}
2593
2594// see tests in datafusion/core/tests/dataframe/mod.rs:2816