use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;

use crate::datasource::file_format::file_type_to_format;
use crate::datasource::listing::ListingTableUrl;
use crate::datasource::physical_plan::FileSinkConfig;
use crate::datasource::{source_as_provider, DefaultTableSource};
use crate::error::{DataFusionError, Result};
use crate::execution::context::{ExecutionProps, SessionState};
use crate::logical_expr::utils::generate_sort_key;
use crate::logical_expr::{
    Aggregate, EmptyRelation, Join, Projection, Sort, TableScan, Unnest, Values, Window,
};
use crate::logical_expr::{
    Expr, LogicalPlan, Partitioning as LogicalPartitioning, PlanType, Repartition,
    UserDefinedLogicalNode,
};
use crate::physical_expr::{create_physical_expr, create_physical_exprs};
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
use crate::physical_plan::analyze::AnalyzeExec;
use crate::physical_plan::explain::ExplainExec;
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::joins::utils as join_utils;
use crate::physical_plan::joins::{
    CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode, SortMergeJoinExec,
};
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use crate::physical_plan::projection::{ProjectionExec, ProjectionExpr};
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::union::UnionExec;
use crate::physical_plan::unnest::UnnestExec;
use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
use crate::physical_plan::{
    displayable, windows, ExecutionPlan, ExecutionPlanProperties, InputOrderMode,
    Partitioning, PhysicalExpr, WindowExpr,
};
use crate::schema_equivalence::schema_satisfied_by;

use arrow::array::{builder::StringBuilder, RecordBatch};
use arrow::compute::SortOptions;
use arrow::datatypes::Schema;
use datafusion_catalog::ScanArgs;
use datafusion_common::display::ToStringifiedPlan;
use datafusion_common::format::ExplainAnalyzeLevel;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion_common::TableReference;
use datafusion_common::{
    exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema,
    ScalarValue,
};
use datafusion_datasource::file_groups::FileGroup;
use datafusion_datasource::memory::MemorySourceConfig;
use datafusion_expr::dml::{CopyTo, InsertOp};
use datafusion_expr::expr::{
    physical_name, AggregateFunction, AggregateFunctionParams, Alias, GroupingSet,
    NullTreatment, WindowFunction, WindowFunctionParams,
};
use datafusion_expr::expr_rewriter::unnormalize_cols;
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
use datafusion_expr::utils::split_conjunction;
use datafusion_expr::{
    Analyze, BinaryExpr, DescribeTable, DmlStatement, Explain, ExplainFormat, Extension,
    FetchType, Filter, JoinType, Operator, RecursiveQuery, SkipType, StringifiedPlan,
    WindowFrame, WindowFrameBound, WriteOp,
};
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
use datafusion_physical_expr::expressions::Literal;
use datafusion_physical_expr::{
    create_physical_sort_exprs, LexOrdering, PhysicalSortExpr,
};
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::empty::EmptyExec;
use datafusion_physical_plan::execution_plan::InvariantLevel;
use datafusion_physical_plan::joins::PiecewiseMergeJoinExec;
use datafusion_physical_plan::metrics::MetricType;
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
use datafusion_physical_plan::recursive_query::RecursiveQueryExec;
use datafusion_physical_plan::unnest::ListUnnest;

use async_trait::async_trait;
use datafusion_physical_plan::async_func::{AsyncFuncExec, AsyncMapper};
use futures::{StreamExt, TryStreamExt};
use itertools::{multiunzip, Itertools};
use log::debug;
use tokio::sync::Mutex;

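/// Physical query planner that converts a [`LogicalPlan`] to an
/// [`ExecutionPlan`] suitable for execution.
///
/// A minimal usage sketch (hedged: assumes a `LogicalPlan` and a
/// `SessionState` are already in scope; not taken from this file):
///
/// ```ignore
/// let planner = DefaultPhysicalPlanner::default();
/// let physical_plan = planner
///     .create_physical_plan(&logical_plan, &session_state)
///     .await?;
/// ```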
#[async_trait]
pub trait PhysicalPlanner: Send + Sync {
    async fn create_physical_plan(
        &self,
        logical_plan: &LogicalPlan,
        session_state: &SessionState,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    fn create_physical_expr(
        &self,
        expr: &Expr,
        input_dfschema: &DFSchema,
        session_state: &SessionState,
    ) -> Result<Arc<dyn PhysicalExpr>>;
}

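/// Planner extension point for custom [`UserDefinedLogicalNode`]s: given a
/// node plus its logical and already-planned physical inputs, return
/// `Some(plan)` if this planner can handle the node, or `None` to let the
/// next registered planner try.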
#[async_trait]
pub trait ExtensionPlanner {
    async fn plan_extension(
        &self,
        planner: &dyn PhysicalPlanner,
        node: &dyn UserDefinedLogicalNode,
        logical_inputs: &[&LogicalPlan],
        physical_inputs: &[Arc<dyn ExecutionPlan>],
        session_state: &SessionState,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>>;
}

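/// Default physical query planner that converts a [`LogicalPlan`] to an
/// [`ExecutionPlan`], delegating any [`LogicalPlan::Extension`] nodes to
/// the registered extension planners.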
#[derive(Default)]
pub struct DefaultPhysicalPlanner {
    extension_planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>>,
}

#[async_trait]
impl PhysicalPlanner for DefaultPhysicalPlanner {
    async fn create_physical_plan(
        &self,
        logical_plan: &LogicalPlan,
        session_state: &SessionState,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        if let Some(plan) = self
            .handle_explain_or_analyze(logical_plan, session_state)
            .await?
        {
            return Ok(plan);
        }
        let plan = self
            .create_initial_plan(logical_plan, session_state)
            .await?;

        self.optimize_physical_plan(plan, session_state, |_, _| {})
    }

    fn create_physical_expr(
        &self,
        expr: &Expr,
        input_dfschema: &DFSchema,
        session_state: &SessionState,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        create_physical_expr(expr, input_dfschema, session_state.execution_props())
    }
}

#[derive(Debug)]
struct ExecutionPlanChild {
    index: usize,
    plan: Arc<dyn ExecutionPlan>,
}

#[derive(Debug)]
enum NodeState {
    ZeroOrOneChild,
    TwoOrMoreChildren(Mutex<Vec<ExecutionPlanChild>>),
}

enum ChildrenContainer {
    None,
    One(Arc<dyn ExecutionPlan>),
    Multiple(Vec<Arc<dyn ExecutionPlan>>),
}

impl ChildrenContainer {
    fn one(self) -> Result<Arc<dyn ExecutionPlan>> {
        match self {
            Self::One(p) => Ok(p),
            _ => internal_err!("More than one child in ChildrenContainer"),
        }
    }

    fn two(self) -> Result<[Arc<dyn ExecutionPlan>; 2]> {
        match self {
            Self::Multiple(v) if v.len() == 2 => Ok(v.try_into().unwrap()),
            _ => internal_err!("ChildrenContainer doesn't contain exactly 2 children"),
        }
    }

    fn vec(self) -> Vec<Arc<dyn ExecutionPlan>> {
        match self {
            Self::None => vec![],
            Self::One(p) => vec![p],
            Self::Multiple(v) => v,
        }
    }
}

#[derive(Debug)]
struct LogicalNode<'a> {
    node: &'a LogicalPlan,
    parent_index: Option<usize>,
    state: NodeState,
}

impl DefaultPhysicalPlanner {
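    /// Create a planner that consults the given extension planners, in
    /// order, when it encounters a [`LogicalPlan::Extension`] node.
    ///
    /// A minimal sketch (hedged: `MyExtensionPlanner` is a hypothetical
    /// `ExtensionPlanner` implementation, not defined in this file):
    ///
    /// ```ignore
    /// let planner = DefaultPhysicalPlanner::with_extension_planners(vec![
    ///     Arc::new(MyExtensionPlanner::default()),
    /// ]);
    /// ```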
    pub fn with_extension_planners(
        extension_planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>>,
    ) -> Self {
        Self { extension_planners }
    }

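    /// Create an initial physical plan from the logical plan: the tree is
    /// flattened into a `Vec` via DFS, then planned bottom-up with one
    /// task per leaf, so independent subtrees are planned concurrently up
    /// to the `execution.planning_concurrency` limit.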
    async fn create_initial_plan(
        &self,
        logical_plan: &LogicalPlan,
        session_state: &SessionState,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let mut flat_tree = vec![];
        let mut dfs_visit_stack = vec![(None, logical_plan)];
        let mut flat_tree_leaf_indices = vec![];
        while let Some((parent_index, node)) = dfs_visit_stack.pop() {
            let current_index = flat_tree.len();
            dfs_visit_stack
                .extend(node.inputs().iter().map(|&n| (Some(current_index), n)));
            let state = match node.inputs().len() {
                0 => {
                    flat_tree_leaf_indices.push(current_index);
                    NodeState::ZeroOrOneChild
                }
                1 => NodeState::ZeroOrOneChild,
                _ => {
                    let ready_children = Vec::with_capacity(node.inputs().len());
                    let ready_children = Mutex::new(ready_children);
                    NodeState::TwoOrMoreChildren(ready_children)
                }
            };
            let node = LogicalNode {
                node,
                parent_index,
                state,
            };
            flat_tree.push(node);
        }
        let flat_tree = Arc::new(flat_tree);

        let planning_concurrency = session_state
            .config_options()
            .execution
            .planning_concurrency;
        let max_concurrency = planning_concurrency.min(flat_tree_leaf_indices.len());

        let tasks = flat_tree_leaf_indices
            .into_iter()
            .map(|index| self.task_helper(index, Arc::clone(&flat_tree), session_state));
        let mut outputs = futures::stream::iter(tasks)
            .buffer_unordered(max_concurrency)
            .try_collect::<Vec<_>>()
            .await?
            .into_iter()
            .flatten()
            .collect::<Vec<_>>();
        if outputs.len() != 1 {
            return internal_err!(
                "Failed to convert LogicalPlan to ExecutionPlan: More than one root detected"
            );
        }
        let plan = outputs.pop().unwrap();
        Ok(plan)
    }

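    /// Plan the subtree that starts at one leaf and climb toward the root.
    /// At a node with multiple children, each arriving task deposits its
    /// planned child behind the `Mutex` and returns `Ok(None)` unless it is
    /// the last to arrive, in which case it plans the node and continues
    /// upward; only the task that reaches the root returns `Ok(Some(plan))`.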
    async fn task_helper<'a>(
        &'a self,
        leaf_starter_index: usize,
        flat_tree: Arc<Vec<LogicalNode<'a>>>,
        session_state: &'a SessionState,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        let mut node = flat_tree.get(leaf_starter_index).ok_or_else(|| {
            internal_datafusion_err!(
                "Invalid index whilst creating initial physical plan"
            )
        })?;
        let mut plan = self
            .map_logical_node_to_physical(
                node.node,
                session_state,
                ChildrenContainer::None,
            )
            .await?;
        let mut current_index = leaf_starter_index;
        while let Some(parent_index) = node.parent_index {
            node = flat_tree.get(parent_index).ok_or_else(|| {
                internal_datafusion_err!(
                    "Invalid index whilst creating initial physical plan"
                )
            })?;
            match &node.state {
                NodeState::ZeroOrOneChild => {
                    plan = self
                        .map_logical_node_to_physical(
                            node.node,
                            session_state,
                            ChildrenContainer::One(plan),
                        )
                        .await?;
                }
                NodeState::TwoOrMoreChildren(children) => {
                    let mut children: Vec<ExecutionPlanChild> = {
                        let mut guard = children.lock().await;
                        guard.push(ExecutionPlanChild {
                            index: current_index,
                            plan,
                        });
                        if guard.len() < node.node.inputs().len() {
                            return Ok(None);
                        }

                        std::mem::take(guard.as_mut())
                    };

                    children.sort_unstable_by_key(|epc| std::cmp::Reverse(epc.index));
                    let children = children.into_iter().map(|epc| epc.plan).collect();
                    let children = ChildrenContainer::Multiple(children);
                    plan = self
                        .map_logical_node_to_physical(node.node, session_state, children)
                        .await?;
                }
            }
            current_index = parent_index;
        }
        Ok(Some(plan))
    }

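    /// Map a single `LogicalPlan` node to its physical counterpart, using
    /// the already-planned physical `children`.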
    async fn map_logical_node_to_physical(
        &self,
        node: &LogicalPlan,
        session_state: &SessionState,
        children: ChildrenContainer,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let exec_node: Arc<dyn ExecutionPlan> = match node {
            LogicalPlan::TableScan(TableScan {
                source,
                projection,
                filters,
                fetch,
                ..
            }) => {
                let source = source_as_provider(source)?;
                let filters = unnormalize_cols(filters.iter().cloned());
                let filters_vec = filters.into_iter().collect::<Vec<_>>();
                let opts = ScanArgs::default()
                    .with_projection(projection.as_deref())
                    .with_filters(Some(&filters_vec))
                    .with_limit(*fetch);
                let res = source.scan_with_args(session_state, opts).await?;
                Arc::clone(res.plan())
            }
            LogicalPlan::Values(Values { values, schema }) => {
                let exprs = values
                    .iter()
                    .map(|row| {
                        row.iter()
                            .map(|expr| {
                                self.create_physical_expr(expr, schema, session_state)
                            })
                            .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()
                    })
                    .collect::<Result<Vec<_>>>()?;
                MemorySourceConfig::try_new_as_values(Arc::clone(schema.inner()), exprs)?
                    as _
            }
            LogicalPlan::EmptyRelation(EmptyRelation {
                produce_one_row: false,
                schema,
            }) => Arc::new(EmptyExec::new(Arc::clone(schema.inner()))),
            LogicalPlan::EmptyRelation(EmptyRelation {
                produce_one_row: true,
                schema,
            }) => Arc::new(PlaceholderRowExec::new(Arc::clone(schema.inner()))),
            LogicalPlan::DescribeTable(DescribeTable {
                schema,
                output_schema,
            }) => {
                let output_schema = Arc::clone(output_schema.inner());
                self.plan_describe(Arc::clone(schema), output_schema)?
            }

            LogicalPlan::Copy(CopyTo {
                input,
                output_url,
                file_type,
                partition_by,
                options: source_option_tuples,
                output_schema: _,
            }) => {
                let original_url = output_url.clone();
                let input_exec = children.one()?;
                let parsed_url = ListingTableUrl::parse(output_url)?;
                let object_store_url = parsed_url.object_store();

                let schema = Arc::clone(input.schema().inner());

                let table_partition_cols = partition_by
                    .iter()
                    .map(|s| (s.to_string(), arrow::datatypes::DataType::Null))
                    .collect::<Vec<_>>();

                let keep_partition_by_columns = match source_option_tuples
                    .get("execution.keep_partition_by_columns")
                    .map(|v| v.trim())
                {
                    None => session_state.config().options().execution.keep_partition_by_columns,
                    Some("true") => true,
                    Some("false") => false,
                    Some(value) => {
                        return Err(DataFusionError::Configuration(format!(
                            "provided value for 'execution.keep_partition_by_columns' was not recognized: \"{value}\""
                        )))
                    }
                };

                let sink_format = file_type_to_format(file_type)?
                    .create(session_state, source_option_tuples)?;

                let file_extension = match sink_format.compression_type() {
                    Some(compression_type) => sink_format
                        .get_ext_with_compression(&compression_type)
                        .unwrap_or_else(|_| sink_format.get_ext()),
                    None => sink_format.get_ext(),
                };

                let config = FileSinkConfig {
                    original_url,
                    object_store_url,
                    table_paths: vec![parsed_url],
                    file_group: FileGroup::default(),
                    output_schema: schema,
                    table_partition_cols,
                    insert_op: InsertOp::Append,
                    keep_partition_by_columns,
                    file_extension,
                };

                let ordering = input_exec.properties().output_ordering().cloned();

                sink_format
                    .create_writer_physical_plan(
                        input_exec,
                        session_state,
                        config,
                        ordering.map(Into::into),
                    )
                    .await?
            }
            LogicalPlan::Dml(DmlStatement {
                target,
                op: WriteOp::Insert(insert_op),
                ..
            }) => {
                if let Some(provider) =
                    target.as_any().downcast_ref::<DefaultTableSource>()
                {
                    let input_exec = children.one()?;
                    provider
                        .table_provider
                        .insert_into(session_state, input_exec, *insert_op)
                        .await?
                } else {
                    return exec_err!(
                        "Table source can't be downcasted to DefaultTableSource"
                    );
                }
            }
            LogicalPlan::Window(Window { window_expr, .. }) => {
                if window_expr.is_empty() {
                    return internal_err!("Impossibly got empty window expression");
                }

                let input_exec = children.one()?;

                let get_sort_keys = |expr: &Expr| match expr {
                    Expr::WindowFunction(window_fun) => {
                        let WindowFunctionParams {
                            ref partition_by,
                            ref order_by,
                            ..
                        } = &window_fun.as_ref().params;
                        generate_sort_key(partition_by, order_by)
                    }
                    Expr::Alias(Alias { expr, .. }) => {
                        match &**expr {
                            Expr::WindowFunction(window_fun) => {
                                let WindowFunctionParams {
                                    ref partition_by,
                                    ref order_by,
                                    ..
                                } = &window_fun.as_ref().params;
                                generate_sort_key(partition_by, order_by)
                            }
                            _ => unreachable!(),
                        }
                    }
                    _ => unreachable!(),
                };
                let sort_keys = get_sort_keys(&window_expr[0])?;
                if window_expr.len() > 1 {
                    debug_assert!(
                        window_expr[1..]
                            .iter()
                            .all(|expr| get_sort_keys(expr).unwrap() == sort_keys),
                        "all window expressions shall have the same sort keys, as guaranteed by logical planning"
                    );
                }

                let logical_schema = node.schema();
                let window_expr = window_expr
                    .iter()
                    .map(|e| {
                        create_window_expr(
                            e,
                            logical_schema,
                            session_state.execution_props(),
                        )
                    })
                    .collect::<Result<Vec<_>>>()?;

                let can_repartition = session_state.config().target_partitions() > 1
                    && session_state.config().repartition_window_functions();

                let uses_bounded_memory =
                    window_expr.iter().all(|e| e.uses_bounded_memory());
                if uses_bounded_memory {
                    Arc::new(BoundedWindowAggExec::try_new(
                        window_expr,
                        input_exec,
                        InputOrderMode::Sorted,
                        can_repartition,
                    )?)
                } else {
                    Arc::new(WindowAggExec::try_new(
                        window_expr,
                        input_exec,
                        can_repartition,
                    )?)
                }
            }
            LogicalPlan::Aggregate(Aggregate {
                input,
                group_expr,
                aggr_expr,
                ..
            }) => {
                let options = session_state.config().options();
                let input_exec = children.one()?;
                let physical_input_schema = input_exec.schema();
                let logical_input_schema = input.as_ref().schema();
                let physical_input_schema_from_logical = logical_input_schema.inner();

                if !options.execution.skip_physical_aggregate_schema_check
                    && !schema_satisfied_by(
                        physical_input_schema_from_logical,
                        &physical_input_schema,
                    )
                {
                    let mut differences = Vec::new();
                    if physical_input_schema.fields().len()
                        != physical_input_schema_from_logical.fields().len()
                    {
                        differences.push(format!(
                            "Different number of fields: (physical) {} vs (logical) {}",
                            physical_input_schema.fields().len(),
                            physical_input_schema_from_logical.fields().len()
                        ));
                    }
                    for (i, (physical_field, logical_field)) in physical_input_schema
                        .fields()
                        .iter()
                        .zip(physical_input_schema_from_logical.fields())
                        .enumerate()
                    {
                        if physical_field.name() != logical_field.name() {
                            differences.push(format!(
                                "field name at index {}: (physical) {} vs (logical) {}",
                                i,
                                physical_field.name(),
                                logical_field.name()
                            ));
                        }
                        if physical_field.data_type() != logical_field.data_type() {
                            differences.push(format!(
                                "field data type at index {} [{}]: (physical) {} vs (logical) {}",
                                i,
                                physical_field.name(),
                                physical_field.data_type(),
                                logical_field.data_type()
                            ));
                        }
                        if physical_field.is_nullable() && !logical_field.is_nullable() {
                            differences.push(format!(
                                "field nullability at index {} [{}]: (physical) {} vs (logical) {}",
                                i,
                                physical_field.name(),
                                physical_field.is_nullable(),
                                logical_field.is_nullable()
                            ));
                        }
                    }
                    return internal_err!(
                        "Physical input schema should be the same as the one converted from logical input schema. Differences: {}",
                        differences
                            .iter()
                            .map(|s| format!("\n\t- {s}"))
                            .join("")
                    );
                }

                let groups = self.create_grouping_physical_expr(
                    group_expr,
                    logical_input_schema,
                    &physical_input_schema,
                    session_state,
                )?;

                let agg_filter = aggr_expr
                    .iter()
                    .map(|e| {
                        create_aggregate_expr_and_maybe_filter(
                            e,
                            logical_input_schema,
                            &physical_input_schema,
                            session_state.execution_props(),
                        )
                    })
                    .collect::<Result<Vec<_>>>()?;

                let (mut aggregates, filters, _order_bys): (Vec<_>, Vec<_>, Vec<_>) =
                    multiunzip(agg_filter);

                let mut async_exprs = Vec::new();
                let num_input_columns = physical_input_schema.fields().len();

                for agg_func in &mut aggregates {
                    match self.try_plan_async_exprs(
                        num_input_columns,
                        PlannedExprResult::Expr(agg_func.expressions()),
                        physical_input_schema.as_ref(),
                    )? {
                        PlanAsyncExpr::Async(
                            async_map,
                            PlannedExprResult::Expr(physical_exprs),
                        ) => {
                            async_exprs.extend(async_map.async_exprs);

                            if let Some(new_agg_func) = agg_func.with_new_expressions(
                                physical_exprs,
                                agg_func
                                    .order_bys()
                                    .iter()
                                    .cloned()
                                    .map(|x| x.expr)
                                    .collect(),
                            ) {
                                *agg_func = Arc::new(new_agg_func);
                            } else {
                                return internal_err!("Failed to plan async expression");
                            }
                        }
                        PlanAsyncExpr::Sync(PlannedExprResult::Expr(_)) => {}
                        _ => {
                            return internal_err!(
                                "Unexpected result from try_plan_async_exprs"
                            )
                        }
                    }
                }
                let input_exec = if !async_exprs.is_empty() {
                    Arc::new(AsyncFuncExec::try_new(async_exprs, input_exec)?)
                } else {
                    input_exec
                };

                let initial_aggr = Arc::new(AggregateExec::try_new(
                    AggregateMode::Partial,
                    groups.clone(),
                    aggregates,
                    filters.clone(),
                    input_exec,
                    Arc::clone(&physical_input_schema),
                )?);

                let can_repartition = !groups.is_empty()
                    && session_state.config().target_partitions() > 1
                    && session_state.config().repartition_aggregations();

                let updated_aggregates = initial_aggr.aggr_expr().to_vec();

                let next_partition_mode = if can_repartition {
                    AggregateMode::FinalPartitioned
                } else {
                    AggregateMode::Final
                };

                let final_grouping_set = initial_aggr.group_expr().as_final();

                Arc::new(AggregateExec::try_new(
                    next_partition_mode,
                    final_grouping_set,
                    updated_aggregates,
                    filters,
                    initial_aggr,
                    Arc::clone(&physical_input_schema),
                )?)
            }
            LogicalPlan::Projection(Projection { input, expr, .. }) => self
                .create_project_physical_exec(
                    session_state,
                    children.one()?,
                    input,
                    expr,
                )?,
            LogicalPlan::Filter(Filter {
                predicate, input, ..
            }) => {
                let physical_input = children.one()?;
                let input_dfschema = input.schema();

                let runtime_expr =
                    self.create_physical_expr(predicate, input_dfschema, session_state)?;

                let input_schema = input.schema();
                let filter = match self.try_plan_async_exprs(
                    input_schema.fields().len(),
                    PlannedExprResult::Expr(vec![runtime_expr]),
                    input_schema.as_arrow(),
                )? {
                    PlanAsyncExpr::Sync(PlannedExprResult::Expr(runtime_expr)) => {
                        FilterExec::try_new(Arc::clone(&runtime_expr[0]), physical_input)?
                    }
                    PlanAsyncExpr::Async(
                        async_map,
                        PlannedExprResult::Expr(runtime_expr),
                    ) => {
                        let async_exec = AsyncFuncExec::try_new(
                            async_map.async_exprs,
                            physical_input,
                        )?;
                        FilterExec::try_new(
                            Arc::clone(&runtime_expr[0]),
                            Arc::new(async_exec),
                        )?
                        .with_projection(Some(
                            (0..input.schema().fields().len()).collect(),
                        ))?
                    }
                    _ => {
                        return internal_err!(
                            "Unexpected result from try_plan_async_exprs"
                        )
                    }
                };

                let selectivity = session_state
                    .config()
                    .options()
                    .optimizer
                    .default_filter_selectivity;
                Arc::new(filter.with_default_selectivity(selectivity)?)
            }
            LogicalPlan::Repartition(Repartition {
                input,
                partitioning_scheme,
            }) => {
                let physical_input = children.one()?;
                let input_dfschema = input.as_ref().schema();
                let physical_partitioning = match partitioning_scheme {
                    LogicalPartitioning::RoundRobinBatch(n) => {
                        Partitioning::RoundRobinBatch(*n)
                    }
                    LogicalPartitioning::Hash(expr, n) => {
                        let runtime_expr = expr
                            .iter()
                            .map(|e| {
                                self.create_physical_expr(
                                    e,
                                    input_dfschema,
                                    session_state,
                                )
                            })
                            .collect::<Result<Vec<_>>>()?;
                        Partitioning::Hash(runtime_expr, *n)
                    }
                    LogicalPartitioning::DistributeBy(_) => {
                        return not_impl_err!(
                            "Physical plan does not support DistributeBy partitioning"
                        );
                    }
                };
                Arc::new(RepartitionExec::try_new(
                    physical_input,
                    physical_partitioning,
                )?)
            }
            LogicalPlan::Sort(Sort {
                expr, input, fetch, ..
            }) => {
                let physical_input = children.one()?;
                let input_dfschema = input.as_ref().schema();
                let sort_exprs = create_physical_sort_exprs(
                    expr,
                    input_dfschema,
                    session_state.execution_props(),
                )?;
                let Some(ordering) = LexOrdering::new(sort_exprs) else {
                    return internal_err!(
                        "SortExec requires at least one sort expression"
                    );
                };
                let new_sort = SortExec::new(ordering, physical_input).with_fetch(*fetch);
                Arc::new(new_sort)
            }
            LogicalPlan::Subquery(_) => todo!(),
            LogicalPlan::SubqueryAlias(_) => children.one()?,
            LogicalPlan::Limit(limit) => {
                let input = children.one()?;
                let SkipType::Literal(skip) = limit.get_skip_type()? else {
                    return not_impl_err!(
                        "Unsupported OFFSET expression: {:?}",
                        limit.skip
                    );
                };
                let FetchType::Literal(fetch) = limit.get_fetch_type()? else {
                    return not_impl_err!(
                        "Unsupported LIMIT expression: {:?}",
                        limit.fetch
                    );
                };

                let input = if input.output_partitioning().partition_count() == 1 {
                    input
                } else if let Some(fetch) = fetch {
                    Arc::new(LocalLimitExec::new(input, fetch + skip))
                } else {
                    input
                };

                Arc::new(GlobalLimitExec::new(input, skip, fetch))
            }
            LogicalPlan::Unnest(Unnest {
                list_type_columns,
                struct_type_columns,
                schema,
                options,
                ..
            }) => {
                let input = children.one()?;
                let schema = Arc::clone(schema.inner());
                let list_column_indices = list_type_columns
                    .iter()
                    .map(|(index, unnesting)| ListUnnest {
                        index_in_input_schema: *index,
                        depth: unnesting.depth,
                    })
                    .collect();
                Arc::new(UnnestExec::new(
                    input,
                    list_column_indices,
                    struct_type_columns.clone(),
                    schema,
                    options.clone(),
                )?)
            }

            LogicalPlan::Join(Join {
                left: original_left,
                right: original_right,
                on: keys,
                filter,
                join_type,
                null_equality,
                schema: join_schema,
                ..
            }) => {
                let [physical_left, physical_right] = children.two()?;

                let has_expr_join_key = keys.iter().any(|(l, r)| {
                    !(matches!(l, Expr::Column(_)) && matches!(r, Expr::Column(_)))
                });
                let (new_logical, physical_left, physical_right) = if has_expr_join_key {
                    let (left_keys, right_keys): (Vec<_>, Vec<_>) =
                        keys.iter().cloned().unzip();

                    let (left, left_col_keys, left_projected) =
                        wrap_projection_for_join_if_necessary(
                            &left_keys,
                            original_left.as_ref().clone(),
                        )?;
                    let (right, right_col_keys, right_projected) =
                        wrap_projection_for_join_if_necessary(
                            &right_keys,
                            original_right.as_ref().clone(),
                        )?;
                    let column_on = (left_col_keys, right_col_keys);

                    let left = Arc::new(left);
                    let right = Arc::new(right);
                    let (new_join, requalified) = Join::try_new_with_project_input(
                        node,
                        Arc::clone(&left),
                        Arc::clone(&right),
                        column_on,
                    )?;

                    let new_join = LogicalPlan::Join(new_join);

                    let physical_left = match (left_projected, left.as_ref()) {
                        (
                            true,
                            LogicalPlan::Projection(Projection { input, expr, .. }),
                        ) => self.create_project_physical_exec(
                            session_state,
                            physical_left,
                            input,
                            expr,
                        )?,
                        _ => physical_left,
                    };
                    let physical_right = match (right_projected, right.as_ref()) {
                        (
                            true,
                            LogicalPlan::Projection(Projection { input, expr, .. }),
                        ) => self.create_project_physical_exec(
                            session_state,
                            physical_right,
                            input,
                            expr,
                        )?,
                        _ => physical_right,
                    };

                    if left_projected || right_projected {
                        let qualified_join_schema = if requalified {
                            Arc::new(qualify_join_schema_sides(
                                join_schema,
                                original_left,
                                original_right,
                            )?)
                        } else {
                            Arc::clone(join_schema)
                        };

                        let final_join_result = qualified_join_schema
                            .iter()
                            .map(Expr::from)
                            .collect::<Vec<_>>();
                        let projection = LogicalPlan::Projection(Projection::try_new(
                            final_join_result,
                            Arc::new(new_join),
                        )?);
                        (Cow::Owned(projection), physical_left, physical_right)
                    } else {
                        (Cow::Owned(new_join), physical_left, physical_right)
                    }
                } else {
                    (Cow::Borrowed(node), physical_left, physical_right)
                };

                let (left, right, keys, new_project) = match new_logical.as_ref() {
                    LogicalPlan::Projection(Projection { input, expr, .. }) => {
                        if let LogicalPlan::Join(Join {
                            left, right, on, ..
                        }) = input.as_ref()
                        {
                            (left, right, on, Some((input, expr)))
                        } else {
                            unreachable!()
                        }
                    }
                    LogicalPlan::Join(Join {
                        left, right, on, ..
                    }) => (left, right, on, None),
                    _ => unreachable!(),
                };

                let left_df_schema = left.schema();
                let right_df_schema = right.schema();
                let execution_props = session_state.execution_props();
                let join_on = keys
                    .iter()
                    .map(|(l, r)| {
                        let l = create_physical_expr(l, left_df_schema, execution_props)?;
                        let r =
                            create_physical_expr(r, right_df_schema, execution_props)?;
                        Ok((l, r))
                    })
                    .collect::<Result<join_utils::JoinOn>>()?;

                let mut num_range_filters = 0;
                let mut range_filters: Vec<Expr> = Vec::new();
                let mut total_filters = 0;

                let join_filter = match filter {
                    Some(expr) => {
                        let split_expr = split_conjunction(expr);
                        for expr in split_expr.iter() {
                            match *expr {
                                Expr::BinaryExpr(BinaryExpr {
                                    left: _,
                                    right: _,
                                    op,
                                }) => {
                                    if matches!(
                                        op,
                                        Operator::Lt
                                            | Operator::LtEq
                                            | Operator::Gt
                                            | Operator::GtEq
                                    ) {
                                        range_filters.push((**expr).clone());
                                        num_range_filters += 1;
                                    }
                                    total_filters += 1;
                                }
                                _ => {
                                    total_filters += 1;
                                }
                            }
                        }

                        let cols = expr.column_refs();

                        let left_field_indices = cols
                            .iter()
                            .filter_map(|c| left_df_schema.index_of_column(c).ok())
                            .sorted()
                            .collect::<Vec<_>>();
                        let right_field_indices = cols
                            .iter()
                            .filter_map(|c| right_df_schema.index_of_column(c).ok())
                            .sorted()
                            .collect::<Vec<_>>();

                        let (filter_df_fields, filter_fields): (Vec<_>, Vec<_>) =
                            left_field_indices
                                .clone()
                                .into_iter()
                                .map(|i| {
                                    (
                                        left_df_schema.qualified_field(i),
                                        physical_left.schema().field(i).clone(),
                                    )
                                })
                                .chain(right_field_indices.clone().into_iter().map(|i| {
                                    (
                                        right_df_schema.qualified_field(i),
                                        physical_right.schema().field(i).clone(),
                                    )
                                }))
                                .unzip();
                        let filter_df_fields = filter_df_fields
                            .into_iter()
                            .map(|(qualifier, field)| {
                                (qualifier.cloned(), Arc::new(field.clone()))
                            })
                            .collect();

                        let metadata: HashMap<_, _> = left_df_schema
                            .metadata()
                            .clone()
                            .into_iter()
                            .chain(right_df_schema.metadata().clone())
                            .collect();

                        let filter_df_schema = DFSchema::new_with_metadata(
                            filter_df_fields,
                            metadata.clone(),
                        )?;
                        let filter_schema =
                            Schema::new_with_metadata(filter_fields, metadata);

                        let filter_expr = create_physical_expr(
                            expr,
                            &filter_df_schema,
                            session_state.execution_props(),
                        )?;
                        let column_indices = join_utils::JoinFilter::build_column_indices(
                            left_field_indices,
                            right_field_indices,
                        );

                        Some(join_utils::JoinFilter::new(
                            filter_expr,
                            column_indices,
                            Arc::new(filter_schema),
                        ))
                    }
                    _ => None,
                };

                let prefer_hash_join =
                    session_state.config_options().optimizer.prefer_hash_join;

                let join: Arc<dyn ExecutionPlan> = if join_on.is_empty() {
                    if join_filter.is_none() && matches!(join_type, JoinType::Inner) {
                        Arc::new(CrossJoinExec::new(physical_left, physical_right))
                    } else if num_range_filters == 1
                        && total_filters == 1
                        && !matches!(
                            join_type,
                            JoinType::LeftSemi
                                | JoinType::RightSemi
                                | JoinType::LeftAnti
                                | JoinType::RightAnti
                                | JoinType::LeftMark
                                | JoinType::RightMark
                        )
                        && session_state
                            .config_options()
                            .optimizer
                            .enable_piecewise_merge_join
                    {
                        let Expr::BinaryExpr(be) = &range_filters[0] else {
                            return plan_err!(
                                "Unsupported expression for PWMJ: Expected `Expr::BinaryExpr`"
                            );
                        };

                        let mut op = be.op;
                        if !matches!(
                            op,
                            Operator::Lt | Operator::LtEq | Operator::Gt | Operator::GtEq
                        ) {
                            return plan_err!(
                                "Unsupported operator for PWMJ: {:?}. Expected one of <, <=, >, >=",
                                op
                            );
                        }

                        fn reverse_ineq(op: Operator) -> Operator {
                            match op {
                                Operator::Lt => Operator::Gt,
                                Operator::LtEq => Operator::GtEq,
                                Operator::Gt => Operator::Lt,
                                Operator::GtEq => Operator::LtEq,
                                _ => op,
                            }
                        }

                        #[derive(Clone, Copy, Debug, PartialEq, Eq)]
                        enum Side {
                            Left,
                            Right,
                            Both,
                        }

                        let side_of = |e: &Expr| -> Result<Side> {
                            let cols = e.column_refs();
                            let any_left = cols
                                .iter()
                                .any(|c| left_df_schema.index_of_column(c).is_ok());
                            let any_right = cols
                                .iter()
                                .any(|c| right_df_schema.index_of_column(c).is_ok());

                            Ok(match (any_left, any_right) {
                                (true, false) => Side::Left,
                                (false, true) => Side::Right,
                                (true, true) => Side::Both,
                                _ => unreachable!(),
                            })
                        };

                        let mut lhs_logical = &be.left;
                        let mut rhs_logical = &be.right;

                        let left_side = side_of(lhs_logical)?;
                        let right_side = side_of(rhs_logical)?;
                        if matches!(left_side, Side::Both)
                            || matches!(right_side, Side::Both)
                        {
                            return Ok(Arc::new(NestedLoopJoinExec::try_new(
                                physical_left,
                                physical_right,
                                join_filter,
                                join_type,
                                None,
                            )?));
                        }

                        if left_side == Side::Right && right_side == Side::Left {
                            std::mem::swap(&mut lhs_logical, &mut rhs_logical);
                            op = reverse_ineq(op);
                        } else if !(left_side == Side::Left && right_side == Side::Right)
                        {
                            return plan_err!(
                                "Unsupported operator for PWMJ: {:?}. Expected one of <, <=, >, >=",
                                op
                            );
                        }

                        let on_left = create_physical_expr(
                            lhs_logical,
                            left_df_schema,
                            session_state.execution_props(),
                        )?;
                        let on_right = create_physical_expr(
                            rhs_logical,
                            right_df_schema,
                            session_state.execution_props(),
                        )?;

                        Arc::new(PiecewiseMergeJoinExec::try_new(
                            physical_left,
                            physical_right,
                            (on_left, on_right),
                            op,
                            *join_type,
                            session_state.config().target_partitions(),
                        )?)
                    } else {
                        Arc::new(NestedLoopJoinExec::try_new(
                            physical_left,
                            physical_right,
                            join_filter,
                            join_type,
                            None,
                        )?)
                    }
                } else if session_state.config().target_partitions() > 1
                    && session_state.config().repartition_joins()
                    && !prefer_hash_join
                {
                    let join_on_len = join_on.len();
                    Arc::new(SortMergeJoinExec::try_new(
                        physical_left,
                        physical_right,
                        join_on,
                        join_filter,
                        *join_type,
                        vec![SortOptions::default(); join_on_len],
                        *null_equality,
                    )?)
                } else if session_state.config().target_partitions() > 1
                    && session_state.config().repartition_joins()
                    && prefer_hash_join
                {
                    Arc::new(HashJoinExec::try_new(
                        physical_left,
                        physical_right,
                        join_on,
                        join_filter,
                        join_type,
                        None,
                        PartitionMode::Auto,
                        *null_equality,
                    )?)
                } else {
                    Arc::new(HashJoinExec::try_new(
                        physical_left,
                        physical_right,
                        join_on,
                        join_filter,
                        join_type,
                        None,
                        PartitionMode::CollectLeft,
                        *null_equality,
                    )?)
                };

                if let Some((input, expr)) = new_project {
                    self.create_project_physical_exec(session_state, join, input, expr)?
                } else {
                    join
                }
            }
            LogicalPlan::RecursiveQuery(RecursiveQuery {
                name, is_distinct, ..
            }) => {
                let [static_term, recursive_term] = children.two()?;
                Arc::new(RecursiveQueryExec::try_new(
                    name.clone(),
                    static_term,
                    recursive_term,
                    *is_distinct,
                )?)
            }

            LogicalPlan::Union(_) => UnionExec::try_new(children.vec())?,
            LogicalPlan::Extension(Extension { node }) => {
                let mut maybe_plan = None;
                let children = children.vec();
                for planner in &self.extension_planners {
                    if maybe_plan.is_some() {
                        break;
                    }

                    let logical_input = node.inputs();
                    maybe_plan = planner
                        .plan_extension(
                            self,
                            node.as_ref(),
                            &logical_input,
                            &children,
                            session_state,
                        )
                        .await?;
                }

                let plan = match maybe_plan {
                    Some(v) => Ok(v),
                    _ => plan_err!(
                        "No installed planner was able to convert the custom node to an execution plan: {:?}",
                        node
                    ),
                }?;

                if !node.schema().matches_arrow_schema(&plan.schema()) {
                    return plan_err!(
                        "Extension planner for {:?} created an ExecutionPlan with mismatched schema. \
                        LogicalPlan schema: {:?}, ExecutionPlan schema: {:?}",
                        node, node.schema(), plan.schema()
                    );
                } else {
                    plan
                }
            }

            LogicalPlan::Statement(statement) => {
                let name = statement.name();
                return not_impl_err!("Unsupported logical plan: Statement({name})");
            }
            LogicalPlan::Dml(dml) => {
                return not_impl_err!("Unsupported logical plan: Dml({0})", dml.op);
            }
            LogicalPlan::Ddl(ddl) => {
                let name = ddl.name();
                return not_impl_err!("Unsupported logical plan: {name}");
            }
            LogicalPlan::Explain(_) => {
                return internal_err!(
                    "Unsupported logical plan: Explain must be root of the plan"
                )
            }
            LogicalPlan::Distinct(_) => {
                return internal_err!(
                    "Unsupported logical plan: Distinct should be replaced to Aggregate"
                )
            }
            LogicalPlan::Analyze(_) => {
                return internal_err!(
                    "Unsupported logical plan: Analyze must be root of the plan"
                )
            }
        };
        Ok(exec_node)
    }

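    /// Convert the logical GROUP BY expressions into a [`PhysicalGroupBy`],
    /// expanding `GROUPING SETS`, `CUBE`, and `ROLLUP` into their grouping
    /// masks and lowering plain expressions directly.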
    fn create_grouping_physical_expr(
        &self,
        group_expr: &[Expr],
        input_dfschema: &DFSchema,
        input_schema: &Schema,
        session_state: &SessionState,
    ) -> Result<PhysicalGroupBy> {
        if group_expr.len() == 1 {
            match &group_expr[0] {
                Expr::GroupingSet(GroupingSet::GroupingSets(grouping_sets)) => {
                    merge_grouping_set_physical_expr(
                        grouping_sets,
                        input_dfschema,
                        input_schema,
                        session_state,
                    )
                }
                Expr::GroupingSet(GroupingSet::Cube(exprs)) => create_cube_physical_expr(
                    exprs,
                    input_dfschema,
                    input_schema,
                    session_state,
                ),
                Expr::GroupingSet(GroupingSet::Rollup(exprs)) => {
                    create_rollup_physical_expr(
                        exprs,
                        input_dfschema,
                        input_schema,
                        session_state,
                    )
                }
                expr => Ok(PhysicalGroupBy::new_single(vec![tuple_err((
                    self.create_physical_expr(expr, input_dfschema, session_state),
                    physical_name(expr),
                ))?])),
            }
        } else if group_expr.is_empty() {
            Ok(PhysicalGroupBy::new(vec![], vec![], vec![]))
        } else {
            Ok(PhysicalGroupBy::new_single(
                group_expr
                    .iter()
                    .map(|e| {
                        tuple_err((
                            self.create_physical_expr(e, input_dfschema, session_state),
                            physical_name(e),
                        ))
                    })
                    .collect::<Result<Vec<_>>>()?,
            ))
        }
    }
}

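/// Expand `GROUPING SETS` into a [`PhysicalGroupBy`]. Each distinct
/// expression appears once, and every grouping set becomes a boolean mask
/// over those expressions in which `true` marks an expression that is
/// *absent* from the set (and is therefore replaced by a typed NULL
/// literal for those groups). For example, `GROUPING SETS ((a), (a, b))`
/// yields expressions `[a, b]` with masks `[false, true]` and
/// `[false, false]`.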
fn merge_grouping_set_physical_expr(
    grouping_sets: &[Vec<Expr>],
    input_dfschema: &DFSchema,
    input_schema: &Schema,
    session_state: &SessionState,
) -> Result<PhysicalGroupBy> {
    let num_groups = grouping_sets.len();
    let mut all_exprs: Vec<Expr> = vec![];
    let mut grouping_set_expr: Vec<(Arc<dyn PhysicalExpr>, String)> = vec![];
    let mut null_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> = vec![];

    for expr in grouping_sets.iter().flatten() {
        if !all_exprs.contains(expr) {
            all_exprs.push(expr.clone());

            grouping_set_expr.push(get_physical_expr_pair(
                expr,
                input_dfschema,
                session_state,
            )?);

            null_exprs.push(get_null_physical_expr_pair(
                expr,
                input_dfschema,
                input_schema,
                session_state,
            )?);
        }
    }

    let mut merged_sets: Vec<Vec<bool>> = Vec::with_capacity(num_groups);

    for expr_group in grouping_sets.iter() {
        let group: Vec<bool> = all_exprs
            .iter()
            .map(|expr| !expr_group.contains(expr))
            .collect();

        merged_sets.push(group)
    }

    Ok(PhysicalGroupBy::new(
        grouping_set_expr,
        null_exprs,
        merged_sets,
    ))
}

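/// Expand `CUBE (e_1, ..., e_n)` into a [`PhysicalGroupBy`] with all 2^n
/// null masks: the all-`false` mask (full grouping) first, followed by
/// every combination of nulled positions.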
fn create_cube_physical_expr(
    exprs: &[Expr],
    input_dfschema: &DFSchema,
    input_schema: &Schema,
    session_state: &SessionState,
) -> Result<PhysicalGroupBy> {
    let num_of_exprs = exprs.len();
    let num_groups = num_of_exprs * num_of_exprs;

    let mut null_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> =
        Vec::with_capacity(num_of_exprs);
    let mut all_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> =
        Vec::with_capacity(num_of_exprs);

    for expr in exprs {
        null_exprs.push(get_null_physical_expr_pair(
            expr,
            input_dfschema,
            input_schema,
            session_state,
        )?);

        all_exprs.push(get_physical_expr_pair(expr, input_dfschema, session_state)?)
    }

    let mut groups: Vec<Vec<bool>> = Vec::with_capacity(num_groups);

    groups.push(vec![false; num_of_exprs]);

    for null_count in 1..=num_of_exprs {
        for null_idx in (0..num_of_exprs).combinations(null_count) {
            let mut next_group: Vec<bool> = vec![false; num_of_exprs];
            null_idx.into_iter().for_each(|i| next_group[i] = true);
            groups.push(next_group);
        }
    }

    Ok(PhysicalGroupBy::new(all_exprs, null_exprs, groups))
}

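/// Expand `ROLLUP (e_1, ..., e_n)` into a [`PhysicalGroupBy`] with n + 1
/// null masks, from the grand total (every position nulled) down to the
/// full grouping: for n = 2 the masks are `[true, true]`, `[false, true]`,
/// and `[false, false]`.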
fn create_rollup_physical_expr(
    exprs: &[Expr],
    input_dfschema: &DFSchema,
    input_schema: &Schema,
    session_state: &SessionState,
) -> Result<PhysicalGroupBy> {
    let num_of_exprs = exprs.len();

    let mut null_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> =
        Vec::with_capacity(num_of_exprs);
    let mut all_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> =
        Vec::with_capacity(num_of_exprs);

    let mut groups: Vec<Vec<bool>> = Vec::with_capacity(num_of_exprs + 1);

    for expr in exprs {
        null_exprs.push(get_null_physical_expr_pair(
            expr,
            input_dfschema,
            input_schema,
            session_state,
        )?);

        all_exprs.push(get_physical_expr_pair(expr, input_dfschema, session_state)?)
    }

    for total in 0..=num_of_exprs {
        let mut group: Vec<bool> = Vec::with_capacity(num_of_exprs);

        for index in 0..num_of_exprs {
            if index < total {
                group.push(false);
            } else {
                group.push(true);
            }
        }

        groups.push(group)
    }

    Ok(PhysicalGroupBy::new(all_exprs, null_exprs, groups))
}

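/// Return a typed NULL literal, paired with the expression's physical
/// name, to stand in for `expr` in grouping sets where it does not
/// participate.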
fn get_null_physical_expr_pair(
    expr: &Expr,
    input_dfschema: &DFSchema,
    input_schema: &Schema,
    session_state: &SessionState,
) -> Result<(Arc<dyn PhysicalExpr>, String)> {
    let physical_expr =
        create_physical_expr(expr, input_dfschema, session_state.execution_props())?;
    let physical_name = physical_name(expr)?;

    let data_type = physical_expr.data_type(input_schema)?;
    let null_value: ScalarValue = (&data_type).try_into()?;

    let null_value = Literal::new(null_value);
    Ok((Arc::new(null_value), physical_name))
}

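/// Re-qualify the fields of a join schema with bare `left`/`right` table
/// references so the two sides stay distinguishable after the join inputs
/// were wrapped in key projections.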
fn qualify_join_schema_sides(
    join_schema: &DFSchema,
    left: &LogicalPlan,
    right: &LogicalPlan,
) -> Result<DFSchema> {
    let left_fields = left.schema().fields();
    let right_fields = right.schema().fields();
    let join_fields = join_schema.fields();

    if join_fields.len() != left_fields.len() + right_fields.len() {
        return internal_err!(
            "Join schema field count must match left and right field count."
        );
    }

    for (i, (field, expected)) in join_fields
        .iter()
        .zip(left_fields.iter().chain(right_fields.iter()))
        .enumerate()
    {
        if field.name() != expected.name() {
            return internal_err!(
                "Field name mismatch at index {}: expected '{}', found '{}'",
                i,
                expected.name(),
                field.name()
            );
        }
    }

    let qualifiers = join_fields
        .iter()
        .enumerate()
        .map(|(i, _)| {
            if i < left_fields.len() {
                Some(TableReference::Bare {
                    table: Arc::from("left"),
                })
            } else {
                Some(TableReference::Bare {
                    table: Arc::from("right"),
                })
            }
        })
        .collect();

    join_schema.with_field_specific_qualified_schema(qualifiers)
}

fn get_physical_expr_pair(
    expr: &Expr,
    input_dfschema: &DFSchema,
    session_state: &SessionState,
) -> Result<(Arc<dyn PhysicalExpr>, String)> {
    let physical_expr =
        create_physical_expr(expr, input_dfschema, session_state.execution_props())?;
    let physical_name = physical_name(expr)?;
    Ok((physical_expr, physical_name))
}

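/// Check whether a window frame's start bound precedes (or equals) its end
/// bound. For example, `ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING` is
/// valid, while `ROWS BETWEEN 1 PRECEDING AND 2 PRECEDING` is not.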
pub fn is_window_frame_bound_valid(window_frame: &WindowFrame) -> bool {
    match (&window_frame.start_bound, &window_frame.end_bound) {
        (WindowFrameBound::Following(_), WindowFrameBound::Preceding(_))
        | (WindowFrameBound::Following(_), WindowFrameBound::CurrentRow)
        | (WindowFrameBound::CurrentRow, WindowFrameBound::Preceding(_)) => false,
        (WindowFrameBound::Preceding(lhs), WindowFrameBound::Preceding(rhs)) => {
            !rhs.is_null() && (lhs.is_null() || (lhs >= rhs))
        }
        (WindowFrameBound::Following(lhs), WindowFrameBound::Following(rhs)) => {
            !lhs.is_null() && (rhs.is_null() || (lhs <= rhs))
        }
        _ => true,
    }
}

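/// Create a named physical window expression from a logical
/// `Expr::WindowFunction`, validating the window frame and lowering the
/// arguments, PARTITION BY, ORDER BY, and optional FILTER clauses.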
pub fn create_window_expr_with_name(
    e: &Expr,
    name: impl Into<String>,
    logical_schema: &DFSchema,
    execution_props: &ExecutionProps,
) -> Result<Arc<dyn WindowExpr>> {
    let name = name.into();
    let physical_schema = Arc::clone(logical_schema.inner());
    match e {
        Expr::WindowFunction(window_fun) => {
            let WindowFunction {
                fun,
                params:
                    WindowFunctionParams {
                        args,
                        partition_by,
                        order_by,
                        window_frame,
                        null_treatment,
                        distinct,
                        filter,
                    },
            } = window_fun.as_ref();
            let physical_args =
                create_physical_exprs(args, logical_schema, execution_props)?;
            let partition_by =
                create_physical_exprs(partition_by, logical_schema, execution_props)?;
            let order_by =
                create_physical_sort_exprs(order_by, logical_schema, execution_props)?;

            if !is_window_frame_bound_valid(window_frame) {
                return plan_err!(
                    "Invalid window frame: start bound ({}) cannot be larger than end bound ({})",
                    window_frame.start_bound,
                    window_frame.end_bound
                );
            }

            let window_frame = Arc::new(window_frame.clone());
            let ignore_nulls = null_treatment.unwrap_or(NullTreatment::RespectNulls)
                == NullTreatment::IgnoreNulls;
            let physical_filter = filter
                .as_ref()
                .map(|f| create_physical_expr(f, logical_schema, execution_props))
                .transpose()?;

            windows::create_window_expr(
                fun,
                name,
                &physical_args,
                &partition_by,
                &order_by,
                window_frame,
                physical_schema,
                ignore_nulls,
                *distinct,
                physical_filter,
            )
        }
        other => plan_err!("Invalid window expression '{other:?}'"),
    }
}

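/// Create a physical window expression, taking the output name from an
/// alias when present and from the expression's schema name otherwise.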
pub fn create_window_expr(
    e: &Expr,
    logical_schema: &DFSchema,
    execution_props: &ExecutionProps,
) -> Result<Arc<dyn WindowExpr>> {
    let (name, e) = match e {
        Expr::Alias(Alias { expr, name, .. }) => (name.clone(), expr.as_ref()),
        _ => (e.schema_name().to_string(), e),
    };
    create_window_expr_with_name(e, name, logical_schema, execution_props)
}

type AggregateExprWithOptionalArgs = (
    Arc<AggregateFunctionExpr>,
    Option<Arc<dyn PhysicalExpr>>,
    Vec<PhysicalSortExpr>,
);

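/// Create an [`AggregateFunctionExpr`], together with its optional filter
/// expression and ORDER BY sort expressions, from a logical aggregate
/// function, using `name` for the output column when provided.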
pub fn create_aggregate_expr_with_name_and_maybe_filter(
    e: &Expr,
    name: Option<String>,
    human_display: String,
    logical_input_schema: &DFSchema,
    physical_input_schema: &Schema,
    execution_props: &ExecutionProps,
) -> Result<AggregateExprWithOptionalArgs> {
    match e {
        Expr::AggregateFunction(AggregateFunction {
            func,
            params:
                AggregateFunctionParams {
                    args,
                    distinct,
                    filter,
                    order_by,
                    null_treatment,
                },
        }) => {
            let name = if let Some(name) = name {
                name
            } else {
                physical_name(e)?
            };

            let physical_args =
                create_physical_exprs(args, logical_input_schema, execution_props)?;
            let filter = match filter {
                Some(e) => Some(create_physical_expr(
                    e,
                    logical_input_schema,
                    execution_props,
                )?),
                None => None,
            };

            let ignore_nulls = null_treatment.unwrap_or(NullTreatment::RespectNulls)
                == NullTreatment::IgnoreNulls;

            let (agg_expr, filter, order_bys) = {
                let order_bys = create_physical_sort_exprs(
                    order_by,
                    logical_input_schema,
                    execution_props,
                )?;

                let agg_expr =
                    AggregateExprBuilder::new(func.to_owned(), physical_args.to_vec())
                        .order_by(order_bys.clone())
                        .schema(Arc::new(physical_input_schema.to_owned()))
                        .alias(name)
                        .human_display(human_display)
                        .with_ignore_nulls(ignore_nulls)
                        .with_distinct(*distinct)
                        .build()
                        .map(Arc::new)?;

                (agg_expr, filter, order_bys)
            };

            Ok((agg_expr, filter, order_bys))
        }
        other => internal_err!("Invalid aggregate expression '{other:?}'"),
    }
}

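/// Create an aggregate expression from a logical expression that may be
/// wrapped in an alias, unwrapping the alias to recover the underlying
/// aggregate function and its human-readable display form.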
pub fn create_aggregate_expr_and_maybe_filter(
    e: &Expr,
    logical_input_schema: &DFSchema,
    physical_input_schema: &Schema,
    execution_props: &ExecutionProps,
) -> Result<AggregateExprWithOptionalArgs> {
    let (name, human_display, e) = match e {
        Expr::Alias(Alias { name, .. }) => {
            let unaliased = e.clone().unalias_nested().data;
            (Some(name.clone()), e.human_display().to_string(), unaliased)
        }
        Expr::AggregateFunction(_) => (
            Some(e.schema_name().to_string()),
            e.human_display().to_string(),
            e.clone(),
        ),
        _ => (None, String::default(), e.clone()),
    };

    create_aggregate_expr_with_name_and_maybe_filter(
        &e,
        name,
        human_display,
        logical_input_schema,
        physical_input_schema,
        execution_props,
    )
}

impl DefaultPhysicalPlanner {
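    /// Plan `EXPLAIN`/`ANALYZE` root nodes, returning `Ok(None)` for any
    /// other plan so regular planning proceeds.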
    async fn handle_explain_or_analyze(
        &self,
        logical_plan: &LogicalPlan,
        session_state: &SessionState,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        let execution_plan = match logical_plan {
            LogicalPlan::Explain(e) => self.handle_explain(e, session_state).await?,
            LogicalPlan::Analyze(a) => self.handle_analyze(a, session_state).await?,
            _ => return Ok(None),
        };
        Ok(Some(execution_plan))
    }

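    /// Plan an `EXPLAIN` statement: render the requested logical and/or
    /// physical plans as strings in the configured format and wrap them in
    /// an [`ExplainExec`].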
    async fn handle_explain(
        &self,
        e: &Explain,
        session_state: &SessionState,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        use PlanType::*;
        let mut stringified_plans = vec![];

        let config = &session_state.config_options().explain;
        let explain_format = &e.explain_format;

        if !e.logical_optimization_succeeded {
            return Ok(Arc::new(ExplainExec::new(
                Arc::clone(e.schema.inner()),
                e.stringified_plans.clone(),
                true,
            )));
        }

        match explain_format {
            ExplainFormat::Indent => {}
            ExplainFormat::Tree => {
                let physical_plan = self
                    .create_initial_plan(e.plan.as_ref(), session_state)
                    .await?;

                let optimized_plan = self.optimize_physical_plan(
                    physical_plan,
                    session_state,
                    |_plan, _optimizer| {},
                )?;

                stringified_plans.push(StringifiedPlan::new(
                    FinalPhysicalPlan,
                    displayable(optimized_plan.as_ref())
                        .set_tree_maximum_render_width(config.tree_maximum_render_width)
                        .tree_render()
                        .to_string(),
                ));
            }
            ExplainFormat::PostgresJSON => {
                stringified_plans.push(StringifiedPlan::new(
                    FinalLogicalPlan,
                    e.plan.display_pg_json().to_string(),
                ));
            }
            ExplainFormat::Graphviz => {
                stringified_plans.push(StringifiedPlan::new(
                    FinalLogicalPlan,
                    e.plan.display_graphviz().to_string(),
                ));
            }
        };

        if !stringified_plans.is_empty() {
            return Ok(Arc::new(ExplainExec::new(
                Arc::clone(e.schema.inner()),
                stringified_plans,
                e.verbose,
            )));
        }

        if !config.physical_plan_only {
            stringified_plans.clone_from(&e.stringified_plans);
            if e.logical_optimization_succeeded {
                stringified_plans.push(e.plan.to_stringified(FinalLogicalPlan));
            }
        }

        if !config.logical_plan_only && e.logical_optimization_succeeded {
            match self
                .create_initial_plan(e.plan.as_ref(), session_state)
                .await
            {
                Ok(input) => {
                    stringified_plans.push(StringifiedPlan::new(
                        InitialPhysicalPlan,
                        displayable(input.as_ref())
                            .set_show_statistics(config.show_statistics)
                            .set_show_schema(config.show_schema)
                            .indent(e.verbose)
                            .to_string(),
                    ));

                    if e.verbose {
                        if !config.show_statistics {
                            stringified_plans.push(StringifiedPlan::new(
                                InitialPhysicalPlanWithStats,
                                displayable(input.as_ref())
                                    .set_show_statistics(true)
                                    .indent(e.verbose)
                                    .to_string(),
                            ));
                        }
                        if !config.show_schema {
                            stringified_plans.push(StringifiedPlan::new(
                                InitialPhysicalPlanWithSchema,
                                displayable(input.as_ref())
                                    .set_show_schema(true)
                                    .indent(e.verbose)
                                    .to_string(),
                            ));
                        }
                    }

                    let optimized_plan = self.optimize_physical_plan(
                        input,
                        session_state,
                        |plan, optimizer| {
                            let optimizer_name = optimizer.name().to_string();
                            let plan_type = OptimizedPhysicalPlan { optimizer_name };
                            stringified_plans.push(StringifiedPlan::new(
                                plan_type,
                                displayable(plan)
                                    .set_show_statistics(config.show_statistics)
                                    .set_show_schema(config.show_schema)
                                    .indent(e.verbose)
                                    .to_string(),
                            ));
                        },
                    );
                    match optimized_plan {
                        Ok(input) => {
                            stringified_plans.push(StringifiedPlan::new(
                                FinalPhysicalPlan,
                                displayable(input.as_ref())
                                    .set_show_statistics(config.show_statistics)
                                    .set_show_schema(config.show_schema)
                                    .indent(e.verbose)
                                    .to_string(),
                            ));

                            if e.verbose {
                                if !config.show_statistics {
                                    stringified_plans.push(StringifiedPlan::new(
                                        FinalPhysicalPlanWithStats,
                                        displayable(input.as_ref())
                                            .set_show_statistics(true)
                                            .indent(e.verbose)
                                            .to_string(),
                                    ));
                                }
                                if !config.show_schema {
                                    stringified_plans.push(StringifiedPlan::new(
                                        FinalPhysicalPlanWithSchema,
                                        displayable(input.as_ref())
                                            .set_show_schema(true)
                                            .indent(e.verbose)
                                            .to_string(),
                                    ));
                                }
                            }
                        }
                        Err(DataFusionError::Context(optimizer_name, e)) => {
                            let plan_type = OptimizedPhysicalPlan { optimizer_name };
                            stringified_plans
                                .push(StringifiedPlan::new(plan_type, e.to_string()))
                        }
                        Err(e) => return Err(e),
                    }
                }
                Err(err) => {
                    stringified_plans.push(StringifiedPlan::new(
                        PhysicalPlanError,
                        err.strip_backtrace(),
                    ));
                }
            }
        }

        Ok(Arc::new(ExplainExec::new(
            Arc::clone(e.schema.inner()),
            stringified_plans,
            e.verbose,
        )))
    }

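    /// Plan an `ANALYZE` statement: execute the input plan and report the
    /// metrics gathered at the configured `explain.analyze_level`.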
    async fn handle_analyze(
        &self,
        a: &Analyze,
        session_state: &SessionState,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input = self.create_physical_plan(&a.input, session_state).await?;
        let schema = Arc::clone(a.schema.inner());
        let show_statistics = session_state.config_options().explain.show_statistics;
        let analyze_level = session_state.config_options().explain.analyze_level;
        let metric_types = match analyze_level {
            ExplainAnalyzeLevel::Summary => vec![MetricType::SUMMARY],
            ExplainAnalyzeLevel::Dev => vec![MetricType::SUMMARY, MetricType::DEV],
        };
        Ok(Arc::new(AnalyzeExec::new(
            a.verbose,
            show_statistics,
            metric_types,
            input,
            schema,
        )))
    }

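    /// Optimize a physical plan by applying each physical optimizer rule in
    /// turn, calling `observer` after every rule so callers (e.g. verbose
    /// EXPLAIN) can record intermediate plans; plan invariants are checked
    /// before the first rule and after the last.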
    pub fn optimize_physical_plan<F>(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        session_state: &SessionState,
        mut observer: F,
    ) -> Result<Arc<dyn ExecutionPlan>>
    where
        F: FnMut(&dyn ExecutionPlan, &dyn PhysicalOptimizerRule),
    {
        let optimizers = session_state.physical_optimizers();
        debug!(
            "Input physical plan:\n{}\n",
            displayable(plan.as_ref()).indent(false)
        );
        debug!(
            "Detailed input physical plan:\n{}",
            displayable(plan.as_ref()).indent(true)
        );

        // Verify the always-level invariants hold before optimization begins.
        InvariantChecker(InvariantLevel::Always).check(&plan)?;

        let mut new_plan = Arc::clone(&plan);
        for optimizer in optimizers {
            let before_schema = new_plan.schema();
            new_plan = optimizer
                .optimize(new_plan, session_state.config_options())
                .map_err(|e| {
                    DataFusionError::Context(optimizer.name().to_string(), Box::new(e))
                })?;

            // Confirm the rule did not break the plan's invariants, nor its
            // schema if the rule promises to preserve it.
            OptimizationInvariantChecker::new(optimizer)
                .check(&new_plan, before_schema)?;

            debug!(
                "Optimized physical plan by {}:\n{}\n",
                optimizer.name(),
                displayable(new_plan.as_ref()).indent(false)
            );
            observer(new_plan.as_ref(), optimizer.as_ref())
        }

        // The fully optimized plan must be executable.
        InvariantChecker(InvariantLevel::Executable).check(&new_plan)?;

        debug!(
            "Optimized physical plan:\n{}\n",
            displayable(new_plan.as_ref()).indent(false)
        );

        debug!(
            "Detailed optimized physical plan:\n{}\n",
            displayable(new_plan.as_ref()).indent(true)
        );
        Ok(new_plan)
    }

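    /// Builds the physical plan for a `DESCRIBE` statement: a single
    /// in-memory record batch with one row per field of `table_schema`,
    /// containing the field name, its data type rendered as a string, and
    /// `"YES"`/`"NO"` for nullability, laid out per `output_schema`.
    /// Describing a table with a nullable `Int32` column `id`, for example,
    /// would yield a row like `("id", "Int32", "YES")`.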
    fn plan_describe(
        &self,
        table_schema: Arc<Schema>,
        output_schema: Arc<Schema>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let mut column_names = StringBuilder::new();
        let mut data_types = StringBuilder::new();
        let mut is_nullables = StringBuilder::new();
        for field in table_schema.fields() {
            column_names.append_value(field.name());

            let data_type = field.data_type();
            data_types.append_value(format!("{data_type}"));

            let nullable_str = if field.is_nullable() { "YES" } else { "NO" };
            is_nullables.append_value(nullable_str);
        }

        let record_batch = RecordBatch::try_new(
            output_schema,
            vec![
                Arc::new(column_names.finish()),
                Arc::new(data_types.finish()),
                Arc::new(is_nullables.finish()),
            ],
        )?;

        let schema = record_batch.schema();
        let partitions = vec![vec![record_batch]];
        let projection = None;
        let mem_exec = MemorySourceConfig::try_new_exec(&partitions, schema, projection)?;
        Ok(mem_exec)
    }

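    /// Creates a [`ProjectionExec`] (possibly preceded by an
    /// [`AsyncFuncExec`] when async scalar functions are present) for the
    /// given projection expressions over `input_exec`. Column expressions
    /// keep the physical field name of the input where the logical column
    /// resolves, so qualified logical names do not leak into the physical
    /// schema.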
    fn create_project_physical_exec(
        &self,
        session_state: &SessionState,
        input_exec: Arc<dyn ExecutionPlan>,
        input: &Arc<LogicalPlan>,
        expr: &[Expr],
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let input_logical_schema = input.as_ref().schema();
        let input_physical_schema = input_exec.schema();
        let physical_exprs = expr
            .iter()
            .map(|e| {
                // For column expressions, prefer the name of the physical
                // field at the same index (the physical schema may differ
                // from the logical one, e.g. in qualifiers); otherwise derive
                // the name from the expression itself.
                let physical_name = if let Expr::Column(col) = e {
                    match input_logical_schema.index_of_column(col) {
                        Ok(idx) => {
                            Ok(input_exec.schema().field(idx).name().to_string())
                        }
                        Err(_) => physical_name(e),
                    }
                } else {
                    physical_name(e)
                };

                let physical_expr =
                    self.create_physical_expr(e, input_logical_schema, session_state);

                tuple_err((physical_expr, physical_name))
            })
            .collect::<Result<Vec<_>>>()?;

        let num_input_columns = input_exec.schema().fields().len();

        match self.try_plan_async_exprs(
            num_input_columns,
            PlannedExprResult::ExprWithName(physical_exprs),
            input_physical_schema.as_ref(),
        )? {
            PlanAsyncExpr::Sync(PlannedExprResult::ExprWithName(physical_exprs)) => {
                let proj_exprs: Vec<ProjectionExpr> = physical_exprs
                    .into_iter()
                    .map(|(expr, alias)| ProjectionExpr { expr, alias })
                    .collect();
                Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input_exec)?))
            }
            PlanAsyncExpr::Async(
                async_map,
                PlannedExprResult::ExprWithName(physical_exprs),
            ) => {
                let async_exec =
                    AsyncFuncExec::try_new(async_map.async_exprs, input_exec)?;
                let proj_exprs: Vec<ProjectionExpr> = physical_exprs
                    .into_iter()
                    .map(|(expr, alias)| ProjectionExpr { expr, alias })
                    .collect();
                let new_proj_exec =
                    ProjectionExec::try_new(proj_exprs, Arc::new(async_exec))?;
                Ok(Arc::new(new_proj_exec))
            }
            _ => internal_err!("Unexpected PlanAsyncExpressions variant"),
        }
    }

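    /// Detects async scalar function calls in the planned expressions. Any
    /// such calls are collected into an [`AsyncMapper`] and each occurrence
    /// is rewritten to a column reference pointing at the output of the
    /// async evaluation (appended after the `num_input_columns` input
    /// columns); if no async calls are found, the expressions are returned
    /// unchanged as [`PlanAsyncExpr::Sync`].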
    fn try_plan_async_exprs(
        &self,
        num_input_columns: usize,
        physical_expr: PlannedExprResult,
        schema: &Schema,
    ) -> Result<PlanAsyncExpr> {
        let mut async_map = AsyncMapper::new(num_input_columns);
        match &physical_expr {
            PlannedExprResult::ExprWithName(exprs) => {
                exprs
                    .iter()
                    .try_for_each(|(expr, _)| async_map.find_references(expr, schema))?;
            }
            PlannedExprResult::Expr(exprs) => {
                exprs
                    .iter()
                    .try_for_each(|expr| async_map.find_references(expr, schema))?;
            }
        }

        if async_map.is_empty() {
            return Ok(PlanAsyncExpr::Sync(physical_expr));
        }

        let new_exprs = match physical_expr {
            PlannedExprResult::ExprWithName(exprs) => PlannedExprResult::ExprWithName(
                exprs
                    .iter()
                    .map(|(expr, column_name)| {
                        let new_expr = Arc::clone(expr)
                            .transform_up(|e| Ok(async_map.map_expr(e)))?;
                        Ok((new_expr.data, column_name.to_string()))
                    })
                    .collect::<Result<_>>()?,
            ),
            PlannedExprResult::Expr(exprs) => PlannedExprResult::Expr(
                exprs
                    .iter()
                    .map(|expr| {
                        let new_expr = Arc::clone(expr)
                            .transform_up(|e| Ok(async_map.map_expr(e)))?;
                        Ok(new_expr.data)
                    })
                    .collect::<Result<_>>()?,
            ),
        };
        Ok(PlanAsyncExpr::Async(async_map, new_exprs))
    }
}

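/// Physical expressions produced while planning, either paired with their
/// output column names (projections) or bare (e.g. filter predicates).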
#[derive(Debug)]
enum PlannedExprResult {
    ExprWithName(Vec<(Arc<dyn PhysicalExpr>, String)>),
    Expr(Vec<Arc<dyn PhysicalExpr>>),
}

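/// Outcome of async-function detection: `Sync` means the expressions can be
/// evaluated as-is, while `Async` carries the [`AsyncMapper`] describing the
/// async calls that must be evaluated by an `AsyncFuncExec` first.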
#[derive(Debug)]
enum PlanAsyncExpr {
    Sync(PlannedExprResult),
    Async(AsyncMapper, PlannedExprResult),
}

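/// Combines a pair of `Result`s into a `Result` of a pair, preferring the
/// left error when both sides fail.
///
/// A sketch of the behavior (hypothetical values `e`, `e2`):
///
/// ```ignore
/// assert_eq!(tuple_err((Ok(1), Ok(2)))?, (1, 2));
/// // tuple_err((Err(e), Ok(2))) and tuple_err((Err(e), Err(e2))) both yield Err(e)
/// ```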
fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
    match value {
        (Ok(e), Ok(e1)) => Ok((e, e1)),
        (Err(e), Ok(_)) => Err(e),
        (Ok(_), Err(e1)) => Err(e1),
        (Err(e), Err(_)) => Err(e),
    }
}

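/// Verifies that a [`PhysicalOptimizerRule`] did not corrupt the plan: the
/// schema must be unchanged when the rule declares `schema_check()`, and
/// (in debug builds) every node must still uphold its always-level
/// invariants.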
struct OptimizationInvariantChecker<'a> {
    rule: &'a Arc<dyn PhysicalOptimizerRule + Send + Sync>,
}

impl<'a> OptimizationInvariantChecker<'a> {
    /// Creates a checker scoped to the given optimizer rule.
    pub fn new(rule: &'a Arc<dyn PhysicalOptimizerRule + Send + Sync>) -> Self {
        Self { rule }
    }

    /// Errors if the plan's schema changed while the rule claims to preserve
    /// it, or (in debug builds) if any node's always-invariants fail.
    pub fn check(
        &mut self,
        plan: &Arc<dyn ExecutionPlan>,
        previous_schema: Arc<Schema>,
    ) -> Result<()> {
        if self.rule.schema_check() && plan.schema() != previous_schema {
            internal_err!("PhysicalOptimizer rule '{}' failed. Schema mismatch. Expected original schema: {:?}, got new schema: {:?}",
                self.rule.name(),
                previous_schema,
                plan.schema()
            )?
        }

        // Checking invariants on every node is relatively expensive, so it
        // is only performed in debug builds.
        #[cfg(debug_assertions)]
        plan.visit(self)?;

        Ok(())
    }
}

impl<'n> TreeNodeVisitor<'n> for OptimizationInvariantChecker<'_> {
    type Node = Arc<dyn ExecutionPlan>;

    fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
        node.check_invariants(InvariantLevel::Always).map_err(|e|
            e.context(format!("Invariant for ExecutionPlan node '{}' failed for PhysicalOptimizer rule '{}'", node.name(), self.rule.name()))
        )?;
        Ok(TreeNodeRecursion::Continue)
    }
}

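/// Visits every node of an [`ExecutionPlan`] and verifies it upholds the
/// wrapped [`InvariantLevel`], e.g. `Executable` before the final optimized
/// plan is returned.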
struct InvariantChecker(InvariantLevel);

impl InvariantChecker {
    /// Errors if any node in the plan does not uphold the wrapped
    /// [`InvariantLevel`].
    pub fn check(&mut self, plan: &Arc<dyn ExecutionPlan>) -> Result<()> {
        plan.visit(self)?;

        Ok(())
    }
}

impl<'n> TreeNodeVisitor<'n> for InvariantChecker {
    type Node = Arc<dyn ExecutionPlan>;

    fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
        node.check_invariants(self.0).map_err(|e| {
            e.context(format!(
                "Invariant for ExecutionPlan node '{}' failed",
                node.name()
            ))
        })?;
        Ok(TreeNodeRecursion::Continue)
    }
}

#[cfg(test)]
mod tests {
    use std::any::Any;
    use std::cmp::Ordering;
    use std::fmt::{self, Debug};
    use std::ops::{BitAnd, Not};

    use super::*;
    use crate::datasource::file_format::options::CsvReadOptions;
    use crate::datasource::MemTable;
    use crate::physical_plan::{
        expressions, DisplayAs, DisplayFormatType, PlanProperties,
        SendableRecordBatchStream,
    };
    use crate::prelude::{SessionConfig, SessionContext};
    use crate::test_util::{scan_empty, scan_empty_with_partitions};

    use crate::execution::session_state::SessionStateBuilder;
    use arrow::array::{ArrayRef, DictionaryArray, Int32Array};
    use arrow::datatypes::{DataType, Field, Int32Type};
    use arrow_schema::SchemaRef;
    use datafusion_common::config::ConfigOptions;
    use datafusion_common::{
        assert_contains, DFSchemaRef, TableReference, ToDFSchema as _,
    };
    use datafusion_execution::runtime_env::RuntimeEnv;
    use datafusion_execution::TaskContext;
    use datafusion_expr::builder::subquery_alias;
    use datafusion_expr::{col, lit, LogicalPlanBuilder, UserDefinedLogicalNodeCore};
    use datafusion_functions_aggregate::count::count_all;
    use datafusion_functions_aggregate::expr_fn::sum;
    use datafusion_physical_expr::EquivalenceProperties;
    use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};

    fn make_session_state() -> SessionState {
        let runtime = Arc::new(RuntimeEnv::default());
        let config = SessionConfig::new().with_target_partitions(4);
        let config = config.set_bool("datafusion.optimizer.skip_failed_rules", false);
        SessionStateBuilder::new()
            .with_config(config)
            .with_runtime_env(runtime)
            .with_default_features()
            .build()
    }

    async fn plan(logical_plan: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> {
        let session_state = make_session_state();
        // Optimize the logical plan first; the physical planner expects
        // optimized input.
        let logical_plan = session_state.optimize(logical_plan)?;
        let planner = DefaultPhysicalPlanner::default();
        planner
            .create_physical_plan(&logical_plan, &session_state)
            .await
    }

    #[tokio::test]
    async fn test_all_operators() -> Result<()> {
        let logical_plan = test_csv_scan()
            .await?
            .filter(col("c7").lt(lit(5_u8)))?
            .project(vec![col("c1"), col("c2")])?
            .aggregate(vec![col("c1")], vec![sum(col("c2"))])?
            .sort(vec![col("c1").sort(true, true)])?
            .limit(3, Some(10))?
            .build()?;

        let exec_plan = plan(&logical_plan).await?;

        // The coerced filter predicate should appear in the physical plan's
        // debug output.
        let expected = r#"BinaryExpr { left: Column { name: "c7", index: 2 }, op: Lt, right: Literal { value: Int64(5), field: Field { name: "lit", data_type: Int64 } }, fail_on_overflow: false"#;

        assert_contains!(format!("{exec_plan:?}"), expected);
        Ok(())
    }

    #[tokio::test]
    async fn test_create_cube_expr() -> Result<()> {
        let logical_plan = test_csv_scan().await?.build()?;

        let plan = plan(&logical_plan).await?;

        let exprs = vec![col("c1"), col("c2"), col("c3")];

        let physical_input_schema = plan.schema();
        let physical_input_schema = physical_input_schema.as_ref();
        let logical_input_schema = logical_plan.schema();
        let session_state = make_session_state();

        let cube = create_cube_physical_expr(
            &exprs,
            logical_input_schema,
            physical_input_schema,
            &session_state,
        );

        insta::assert_debug_snapshot!(cube, @r#"
        Ok(
            PhysicalGroupBy {
                expr: [
                    (
                        Column {
                            name: "c1",
                            index: 0,
                        },
                        "c1",
                    ),
                    (
                        Column {
                            name: "c2",
                            index: 1,
                        },
                        "c2",
                    ),
                    (
                        Column {
                            name: "c3",
                            index: 2,
                        },
                        "c3",
                    ),
                ],
                null_expr: [
                    (
                        Literal {
                            value: Utf8(NULL),
                            field: Field {
                                name: "lit",
                                data_type: Utf8,
                                nullable: true,
                            },
                        },
                        "c1",
                    ),
                    (
                        Literal {
                            value: Int64(NULL),
                            field: Field {
                                name: "lit",
                                data_type: Int64,
                                nullable: true,
                            },
                        },
                        "c2",
                    ),
                    (
                        Literal {
                            value: Int64(NULL),
                            field: Field {
                                name: "lit",
                                data_type: Int64,
                                nullable: true,
                            },
                        },
                        "c3",
                    ),
                ],
                groups: [
                    [
                        false,
                        false,
                        false,
                    ],
                    [
                        true,
                        false,
                        false,
                    ],
                    [
                        false,
                        true,
                        false,
                    ],
                    [
                        false,
                        false,
                        true,
                    ],
                    [
                        true,
                        true,
                        false,
                    ],
                    [
                        true,
                        false,
                        true,
                    ],
                    [
                        false,
                        true,
                        true,
                    ],
                    [
                        true,
                        true,
                        true,
                    ],
                ],
            },
        )
        "#);

        Ok(())
    }

    #[tokio::test]
    async fn test_create_rollup_expr() -> Result<()> {
        let logical_plan = test_csv_scan().await?.build()?;

        let plan = plan(&logical_plan).await?;

        let exprs = vec![col("c1"), col("c2"), col("c3")];

        let physical_input_schema = plan.schema();
        let physical_input_schema = physical_input_schema.as_ref();
        let logical_input_schema = logical_plan.schema();
        let session_state = make_session_state();

        let rollup = create_rollup_physical_expr(
            &exprs,
            logical_input_schema,
            physical_input_schema,
            &session_state,
        );

        insta::assert_debug_snapshot!(rollup, @r#"
        Ok(
            PhysicalGroupBy {
                expr: [
                    (
                        Column {
                            name: "c1",
                            index: 0,
                        },
                        "c1",
                    ),
                    (
                        Column {
                            name: "c2",
                            index: 1,
                        },
                        "c2",
                    ),
                    (
                        Column {
                            name: "c3",
                            index: 2,
                        },
                        "c3",
                    ),
                ],
                null_expr: [
                    (
                        Literal {
                            value: Utf8(NULL),
                            field: Field {
                                name: "lit",
                                data_type: Utf8,
                                nullable: true,
                            },
                        },
                        "c1",
                    ),
                    (
                        Literal {
                            value: Int64(NULL),
                            field: Field {
                                name: "lit",
                                data_type: Int64,
                                nullable: true,
                            },
                        },
                        "c2",
                    ),
                    (
                        Literal {
                            value: Int64(NULL),
                            field: Field {
                                name: "lit",
                                data_type: Int64,
                                nullable: true,
                            },
                        },
                        "c3",
                    ),
                ],
                groups: [
                    [
                        true,
                        true,
                        true,
                    ],
                    [
                        false,
                        true,
                        true,
                    ],
                    [
                        false,
                        false,
                        true,
                    ],
                    [
                        false,
                        false,
                        false,
                    ],
                ],
            },
        )
        "#);

        Ok(())
    }

    #[tokio::test]
    async fn test_create_not() -> Result<()> {
        let schema = Schema::new(vec![Field::new("a", DataType::Boolean, true)]);
        let dfschema = DFSchema::try_from(schema.clone())?;

        let planner = DefaultPhysicalPlanner::default();

        let expr = planner.create_physical_expr(
            &col("a").not(),
            &dfschema,
            &make_session_state(),
        )?;
        let expected = expressions::not(expressions::col("a", &schema)?)?;

        assert_eq!(format!("{expr:?}"), format!("{expected:?}"));

        Ok(())
    }

    #[tokio::test]
    async fn test_with_csv_plan() -> Result<()> {
        let logical_plan = test_csv_scan()
            .await?
            .filter(col("c7").lt(col("c12")))?
            .limit(3, None)?
            .build()?;

        let plan = plan(&logical_plan).await?;

        // Kept for reference: the planned predicate casts `c7` to Float64
        // before comparing it with `c12`.
        let _expected = "predicate: BinaryExpr { left: TryCastExpr { expr: Column { name: \"c7\", index: 6 }, cast_type: Float64 }, op: Lt, right: Column { name: \"c12\", index: 11 } }";
        let plan_debug_str = format!("{plan:?}");
        assert!(plan_debug_str.contains("GlobalLimitExec"));
        assert!(plan_debug_str.contains("skip: 3"));
        Ok(())
    }

    #[tokio::test]
    async fn error_during_extension_planning() {
        let session_state = make_session_state();
        let planner = DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new(
            ErrorExtensionPlanner {},
        )]);

        let logical_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(NoOpExtensionNode::default()),
        });
        match planner
            .create_physical_plan(&logical_plan, &session_state)
            .await
        {
            Ok(_) => panic!("Expected planning failure"),
            Err(e) => assert!(e.to_string().contains("BOOM")),
        }
    }

    #[tokio::test]
    async fn test_with_zero_offset_plan() -> Result<()> {
        let logical_plan = test_csv_scan().await?.limit(0, None)?.build()?;
        let plan = plan(&logical_plan).await?;
        assert!(!format!("{plan:?}").contains("limit="));
        Ok(())
    }

    #[tokio::test]
    async fn test_limit_with_partitions() -> Result<()> {
        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);

        let logical_plan = scan_empty_with_partitions(Some("test"), &schema, None, 2)?
            .limit(3, Some(5))?
            .build()?;
        let plan = plan(&logical_plan).await?;

        assert!(format!("{plan:?}").contains("GlobalLimitExec"));
        assert!(format!("{plan:?}").contains("skip: 3, fetch: Some(5)"));

        Ok(())
    }

    #[tokio::test]
    async fn errors() -> Result<()> {
        // All of these expressions should plan successfully after type
        // coercion.
        let bool_expr = col("c1").eq(col("c1"));
        let cases = vec![
            // utf8 = utf8
            col("c1").eq(col("c1")),
            // bitwise AND on a numeric column
            col("c3").bitand(col("c3")),
            // comparison across different column types
            col("c1").eq(col("c3")),
            // boolean AND of two comparisons
            bool_expr.clone().and(bool_expr),
        ];
        for case in cases {
            test_csv_scan().await?.project(vec![case.clone()]).unwrap();
        }
        Ok(())
    }

    #[tokio::test]
    async fn default_extension_planner() {
        let session_state = make_session_state();
        let planner = DefaultPhysicalPlanner::default();
        let logical_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(NoOpExtensionNode::default()),
        });
        let plan = planner
            .create_physical_plan(&logical_plan, &session_state)
            .await;

        let expected_error =
            "No installed planner was able to convert the custom node to an execution plan: NoOp";
        match plan {
            Ok(_) => panic!("Expected planning failure"),
            Err(e) => assert!(
                e.to_string().contains(expected_error),
                "Error '{e}' did not contain expected error '{expected_error}'"
            ),
        }
    }

    #[tokio::test]
    async fn bad_extension_planner() {
        // The planner below produces an ExecutionPlan whose schema does not
        // match the logical plan's schema, which must be rejected.
        let session_state = make_session_state();
        let planner = DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new(
            BadExtensionPlanner {},
        )]);

        let logical_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(NoOpExtensionNode::default()),
        });
        let e = planner
            .create_physical_plan(&logical_plan, &session_state)
            .await
            .expect_err("planning error")
            .strip_backtrace();

        insta::assert_snapshot!(e, @r#"Error during planning: Extension planner for NoOp created an ExecutionPlan with mismatched schema. LogicalPlan schema: DFSchema { inner: Schema { fields: [Field { name: "a", data_type: Int32 }], metadata: {} }, field_qualifiers: [None], functional_dependencies: FunctionalDependencies { deps: [] } }, ExecutionPlan schema: Schema { fields: [Field { name: "b", data_type: Int32 }], metadata: {} }"#);
    }

    #[tokio::test]
    async fn in_list_types() -> Result<()> {
        // expression: c1 IN ('a', 1)
        let list = vec![lit("a"), lit(1i64)];
        let logical_plan = test_csv_scan()
            .await?
            .filter(col("c12").lt(lit(0.05)))?
            .project(vec![col("c1").in_list(list, false)])?
            .build()?;
        let execution_plan = plan(&logical_plan).await?;
        // The IN list is rewritten to equality comparisons against the
        // coerced literals.
        let expected = r#"expr: BinaryExpr { left: BinaryExpr { left: Column { name: "c1", index: 0 }, op: Eq, right: Literal { value: Utf8("a"), field: Field { name: "lit", data_type: Utf8 } }, fail_on_overflow: false }"#;

        assert_contains!(format!("{execution_plan:?}"), expected);

        Ok(())
    }

    #[tokio::test]
    async fn in_list_types_struct_literal() -> Result<()> {
        // IN list with incompatible types: a struct literal and a utf8
        // literal.
        let list = vec![struct_literal(), lit("a")];

        let logical_plan = test_csv_scan()
            .await?
            .filter(col("c12").lt(lit(0.05)))?
            .project(vec![col("c12").lt_eq(lit(0.025)).in_list(list, false)])?
            .build()?;
        let e = plan(&logical_plan).await.unwrap_err().to_string();

        assert_contains!(
            &e,
            r#"Error during planning: Can not find compatible types to compare Boolean with [Struct("foo": Boolean), Utf8]"#
        );

        Ok(())
    }

    /// Returns a literal for a null struct with a single boolean field `foo`.
    fn struct_literal() -> Expr {
        let struct_literal = ScalarValue::try_from(DataType::Struct(
            vec![Field::new("foo", DataType::Boolean, false)].into(),
        ))
        .unwrap();

        lit(struct_literal)
    }

    #[tokio::test]
    async fn hash_agg_input_schema() -> Result<()> {
        let logical_plan = test_csv_scan_with_name("aggregate_test_100")
            .await?
            .aggregate(vec![col("c1")], vec![sum(col("c2"))])?
            .build()?;

        let execution_plan = plan(&logical_plan).await?;
        let final_hash_agg = execution_plan
            .as_any()
            .downcast_ref::<AggregateExec>()
            .expect("hash aggregate");
        assert_eq!(
            "sum(aggregate_test_100.c2)",
            final_hash_agg.schema().field(1).name()
        );
        // The input schema of the aggregate keeps the raw column name.
        assert_eq!("c2", final_hash_agg.input_schema().field(1).name());

        Ok(())
    }

    #[tokio::test]
    async fn hash_agg_grouping_set_input_schema() -> Result<()> {
        let grouping_set_expr = Expr::GroupingSet(GroupingSet::GroupingSets(vec![
            vec![col("c1")],
            vec![col("c2")],
            vec![col("c1"), col("c2")],
        ]));
        let logical_plan = test_csv_scan_with_name("aggregate_test_100")
            .await?
            .aggregate(vec![grouping_set_expr], vec![sum(col("c3"))])?
            .build()?;

        let execution_plan = plan(&logical_plan).await?;
        let final_hash_agg = execution_plan
            .as_any()
            .downcast_ref::<AggregateExec>()
            .expect("hash aggregate");
        assert_eq!(
            "sum(aggregate_test_100.c3)",
            final_hash_agg.schema().field(3).name()
        );
        // The input schema of the aggregate keeps the raw column name.
        assert_eq!("c3", final_hash_agg.input_schema().field(2).name());

        Ok(())
    }

    #[tokio::test]
    async fn hash_agg_group_by_partitioned() -> Result<()> {
        let logical_plan = test_csv_scan()
            .await?
            .aggregate(vec![col("c1")], vec![sum(col("c2"))])?
            .build()?;

        let execution_plan = plan(&logical_plan).await?;
        let formatted = format!("{execution_plan:?}");

        // The final aggregate should run in FinalPartitioned mode,
        // repartitioned on the group keys.
        assert!(formatted.contains("FinalPartitioned"));

        Ok(())
    }

    #[tokio::test]
    async fn hash_agg_group_by_partitioned_on_dicts() -> Result<()> {
        let dict_array: DictionaryArray<Int32Type> =
            vec!["A", "B", "A", "A", "C", "A"].into_iter().collect();
        let val_array: Int32Array = vec![1, 2, 2, 4, 1, 1].into();

        let batch = RecordBatch::try_from_iter(vec![
            ("d1", Arc::new(dict_array) as ArrayRef),
            ("d2", Arc::new(val_array) as ArrayRef),
        ])
        .unwrap();

        let table = MemTable::try_new(batch.schema(), vec![vec![batch]])?;
        let ctx = SessionContext::new();

        let logical_plan = LogicalPlanBuilder::from(
            ctx.read_table(Arc::new(table))?.into_optimized_plan()?,
        )
        .aggregate(vec![col("d1")], vec![sum(col("d2"))])?
        .build()?;

        let execution_plan = plan(&logical_plan).await?;
        let formatted = format!("{execution_plan:?}");

        // The final aggregate should run in FinalPartitioned mode,
        // repartitioned on the group keys.
        assert!(formatted.contains("FinalPartitioned"));
        Ok(())
    }

    #[tokio::test]
    async fn hash_agg_grouping_set_by_partitioned() -> Result<()> {
        let grouping_set_expr = Expr::GroupingSet(GroupingSet::GroupingSets(vec![
            vec![col("c1")],
            vec![col("c2")],
            vec![col("c1"), col("c2")],
        ]));
        let logical_plan = test_csv_scan()
            .await?
            .aggregate(vec![grouping_set_expr], vec![sum(col("c3"))])?
            .build()?;

        let execution_plan = plan(&logical_plan).await?;
        let formatted = format!("{execution_plan:?}");

        // The final aggregate should run in FinalPartitioned mode,
        // repartitioned on the group keys.
        assert!(formatted.contains("FinalPartitioned"));

        Ok(())
    }

    #[tokio::test]
    async fn aggregate_with_alias() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("c1", DataType::Utf8, false),
            Field::new("c2", DataType::UInt32, false),
        ]));

        let logical_plan = scan_empty(None, schema.as_ref(), None)?
            .aggregate(vec![col("c1")], vec![sum(col("c2"))])?
            .project(vec![col("c1"), sum(col("c2")).alias("total_salary")])?
            .build()?;

        let physical_plan = plan(&logical_plan).await?;
        assert_eq!("c1", physical_plan.schema().field(0).name().as_str());
        assert_eq!(
            "total_salary",
            physical_plan.schema().field(1).name().as_str()
        );
        Ok(())
    }

    #[tokio::test]
    async fn test_aggregate_count_all_with_alias() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("c1", DataType::Utf8, false),
            Field::new("c2", DataType::UInt32, false),
        ]));

        let logical_plan = scan_empty(None, schema.as_ref(), None)?
            .aggregate(Vec::<Expr>::new(), vec![count_all().alias("total_rows")])?
            .build()?;

        let physical_plan = plan(&logical_plan).await?;
        assert_eq!(
            "total_rows",
            physical_plan.schema().field(0).name().as_str()
        );
        Ok(())
    }

    #[tokio::test]
    async fn test_explain() {
        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);

        let logical_plan = scan_empty(Some("employee"), &schema, None)
            .unwrap()
            .explain(true, false)
            .unwrap()
            .build()
            .unwrap();

        let plan = plan(&logical_plan).await.unwrap();
        if let Some(plan) = plan.as_any().downcast_ref::<ExplainExec>() {
            let stringified_plans = plan.stringified_plans();
            assert!(stringified_plans.len() >= 4);
            assert!(stringified_plans
                .iter()
                .any(|p| matches!(p.plan_type, PlanType::FinalLogicalPlan)));
            assert!(stringified_plans
                .iter()
                .any(|p| matches!(p.plan_type, PlanType::InitialPhysicalPlan)));
            assert!(stringified_plans
                .iter()
                .any(|p| matches!(p.plan_type, PlanType::OptimizedPhysicalPlan { .. })));
            assert!(stringified_plans
                .iter()
                .any(|p| matches!(p.plan_type, PlanType::FinalPhysicalPlan)));
        } else {
            panic!(
                "Plan was not an explain plan: {}",
                displayable(plan.as_ref()).indent(true)
            );
        }
    }

    #[tokio::test]
    async fn test_explain_indent_err() {
        let planner = DefaultPhysicalPlanner::default();
        let ctx = SessionContext::new();
        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
        let plan = Arc::new(
            scan_empty(Some("employee"), &schema, None)
                .unwrap()
                .explain(true, false)
                .unwrap()
                .build()
                .unwrap(),
        );

        let schema = Arc::new(Schema::new(vec![
            Field::new("plan_type", DataType::Utf8, false),
            Field::new("plan", DataType::Utf8, false),
        ]));

        let stringified_plans =
            vec![StringifiedPlan::new(PlanType::FinalLogicalPlan, "Test Err")];

        let explain = Explain {
            verbose: false,
            explain_format: ExplainFormat::Indent,
            plan,
            stringified_plans,
            schema: schema.to_dfschema_ref().unwrap(),
            logical_optimization_succeeded: false,
        };
        let plan = planner
            .handle_explain(&explain, &ctx.state())
            .await
            .unwrap();
        if let Some(plan) = plan.as_any().downcast_ref::<ExplainExec>() {
            let stringified_plans = plan.stringified_plans();
            assert_eq!(stringified_plans.len(), 1);
            assert_eq!(stringified_plans[0].plan.as_str(), "Test Err");
        } else {
            panic!(
                "Plan was not an explain plan: {}",
                displayable(plan.as_ref()).indent(true)
            );
        }
    }

    struct ErrorExtensionPlanner {}

    #[async_trait]
    impl ExtensionPlanner for ErrorExtensionPlanner {
        async fn plan_extension(
            &self,
            _planner: &dyn PhysicalPlanner,
            _node: &dyn UserDefinedLogicalNode,
            _logical_inputs: &[&LogicalPlan],
            _physical_inputs: &[Arc<dyn ExecutionPlan>],
            _session_state: &SessionState,
        ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
            internal_err!("BOOM")
        }
    }

    #[derive(PartialEq, Eq, Hash)]
    struct NoOpExtensionNode {
        schema: DFSchemaRef,
    }

    impl Default for NoOpExtensionNode {
        fn default() -> Self {
            Self {
                schema: DFSchemaRef::new(
                    DFSchema::from_unqualified_fields(
                        vec![Field::new("a", DataType::Int32, false)].into(),
                        HashMap::new(),
                    )
                    .unwrap(),
                ),
            }
        }
    }

    impl Debug for NoOpExtensionNode {
        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
            write!(f, "NoOp")
        }
    }

    impl PartialOrd for NoOpExtensionNode {
        fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
            None
        }
    }

    impl UserDefinedLogicalNodeCore for NoOpExtensionNode {
        fn name(&self) -> &str {
            "NoOp"
        }

        fn inputs(&self) -> Vec<&LogicalPlan> {
            vec![]
        }

        fn schema(&self) -> &DFSchemaRef {
            &self.schema
        }

        fn expressions(&self) -> Vec<Expr> {
            vec![]
        }

        fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result {
            write!(f, "NoOp")
        }

        fn with_exprs_and_inputs(
            &self,
            _exprs: Vec<Expr>,
            _inputs: Vec<LogicalPlan>,
        ) -> Result<Self> {
            unimplemented!("NoOp");
        }

        fn supports_limit_pushdown(&self) -> bool {
            false
        }
    }

    #[derive(Debug)]
    struct NoOpExecutionPlan {
        cache: PlanProperties,
    }

    impl NoOpExecutionPlan {
        fn new(schema: SchemaRef) -> Self {
            let cache = Self::compute_properties(schema);
            Self { cache }
        }

        /// Computes the cached [`PlanProperties`] for this node.
        fn compute_properties(schema: SchemaRef) -> PlanProperties {
            PlanProperties::new(
                EquivalenceProperties::new(schema),
                Partitioning::UnknownPartitioning(1),
                EmissionType::Incremental,
                Boundedness::Bounded,
            )
        }
    }

    impl DisplayAs for NoOpExecutionPlan {
        fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
            match t {
                DisplayFormatType::Default | DisplayFormatType::Verbose => {
                    write!(f, "NoOpExecutionPlan")
                }
                DisplayFormatType::TreeRender => {
                    write!(f, "")
                }
            }
        }
    }

    impl ExecutionPlan for NoOpExecutionPlan {
        fn name(&self) -> &'static str {
            "NoOpExecutionPlan"
        }

        fn as_any(&self) -> &dyn Any {
            self
        }

        fn properties(&self) -> &PlanProperties {
            &self.cache
        }

        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
            vec![]
        }

        fn with_new_children(
            self: Arc<Self>,
            _children: Vec<Arc<dyn ExecutionPlan>>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            unimplemented!("NoOpExecutionPlan::with_new_children");
        }

        fn execute(
            &self,
            _partition: usize,
            _context: Arc<TaskContext>,
        ) -> Result<SendableRecordBatchStream> {
            unimplemented!("NoOpExecutionPlan::execute");
        }
    }

    /// An extension planner that deliberately returns a plan whose schema
    /// does not match the logical node's schema.
    struct BadExtensionPlanner {}

    #[async_trait]
    impl ExtensionPlanner for BadExtensionPlanner {
        async fn plan_extension(
            &self,
            _planner: &dyn PhysicalPlanner,
            _node: &dyn UserDefinedLogicalNode,
            _logical_inputs: &[&LogicalPlan],
            _physical_inputs: &[Arc<dyn ExecutionPlan>],
            _session_state: &SessionState,
        ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
            Ok(Some(Arc::new(NoOpExecutionPlan::new(SchemaRef::new(
                Schema::new(vec![Field::new("b", DataType::Int32, false)]),
            )))))
        }
    }

    async fn test_csv_scan_with_name(name: &str) -> Result<LogicalPlanBuilder> {
        let ctx = SessionContext::new();
        let testdata = crate::test_util::arrow_test_data();
        let path = format!("{testdata}/csv/aggregate_test_100.csv");
        let options = CsvReadOptions::new().schema_infer_max_records(100);
        let logical_plan =
            match ctx.read_csv(path, options).await?.into_optimized_plan()? {
                LogicalPlan::TableScan(ref scan) => {
                    let mut scan = scan.clone();
                    let table_reference = TableReference::from(name);
                    scan.table_name = table_reference;
                    let new_schema = scan
                        .projected_schema
                        .as_ref()
                        .clone()
                        .replace_qualifier(name.to_string());
                    scan.projected_schema = Arc::new(new_schema);
                    LogicalPlan::TableScan(scan)
                }
                _ => unimplemented!(),
            };
        Ok(LogicalPlanBuilder::from(logical_plan))
    }

    async fn test_csv_scan() -> Result<LogicalPlanBuilder> {
        let ctx = SessionContext::new();
        let testdata = crate::test_util::arrow_test_data();
        let path = format!("{testdata}/csv/aggregate_test_100.csv");
        let options = CsvReadOptions::new().schema_infer_max_records(100);
        Ok(LogicalPlanBuilder::from(
            ctx.read_csv(path, options).await?.into_optimized_plan()?,
        ))
    }

    #[tokio::test]
    async fn test_display_plan_in_graphviz_format() {
        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);

        let logical_plan = scan_empty(Some("employee"), &schema, None)
            .unwrap()
            .project(vec![col("id") + lit(2)])
            .unwrap()
            .build()
            .unwrap();

        let plan = plan(&logical_plan).await.unwrap();

        let expected_graph = r#"
// Begin DataFusion GraphViz Plan,
// display it online here: https://dreampuf.github.io/GraphvizOnline

digraph {
  1[shape=box label="ProjectionExec: expr=[id@0 + 2 as employee.id + Int32(2)]", tooltip=""]
  2[shape=box label="EmptyExec", tooltip=""]
  1 -> 2 [arrowhead=none, arrowtail=normal, dir=back]
}
// End DataFusion GraphViz Plan
"#;

        let generated_graph = format!("{}", displayable(&*plan).graphviz());

        assert_eq!(expected_graph, generated_graph);
    }

    #[tokio::test]
    async fn test_display_graphviz_with_statistics() {
        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);

        let logical_plan = scan_empty(Some("employee"), &schema, None)
            .unwrap()
            .project(vec![col("id") + lit(2)])
            .unwrap()
            .build()
            .unwrap();

        let plan = plan(&logical_plan).await.unwrap();

        let expected_tooltip = ", tooltip=\"statistics=[";

        let generated_graph = format!(
            "{}",
            displayable(&*plan).set_show_statistics(true).graphviz()
        );

        assert_contains!(generated_graph, expected_tooltip);
    }

    /// An [`ExecutionPlan`] whose invariants always hold.
    #[derive(Debug)]
    struct OkExtensionNode(Vec<Arc<dyn ExecutionPlan>>);
    impl ExecutionPlan for OkExtensionNode {
        fn name(&self) -> &str {
            "always ok"
        }
        fn with_new_children(
            self: Arc<Self>,
            children: Vec<Arc<dyn ExecutionPlan>>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            Ok(Arc::new(Self(children)))
        }
        fn schema(&self) -> SchemaRef {
            Arc::new(Schema::empty())
        }
        fn as_any(&self) -> &dyn Any {
            unimplemented!()
        }
        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
            self.0.iter().collect::<Vec<_>>()
        }
        fn properties(&self) -> &PlanProperties {
            unimplemented!()
        }
        fn execute(
            &self,
            _partition: usize,
            _context: Arc<TaskContext>,
        ) -> Result<SendableRecordBatchStream> {
            unimplemented!()
        }
    }
    impl DisplayAs for OkExtensionNode {
        fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
            write!(f, "{}", self.name())
        }
    }

    /// An [`ExecutionPlan`] whose always-level invariant check fails.
    #[derive(Debug)]
    struct InvariantFailsExtensionNode;
    impl ExecutionPlan for InvariantFailsExtensionNode {
        fn name(&self) -> &str {
            "InvariantFailsExtensionNode"
        }
        fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
            match check {
                InvariantLevel::Always => plan_err!("extension node failed its user-defined always-invariant check"),
                InvariantLevel::Executable => panic!("the OptimizationInvariantChecker should not be checking for executableness"),
            }
        }
        fn schema(&self) -> SchemaRef {
            Arc::new(Schema::empty())
        }
        fn with_new_children(
            self: Arc<Self>,
            _children: Vec<Arc<dyn ExecutionPlan>>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            unimplemented!()
        }
        fn as_any(&self) -> &dyn Any {
            unimplemented!()
        }
        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
            unimplemented!()
        }
        fn properties(&self) -> &PlanProperties {
            unimplemented!()
        }
        fn execute(
            &self,
            _partition: usize,
            _context: Arc<TaskContext>,
        ) -> Result<SendableRecordBatchStream> {
            unimplemented!()
        }
    }
    impl DisplayAs for InvariantFailsExtensionNode {
        fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
            write!(f, "{}", self.name())
        }
    }

    /// A no-op optimizer rule that promises to preserve the schema.
    #[derive(Debug)]
    struct OptimizerRuleWithSchemaCheck;
    impl PhysicalOptimizerRule for OptimizerRuleWithSchemaCheck {
        fn optimize(
            &self,
            plan: Arc<dyn ExecutionPlan>,
            _config: &ConfigOptions,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            Ok(plan)
        }
        fn name(&self) -> &str {
            "OptimizerRuleWithSchemaCheck"
        }
        fn schema_check(&self) -> bool {
            true
        }
    }

    #[test]
    fn test_optimization_invariant_checker() -> Result<()> {
        let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
            Arc::new(OptimizerRuleWithSchemaCheck);

        // Ok plan: all invariants hold
        let ok_node: Arc<dyn ExecutionPlan> = Arc::new(OkExtensionNode(vec![]));
        let child = Arc::clone(&ok_node);
        let ok_plan = Arc::clone(&ok_node).with_new_children(vec![
            Arc::clone(&child).with_new_children(vec![Arc::clone(&child)])?,
            Arc::clone(&child),
        ])?;

        // Should pass when the schema is unchanged
        let equal_schema = ok_plan.schema();
        OptimizationInvariantChecker::new(&rule).check(&ok_plan, equal_schema)?;

        // Should fail when the schema changed
        let different_schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)]));
        let expected_err = OptimizationInvariantChecker::new(&rule)
            .check(&ok_plan, different_schema)
            .unwrap_err();
        assert!(expected_err.to_string().contains("PhysicalOptimizer rule 'OptimizerRuleWithSchemaCheck' failed. Schema mismatch. Expected original schema"));

        // Should fail when a node's always-invariant fails
        let failing_node: Arc<dyn ExecutionPlan> = Arc::new(InvariantFailsExtensionNode);
        let expected_err = OptimizationInvariantChecker::new(&rule)
            .check(&failing_node, ok_plan.schema())
            .unwrap_err();
        assert!(expected_err
            .to_string()
            .contains("extension node failed its user-defined always-invariant check"));

        // Should fail when the failing node is nested within the plan
        let failing_node: Arc<dyn ExecutionPlan> = Arc::new(InvariantFailsExtensionNode);
        let invalid_plan = ok_node.with_new_children(vec![
            Arc::clone(&child).with_new_children(vec![Arc::clone(&failing_node)])?,
            Arc::clone(&child),
        ])?;
        let expected_err = OptimizationInvariantChecker::new(&rule)
            .check(&invalid_plan, ok_plan.schema())
            .unwrap_err();
        assert!(expected_err
            .to_string()
            .contains("extension node failed its user-defined always-invariant check"));

        Ok(())
    }

    /// An [`ExecutionPlan`] whose always-level invariants hold but whose
    /// executable-level invariant check fails.
    #[derive(Debug)]
    struct ExecutableInvariantFails;
    impl ExecutionPlan for ExecutableInvariantFails {
        fn name(&self) -> &str {
            "ExecutableInvariantFails"
        }
        fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
            match check {
                InvariantLevel::Always => Ok(()),
                InvariantLevel::Executable => plan_err!(
                    "extension node failed its user-defined executable-invariant check"
                ),
            }
        }
        fn schema(&self) -> SchemaRef {
            Arc::new(Schema::empty())
        }
        fn with_new_children(
            self: Arc<Self>,
            _children: Vec<Arc<dyn ExecutionPlan>>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            unimplemented!()
        }
        fn as_any(&self) -> &dyn Any {
            unimplemented!()
        }
        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
            vec![]
        }
        fn properties(&self) -> &PlanProperties {
            unimplemented!()
        }
        fn execute(
            &self,
            _partition: usize,
            _context: Arc<TaskContext>,
        ) -> Result<SendableRecordBatchStream> {
            unimplemented!()
        }
    }
    impl DisplayAs for ExecutableInvariantFails {
        fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
            write!(f, "{}", self.name())
        }
    }

    #[test]
    fn test_invariant_checker_levels() -> Result<()> {
        let plan: Arc<dyn ExecutionPlan> = Arc::new(ExecutableInvariantFails);

        // The always-level check passes
        InvariantChecker(InvariantLevel::Always).check(&plan)?;

        // The executable-level check fails
        let expected_err = InvariantChecker(InvariantLevel::Executable)
            .check(&plan)
            .unwrap_err();
        assert!(expected_err.to_string().contains(
            "extension node failed its user-defined executable-invariant check"
        ));

        // The failure is also detected when the node is nested in a plan
        let failing_node: Arc<dyn ExecutionPlan> = Arc::new(ExecutableInvariantFails);
        let ok_node: Arc<dyn ExecutionPlan> = Arc::new(OkExtensionNode(vec![]));
        let child = Arc::clone(&ok_node);
        let plan = ok_node.with_new_children(vec![
            Arc::clone(&child).with_new_children(vec![Arc::clone(&failing_node)])?,
            Arc::clone(&child),
        ])?;
        let expected_err = InvariantChecker(InvariantLevel::Executable)
            .check(&plan)
            .unwrap_err();
        assert!(expected_err.to_string().contains(
            "extension node failed its user-defined executable-invariant check"
        ));

        Ok(())
    }

    #[tokio::test]
    async fn subquery_alias_confusing_the_optimizer() -> Result<()> {
        // Plans a SubqueryAlias over a full join where both sides share a
        // column name; physical planning must succeed both before and after
        // logical optimization.
        let state = make_session_state();

        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let schema = Arc::new(schema);

        let table = MemTable::try_new(schema.clone(), vec![vec![]])?;
        let table = Arc::new(table);

        let source = DefaultTableSource::new(table);
        let source = Arc::new(source);

        let left = LogicalPlanBuilder::scan("left", source.clone(), None)?;
        let right = LogicalPlanBuilder::scan("right", source, None)?.build()?;

        let join_keys = (
            vec![datafusion_common::Column::new(Some("left"), "a")],
            vec![datafusion_common::Column::new(Some("right"), "a")],
        );

        let join = left.join(right, JoinType::Full, join_keys, None)?.build()?;

        let alias = subquery_alias(join, "alias")?;

        let planner = DefaultPhysicalPlanner::default();

        let logical_plan = LogicalPlanBuilder::new(alias)
            .aggregate(vec![col("a:1")], Vec::<Expr>::new())?
            .build()?;
        let _physical_plan = planner.create_physical_plan(&logical_plan, &state).await?;

        let optimized_logical_plan = state.optimize(&logical_plan)?;
        let _optimized_physical_plan = planner
            .create_physical_plan(&optimized_logical_plan, &state)
            .await?;

        Ok(())
    }
}