datafusion/
physical_planner.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Planner for [`LogicalPlan`] to [`ExecutionPlan`]
19
20use std::borrow::Cow;
21use std::collections::HashMap;
22use std::sync::Arc;
23
24use crate::datasource::file_format::file_type_to_format;
25use crate::datasource::listing::ListingTableUrl;
26use crate::datasource::physical_plan::FileSinkConfig;
27use crate::datasource::{source_as_provider, DefaultTableSource};
28use crate::error::{DataFusionError, Result};
29use crate::execution::context::{ExecutionProps, SessionState};
30use crate::logical_expr::utils::generate_sort_key;
31use crate::logical_expr::{
32    Aggregate, EmptyRelation, Join, Projection, Sort, TableScan, Unnest, Values, Window,
33};
34use crate::logical_expr::{
35    Expr, LogicalPlan, Partitioning as LogicalPartitioning, PlanType, Repartition,
36    UserDefinedLogicalNode,
37};
38use crate::physical_expr::{create_physical_expr, create_physical_exprs};
39use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
40use crate::physical_plan::analyze::AnalyzeExec;
41use crate::physical_plan::explain::ExplainExec;
42use crate::physical_plan::filter::FilterExec;
43use crate::physical_plan::joins::utils as join_utils;
44use crate::physical_plan::joins::{
45    CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode, SortMergeJoinExec,
46};
47use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
48use crate::physical_plan::projection::{ProjectionExec, ProjectionExpr};
49use crate::physical_plan::repartition::RepartitionExec;
50use crate::physical_plan::sorts::sort::SortExec;
51use crate::physical_plan::union::UnionExec;
52use crate::physical_plan::unnest::UnnestExec;
53use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
54use crate::physical_plan::{
55    displayable, windows, ExecutionPlan, ExecutionPlanProperties, InputOrderMode,
56    Partitioning, PhysicalExpr, WindowExpr,
57};
58use crate::schema_equivalence::schema_satisfied_by;
59
60use arrow::array::{builder::StringBuilder, RecordBatch};
61use arrow::compute::SortOptions;
62use arrow::datatypes::Schema;
63use datafusion_catalog::ScanArgs;
64use datafusion_common::display::ToStringifiedPlan;
65use datafusion_common::format::ExplainAnalyzeLevel;
66use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
67use datafusion_common::TableReference;
68use datafusion_common::{
69    exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema,
70    ScalarValue,
71};
72use datafusion_datasource::file_groups::FileGroup;
73use datafusion_datasource::memory::MemorySourceConfig;
74use datafusion_expr::dml::{CopyTo, InsertOp};
75use datafusion_expr::expr::{
76    physical_name, AggregateFunction, AggregateFunctionParams, Alias, GroupingSet,
77    NullTreatment, WindowFunction, WindowFunctionParams,
78};
79use datafusion_expr::expr_rewriter::unnormalize_cols;
80use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
81use datafusion_expr::utils::split_conjunction;
82use datafusion_expr::{
83    Analyze, BinaryExpr, DescribeTable, DmlStatement, Explain, ExplainFormat, Extension,
84    FetchType, Filter, JoinType, Operator, RecursiveQuery, SkipType, StringifiedPlan,
85    WindowFrame, WindowFrameBound, WriteOp,
86};
87use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
88use datafusion_physical_expr::expressions::Literal;
89use datafusion_physical_expr::{
90    create_physical_sort_exprs, LexOrdering, PhysicalSortExpr,
91};
92use datafusion_physical_optimizer::PhysicalOptimizerRule;
93use datafusion_physical_plan::empty::EmptyExec;
94use datafusion_physical_plan::execution_plan::InvariantLevel;
95use datafusion_physical_plan::joins::PiecewiseMergeJoinExec;
96use datafusion_physical_plan::metrics::MetricType;
97use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
98use datafusion_physical_plan::recursive_query::RecursiveQueryExec;
99use datafusion_physical_plan::unnest::ListUnnest;
100
101use async_trait::async_trait;
102use datafusion_physical_plan::async_func::{AsyncFuncExec, AsyncMapper};
103use futures::{StreamExt, TryStreamExt};
104use itertools::{multiunzip, Itertools};
105use log::debug;
106use tokio::sync::Mutex;
107
/// Physical query planner that converts a `LogicalPlan` to an
/// `ExecutionPlan` suitable for execution.
///
/// Implementors must be `Send + Sync`, as required by the trait bound.
#[async_trait]
pub trait PhysicalPlanner: Send + Sync {
    /// Create a physical plan from a logical plan
    ///
    /// `logical_plan`: the logical plan to convert
    ///
    /// `session_state`: the session state handed to the planner for this conversion
    async fn create_physical_plan(
        &self,
        logical_plan: &LogicalPlan,
        session_state: &SessionState,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Create a physical expression from a logical expression
    /// suitable for evaluation
    ///
    /// `expr`: the expression to convert
    ///
    /// `input_dfschema`: the logical plan schema for evaluating `expr`
    ///
    /// `session_state`: the session state handed to the planner for this conversion
    fn create_physical_expr(
        &self,
        expr: &Expr,
        input_dfschema: &DFSchema,
        session_state: &SessionState,
    ) -> Result<Arc<dyn PhysicalExpr>>;
}
132
/// This trait exposes the ability to plan an [`ExecutionPlan`] out of a [`LogicalPlan`].
#[async_trait]
pub trait ExtensionPlanner {
    /// Create a physical plan for a [`UserDefinedLogicalNode`].
    ///
    /// `planner`: the [`PhysicalPlanner`] made available to this extension planner
    ///
    /// `node`: the user-defined logical node to plan
    ///
    /// `logical_inputs`: presumably the logical input plans of `node`
    /// (NOTE(review): confirm against the caller)
    ///
    /// `physical_inputs`: presumably the already planned physical counterparts
    /// of `logical_inputs` (NOTE(review): confirm against the caller)
    ///
    /// `session_state`: the session state made available during planning
    ///
    /// Returns an error when the planner knows how to plan the concrete
    /// implementation of `node` but errors while doing so.
    ///
    /// Returns `None` when the planner does not know how to plan the
    /// `node` and wants to delegate the planning to another
    /// [`ExtensionPlanner`].
    async fn plan_extension(
        &self,
        planner: &dyn PhysicalPlanner,
        node: &dyn UserDefinedLogicalNode,
        logical_inputs: &[&LogicalPlan],
        physical_inputs: &[Arc<dyn ExecutionPlan>],
        session_state: &SessionState,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>>;
}
155
/// Default single node physical query planner that converts a
/// `LogicalPlan` to an `ExecutionPlan` suitable for execution.
///
/// This planner will first flatten the `LogicalPlan` tree via a
/// depth first approach, which allows it to identify the leaves
/// of the tree.
///
/// Tasks are spawned from these leaves and traverse back up the
/// tree towards the root, converting each `LogicalPlan` node it
/// reaches into their equivalent `ExecutionPlan` node. When these
/// tasks reach a common node, they will terminate until the last
/// task reaches the node which will then continue building up the
/// tree.
///
/// Up to [`planning_concurrency`] tasks are buffered at once to
/// execute concurrently.
///
/// [`planning_concurrency`]: crate::config::ExecutionOptions::planning_concurrency
#[derive(Default)]
pub struct DefaultPhysicalPlanner {
    /// Planners consulted for user-defined logical nodes
    /// ([`LogicalPlan::Extension`]); empty when built via `Default`.
    extension_planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>>,
}
178
179#[async_trait]
180impl PhysicalPlanner for DefaultPhysicalPlanner {
181    /// Create a physical plan from a logical plan
182    async fn create_physical_plan(
183        &self,
184        logical_plan: &LogicalPlan,
185        session_state: &SessionState,
186    ) -> Result<Arc<dyn ExecutionPlan>> {
187        if let Some(plan) = self
188            .handle_explain_or_analyze(logical_plan, session_state)
189            .await?
190        {
191            return Ok(plan);
192        }
193        let plan = self
194            .create_initial_plan(logical_plan, session_state)
195            .await?;
196
197        self.optimize_physical_plan(plan, session_state, |_, _| {})
198    }
199
200    /// Create a physical expression from a logical expression
201    /// suitable for evaluation
202    ///
203    /// `e`: the expression to convert
204    ///
205    /// `input_dfschema`: the logical plan schema for evaluating `e`
206    fn create_physical_expr(
207        &self,
208        expr: &Expr,
209        input_dfschema: &DFSchema,
210        session_state: &SessionState,
211    ) -> Result<Arc<dyn PhysicalExpr>> {
212        create_physical_expr(expr, input_dfschema, session_state.execution_props())
213    }
214}
215
/// A finished child plan waiting to be attached to its parent node.
#[derive(Debug)]
struct ExecutionPlanChild {
    /// Index needed to order children of parent to ensure consistency with original
    /// `LogicalPlan`
    index: usize,
    /// The physical plan built for this child.
    plan: Arc<dyn ExecutionPlan>,
}
223
/// Per-node bookkeeping used while physical plans are built bottom-up.
#[derive(Debug)]
enum NodeState {
    /// Node with no inputs or exactly one input; no shared state is needed.
    ZeroOrOneChild,
    /// Nodes with multiple children will have multiple tasks accessing it,
    /// and each task will append their contribution until the last task takes
    /// all the children to build the parent node.
    TwoOrMoreChildren(Mutex<Vec<ExecutionPlanChild>>),
}
232
/// To avoid needing to pass single child wrapped in a Vec for nodes
/// with only one child.
enum ChildrenContainer {
    /// No children (used when planning a leaf node).
    None,
    /// Exactly one child plan.
    One(Arc<dyn ExecutionPlan>),
    /// The child plans of a node with multiple inputs.
    Multiple(Vec<Arc<dyn ExecutionPlan>>),
}
240
241impl ChildrenContainer {
242    fn one(self) -> Result<Arc<dyn ExecutionPlan>> {
243        match self {
244            Self::One(p) => Ok(p),
245            _ => internal_err!("More than one child in ChildrenContainer"),
246        }
247    }
248
249    fn two(self) -> Result<[Arc<dyn ExecutionPlan>; 2]> {
250        match self {
251            Self::Multiple(v) if v.len() == 2 => Ok(v.try_into().unwrap()),
252            _ => internal_err!("ChildrenContainer doesn't contain exactly 2 children"),
253        }
254    }
255
256    fn vec(self) -> Vec<Arc<dyn ExecutionPlan>> {
257        match self {
258            Self::None => vec![],
259            Self::One(p) => vec![p],
260            Self::Multiple(v) => v,
261        }
262    }
263}
264
/// One entry of the flattened `LogicalPlan` tree.
#[derive(Debug)]
struct LogicalNode<'a> {
    /// The logical plan node this entry stands for.
    node: &'a LogicalPlan,
    /// Position of the parent entry in the flattened tree.
    // None if root
    parent_index: Option<usize>,
    /// Bookkeeping for collecting this node's children while planning.
    state: NodeState,
}
272
273impl DefaultPhysicalPlanner {
274    /// Create a physical planner that uses `extension_planners` to
275    /// plan user-defined logical nodes [`LogicalPlan::Extension`].
276    /// The planner uses the first [`ExtensionPlanner`] to return a non-`None`
277    /// plan.
278    pub fn with_extension_planners(
279        extension_planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>>,
280    ) -> Self {
281        Self { extension_planners }
282    }
283
284    /// Create a physical plan from a logical plan
285    async fn create_initial_plan(
286        &self,
287        logical_plan: &LogicalPlan,
288        session_state: &SessionState,
289    ) -> Result<Arc<dyn ExecutionPlan>> {
290        // DFS the tree to flatten it into a Vec.
291        // This will allow us to build the Physical Plan from the leaves up
292        // to avoid recursion, and also to make it easier to build a valid
293        // Physical Plan from the start and not rely on some intermediate
294        // representation (since parents need to know their children at
295        // construction time).
296        let mut flat_tree = vec![];
297        let mut dfs_visit_stack = vec![(None, logical_plan)];
298        // Use this to be able to find the leaves to start construction bottom
299        // up concurrently.
300        let mut flat_tree_leaf_indices = vec![];
301        while let Some((parent_index, node)) = dfs_visit_stack.pop() {
302            let current_index = flat_tree.len();
303            // Because of how we extend the visit stack here, we visit the children
304            // in reverse order of how they appear, so later we need to reverse
305            // the order of children when building the nodes.
306            dfs_visit_stack
307                .extend(node.inputs().iter().map(|&n| (Some(current_index), n)));
308            let state = match node.inputs().len() {
309                0 => {
310                    flat_tree_leaf_indices.push(current_index);
311                    NodeState::ZeroOrOneChild
312                }
313                1 => NodeState::ZeroOrOneChild,
314                _ => {
315                    let ready_children = Vec::with_capacity(node.inputs().len());
316                    let ready_children = Mutex::new(ready_children);
317                    NodeState::TwoOrMoreChildren(ready_children)
318                }
319            };
320            let node = LogicalNode {
321                node,
322                parent_index,
323                state,
324            };
325            flat_tree.push(node);
326        }
327        let flat_tree = Arc::new(flat_tree);
328
329        let planning_concurrency = session_state
330            .config_options()
331            .execution
332            .planning_concurrency;
333        // Can never spawn more tasks than leaves in the tree, as these tasks must
334        // all converge down to the root node, which can only be processed by a
335        // single task.
336        let max_concurrency = planning_concurrency.min(flat_tree_leaf_indices.len());
337
338        // Spawning tasks which will traverse leaf up to the root.
339        let tasks = flat_tree_leaf_indices
340            .into_iter()
341            .map(|index| self.task_helper(index, Arc::clone(&flat_tree), session_state));
342        let mut outputs = futures::stream::iter(tasks)
343            .buffer_unordered(max_concurrency)
344            .try_collect::<Vec<_>>()
345            .await?
346            .into_iter()
347            .flatten()
348            .collect::<Vec<_>>();
349        // Ideally this never happens if we have a valid LogicalPlan tree
350        if outputs.len() != 1 {
351            return internal_err!(
352                "Failed to convert LogicalPlan to ExecutionPlan: More than one root detected"
353            );
354        }
355        let plan = outputs.pop().unwrap();
356        Ok(plan)
357    }
358
    /// These tasks start at a leaf and traverse up the tree towards the root, building
    /// an ExecutionPlan as they go. When they reach a node with two or more children,
    /// they append their current result (a child of the parent node) to the children
    /// vector, and if this is sufficient to create the parent then continues traversing
    /// the tree to create nodes. Otherwise, the task terminates.
    ///
    /// `leaf_starter_index`: index into `flat_tree` of the leaf this task starts from
    ///
    /// `flat_tree`: the flattened logical plan, shared between all tasks
    ///
    /// `session_state`: passed through to the per-node planning
    ///
    /// Returns `Ok(Some(plan))` if this task reached the root, and `Ok(None)` if it
    /// stopped at a multi-child node that is still waiting for other children.
    /// Returns an error if an index is not present in `flat_tree` or if planning
    /// a node fails.
    async fn task_helper<'a>(
        &'a self,
        leaf_starter_index: usize,
        flat_tree: Arc<Vec<LogicalNode<'a>>>,
        session_state: &'a SessionState,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // We always start with a leaf, so can ignore status and pass empty children
        let mut node = flat_tree.get(leaf_starter_index).ok_or_else(|| {
            internal_datafusion_err!(
                "Invalid index whilst creating initial physical plan"
            )
        })?;
        let mut plan = self
            .map_logical_node_to_physical(
                node.node,
                session_state,
                ChildrenContainer::None,
            )
            .await?;
        // Flat-tree index of the node `plan` was built for; recorded with the
        // plan when it is handed to a multi-child parent.
        let mut current_index = leaf_starter_index;
        // parent_index is None only for root
        while let Some(parent_index) = node.parent_index {
            node = flat_tree.get(parent_index).ok_or_else(|| {
                internal_datafusion_err!(
                    "Invalid index whilst creating initial physical plan"
                )
            })?;
            match &node.state {
                // Single input: the plan built so far is the only child.
                NodeState::ZeroOrOneChild => {
                    plan = self
                        .map_logical_node_to_physical(
                            node.node,
                            session_state,
                            ChildrenContainer::One(plan),
                        )
                        .await?;
                }
                // See if we have all children to build the node.
                NodeState::TwoOrMoreChildren(children) => {
                    // The lock guard lives only inside this block, so it is
                    // released before the parent node is planned below.
                    let mut children: Vec<ExecutionPlanChild> = {
                        let mut guard = children.lock().await;
                        // Add our contribution to this parent node.
                        // Vec is pre-allocated so no allocation should occur here.
                        guard.push(ExecutionPlanChild {
                            index: current_index,
                            plan,
                        });
                        if guard.len() < node.node.inputs().len() {
                            // This node is not ready yet, still pending more children.
                            // This task is finished forever.
                            return Ok(None);
                        }

                        // With this task's contribution we have enough children.
                        // This task is the only one building this node now, and thus
                        // no other task will need the Mutex for this node, so take
                        // all children.
                        std::mem::take(guard.as_mut())
                    };

                    // Indices refer to position in flat tree Vec, which means they are
                    // guaranteed to be unique, hence unstable sort used.
                    //
                    // We reverse sort because of how we visited the node in the initial
                    // DFS traversal (see above).
                    children.sort_unstable_by_key(|epc| std::cmp::Reverse(epc.index));
                    let children = children.into_iter().map(|epc| epc.plan).collect();
                    let children = ChildrenContainer::Multiple(children);
                    plan = self
                        .map_logical_node_to_physical(node.node, session_state, children)
                        .await?;
                }
            }
            // `plan` now belongs to the parent; continue upwards from there.
            current_index = parent_index;
        }
        // Only one task should ever reach this point for a valid LogicalPlan tree.
        Ok(Some(plan))
    }
442
443    /// Given a single LogicalPlan node, map it to its physical ExecutionPlan counterpart.
444    async fn map_logical_node_to_physical(
445        &self,
446        node: &LogicalPlan,
447        session_state: &SessionState,
448        children: ChildrenContainer,
449    ) -> Result<Arc<dyn ExecutionPlan>> {
450        let exec_node: Arc<dyn ExecutionPlan> = match node {
451            // Leaves (no children)
452            LogicalPlan::TableScan(TableScan {
453                source,
454                projection,
455                filters,
456                fetch,
457                ..
458            }) => {
459                let source = source_as_provider(source)?;
460                // Remove all qualifiers from the scan as the provider
461                // doesn't know (nor should care) how the relation was
462                // referred to in the query
463                let filters = unnormalize_cols(filters.iter().cloned());
464                let filters_vec = filters.into_iter().collect::<Vec<_>>();
465                let opts = ScanArgs::default()
466                    .with_projection(projection.as_deref())
467                    .with_filters(Some(&filters_vec))
468                    .with_limit(*fetch);
469                let res = source.scan_with_args(session_state, opts).await?;
470                Arc::clone(res.plan())
471            }
472            LogicalPlan::Values(Values { values, schema }) => {
473                let exprs = values
474                    .iter()
475                    .map(|row| {
476                        row.iter()
477                            .map(|expr| {
478                                self.create_physical_expr(expr, schema, session_state)
479                            })
480                            .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()
481                    })
482                    .collect::<Result<Vec<_>>>()?;
483                MemorySourceConfig::try_new_as_values(Arc::clone(schema.inner()), exprs)?
484                    as _
485            }
486            LogicalPlan::EmptyRelation(EmptyRelation {
487                produce_one_row: false,
488                schema,
489            }) => Arc::new(EmptyExec::new(Arc::clone(schema.inner()))),
490            LogicalPlan::EmptyRelation(EmptyRelation {
491                produce_one_row: true,
492                schema,
493            }) => Arc::new(PlaceholderRowExec::new(Arc::clone(schema.inner()))),
494            LogicalPlan::DescribeTable(DescribeTable {
495                schema,
496                output_schema,
497            }) => {
498                let output_schema = Arc::clone(output_schema.inner());
499                self.plan_describe(Arc::clone(schema), output_schema)?
500            }
501
502            // 1 Child
503            LogicalPlan::Copy(CopyTo {
504                input,
505                output_url,
506                file_type,
507                partition_by,
508                options: source_option_tuples,
509                output_schema: _,
510            }) => {
511                let original_url = output_url.clone();
512                let input_exec = children.one()?;
513                let parsed_url = ListingTableUrl::parse(output_url)?;
514                let object_store_url = parsed_url.object_store();
515
516                let schema = Arc::clone(input.schema().inner());
517
518                // Note: the DataType passed here is ignored for the purposes of writing and inferred instead
519                // from the schema of the RecordBatch being written. This allows COPY statements to specify only
520                // the column name rather than column name + explicit data type.
521                let table_partition_cols = partition_by
522                    .iter()
523                    .map(|s| (s.to_string(), arrow::datatypes::DataType::Null))
524                    .collect::<Vec<_>>();
525
526                let keep_partition_by_columns = match source_option_tuples
527                    .get("execution.keep_partition_by_columns")
528                    .map(|v| v.trim()) {
529                    None => session_state.config().options().execution.keep_partition_by_columns,
530                    Some("true") => true,
531                    Some("false") => false,
532                    Some(value) =>
533                        return Err(DataFusionError::Configuration(format!("provided value for 'execution.keep_partition_by_columns' was not recognized: \"{value}\""))),
534                };
535
536                let sink_format = file_type_to_format(file_type)?
537                    .create(session_state, source_option_tuples)?;
538
539                // Determine extension based on format extension and compression
540                let file_extension = match sink_format.compression_type() {
541                    Some(compression_type) => sink_format
542                        .get_ext_with_compression(&compression_type)
543                        .unwrap_or_else(|_| sink_format.get_ext()),
544                    None => sink_format.get_ext(),
545                };
546
547                // Set file sink related options
548                let config = FileSinkConfig {
549                    original_url,
550                    object_store_url,
551                    table_paths: vec![parsed_url],
552                    file_group: FileGroup::default(),
553                    output_schema: schema,
554                    table_partition_cols,
555                    insert_op: InsertOp::Append,
556                    keep_partition_by_columns,
557                    file_extension,
558                };
559
560                let ordering = input_exec.properties().output_ordering().cloned();
561
562                sink_format
563                    .create_writer_physical_plan(
564                        input_exec,
565                        session_state,
566                        config,
567                        ordering.map(Into::into),
568                    )
569                    .await?
570            }
571            LogicalPlan::Dml(DmlStatement {
572                target,
573                op: WriteOp::Insert(insert_op),
574                ..
575            }) => {
576                if let Some(provider) =
577                    target.as_any().downcast_ref::<DefaultTableSource>()
578                {
579                    let input_exec = children.one()?;
580                    provider
581                        .table_provider
582                        .insert_into(session_state, input_exec, *insert_op)
583                        .await?
584                } else {
585                    return exec_err!(
586                        "Table source can't be downcasted to DefaultTableSource"
587                    );
588                }
589            }
590            LogicalPlan::Window(Window { window_expr, .. }) => {
591                if window_expr.is_empty() {
592                    return internal_err!("Impossibly got empty window expression");
593                }
594
595                let input_exec = children.one()?;
596
597                let get_sort_keys = |expr: &Expr| match expr {
598                    Expr::WindowFunction(window_fun) => {
599                        let WindowFunctionParams {
600                            ref partition_by,
601                            ref order_by,
602                            ..
603                        } = &window_fun.as_ref().params;
604                        generate_sort_key(partition_by, order_by)
605                    }
606                    Expr::Alias(Alias { expr, .. }) => {
607                        // Convert &Box<T> to &T
608                        match &**expr {
609                            Expr::WindowFunction(window_fun) => {
610                                let WindowFunctionParams {
611                                    ref partition_by,
612                                    ref order_by,
613                                    ..
614                                } = &window_fun.as_ref().params;
615                                generate_sort_key(partition_by, order_by)
616                            }
617                            _ => unreachable!(),
618                        }
619                    }
620                    _ => unreachable!(),
621                };
622                let sort_keys = get_sort_keys(&window_expr[0])?;
623                if window_expr.len() > 1 {
624                    debug_assert!(
625                            window_expr[1..]
626                                .iter()
627                                .all(|expr| get_sort_keys(expr).unwrap() == sort_keys),
628                            "all window expressions shall have the same sort keys, as guaranteed by logical planning"
629                        );
630                }
631
632                let logical_schema = node.schema();
633                let window_expr = window_expr
634                    .iter()
635                    .map(|e| {
636                        create_window_expr(
637                            e,
638                            logical_schema,
639                            session_state.execution_props(),
640                        )
641                    })
642                    .collect::<Result<Vec<_>>>()?;
643
644                let can_repartition = session_state.config().target_partitions() > 1
645                    && session_state.config().repartition_window_functions();
646
647                let uses_bounded_memory =
648                    window_expr.iter().all(|e| e.uses_bounded_memory());
649                // If all window expressions can run with bounded memory,
650                // choose the bounded window variant:
651                if uses_bounded_memory {
652                    Arc::new(BoundedWindowAggExec::try_new(
653                        window_expr,
654                        input_exec,
655                        InputOrderMode::Sorted,
656                        can_repartition,
657                    )?)
658                } else {
659                    Arc::new(WindowAggExec::try_new(
660                        window_expr,
661                        input_exec,
662                        can_repartition,
663                    )?)
664                }
665            }
666            LogicalPlan::Aggregate(Aggregate {
667                input,
668                group_expr,
669                aggr_expr,
670                ..
671            }) => {
672                let options = session_state.config().options();
673                // Initially need to perform the aggregate and then merge the partitions
674                let input_exec = children.one()?;
675                let physical_input_schema = input_exec.schema();
676                let logical_input_schema = input.as_ref().schema();
677                let physical_input_schema_from_logical = logical_input_schema.inner();
678
679                if !options.execution.skip_physical_aggregate_schema_check
680                    && !schema_satisfied_by(
681                        physical_input_schema_from_logical,
682                        &physical_input_schema,
683                    )
684                {
685                    let mut differences = Vec::new();
686                    if physical_input_schema.fields().len()
687                        != physical_input_schema_from_logical.fields().len()
688                    {
689                        differences.push(format!(
690                            "Different number of fields: (physical) {} vs (logical) {}",
691                            physical_input_schema.fields().len(),
692                            physical_input_schema_from_logical.fields().len()
693                        ));
694                    }
695                    for (i, (physical_field, logical_field)) in physical_input_schema
696                        .fields()
697                        .iter()
698                        .zip(physical_input_schema_from_logical.fields())
699                        .enumerate()
700                    {
701                        if physical_field.name() != logical_field.name() {
702                            differences.push(format!(
703                                "field name at index {}: (physical) {} vs (logical) {}",
704                                i,
705                                physical_field.name(),
706                                logical_field.name()
707                            ));
708                        }
709                        if physical_field.data_type() != logical_field.data_type() {
710                            differences.push(format!("field data type at index {} [{}]: (physical) {} vs (logical) {}", i, physical_field.name(), physical_field.data_type(), logical_field.data_type()));
711                        }
712                        if physical_field.is_nullable() && !logical_field.is_nullable() {
713                            differences.push(format!("field nullability at index {} [{}]: (physical) {} vs (logical) {}", i, physical_field.name(), physical_field.is_nullable(), logical_field.is_nullable()));
714                        }
715                    }
716                    return internal_err!("Physical input schema should be the same as the one converted from logical input schema. Differences: {}", differences
717                        .iter()
718                        .map(|s| format!("\n\t- {s}"))
719                        .join(""));
720                }
721
722                let groups = self.create_grouping_physical_expr(
723                    group_expr,
724                    logical_input_schema,
725                    &physical_input_schema,
726                    session_state,
727                )?;
728
729                let agg_filter = aggr_expr
730                    .iter()
731                    .map(|e| {
732                        create_aggregate_expr_and_maybe_filter(
733                            e,
734                            logical_input_schema,
735                            &physical_input_schema,
736                            session_state.execution_props(),
737                        )
738                    })
739                    .collect::<Result<Vec<_>>>()?;
740
741                let (mut aggregates, filters, _order_bys): (Vec<_>, Vec<_>, Vec<_>) =
742                    multiunzip(agg_filter);
743
744                let mut async_exprs = Vec::new();
745                let num_input_columns = physical_input_schema.fields().len();
746
747                for agg_func in &mut aggregates {
748                    match self.try_plan_async_exprs(
749                        num_input_columns,
750                        PlannedExprResult::Expr(agg_func.expressions()),
751                        physical_input_schema.as_ref(),
752                    )? {
753                        PlanAsyncExpr::Async(
754                            async_map,
755                            PlannedExprResult::Expr(physical_exprs),
756                        ) => {
757                            async_exprs.extend(async_map.async_exprs);
758
759                            if let Some(new_agg_func) = agg_func.with_new_expressions(
760                                physical_exprs,
761                                agg_func
762                                    .order_bys()
763                                    .iter()
764                                    .cloned()
765                                    .map(|x| x.expr)
766                                    .collect(),
767                            ) {
768                                *agg_func = Arc::new(new_agg_func);
769                            } else {
770                                return internal_err!("Failed to plan async expression");
771                            }
772                        }
773                        PlanAsyncExpr::Sync(PlannedExprResult::Expr(_)) => {
774                            // Do nothing
775                        }
776                        _ => {
777                            return internal_err!(
778                                "Unexpected result from try_plan_async_exprs"
779                            )
780                        }
781                    }
782                }
783                let input_exec = if !async_exprs.is_empty() {
784                    Arc::new(AsyncFuncExec::try_new(async_exprs, input_exec)?)
785                } else {
786                    input_exec
787                };
788
789                let initial_aggr = Arc::new(AggregateExec::try_new(
790                    AggregateMode::Partial,
791                    groups.clone(),
792                    aggregates,
793                    filters.clone(),
794                    input_exec,
795                    Arc::clone(&physical_input_schema),
796                )?);
797
798                let can_repartition = !groups.is_empty()
799                    && session_state.config().target_partitions() > 1
800                    && session_state.config().repartition_aggregations();
801
802                // Some aggregators may be modified during initialization for
803                // optimization purposes. For example, a FIRST_VALUE may turn
804                // into a LAST_VALUE with the reverse ordering requirement.
805                // To reflect such changes to subsequent stages, use the updated
806                // `AggregateFunctionExpr`/`PhysicalSortExpr` objects.
807                let updated_aggregates = initial_aggr.aggr_expr().to_vec();
808
809                let next_partition_mode = if can_repartition {
810                    // construct a second aggregation with 'AggregateMode::FinalPartitioned'
811                    AggregateMode::FinalPartitioned
812                } else {
813                    // construct a second aggregation, keeping the final column name equal to the
814                    // first aggregation and the expressions corresponding to the respective aggregate
815                    AggregateMode::Final
816                };
817
818                let final_grouping_set = initial_aggr.group_expr().as_final();
819
820                Arc::new(AggregateExec::try_new(
821                    next_partition_mode,
822                    final_grouping_set,
823                    updated_aggregates,
824                    filters,
825                    initial_aggr,
826                    Arc::clone(&physical_input_schema),
827                )?)
828            }
829            LogicalPlan::Projection(Projection { input, expr, .. }) => self
830                .create_project_physical_exec(
831                    session_state,
832                    children.one()?,
833                    input,
834                    expr,
835                )?,
836            LogicalPlan::Filter(Filter {
837                predicate, input, ..
838            }) => {
839                let physical_input = children.one()?;
840                let input_dfschema = input.schema();
841
842                let runtime_expr =
843                    self.create_physical_expr(predicate, input_dfschema, session_state)?;
844
845                let input_schema = input.schema();
846                let filter = match self.try_plan_async_exprs(
847                    input_schema.fields().len(),
848                    PlannedExprResult::Expr(vec![runtime_expr]),
849                    input_schema.as_arrow(),
850                )? {
851                    PlanAsyncExpr::Sync(PlannedExprResult::Expr(runtime_expr)) => {
852                        FilterExec::try_new(Arc::clone(&runtime_expr[0]), physical_input)?
853                    }
854                    PlanAsyncExpr::Async(
855                        async_map,
856                        PlannedExprResult::Expr(runtime_expr),
857                    ) => {
858                        let async_exec = AsyncFuncExec::try_new(
859                            async_map.async_exprs,
860                            physical_input,
861                        )?;
862                        FilterExec::try_new(
863                            Arc::clone(&runtime_expr[0]),
864                            Arc::new(async_exec),
865                        )?
866                        // project the output columns excluding the async functions
867                        // The async functions are always appended to the end of the schema.
868                        .with_projection(Some(
869                            (0..input.schema().fields().len()).collect(),
870                        ))?
871                    }
872                    _ => {
873                        return internal_err!(
874                            "Unexpected result from try_plan_async_exprs"
875                        )
876                    }
877                };
878
879                let selectivity = session_state
880                    .config()
881                    .options()
882                    .optimizer
883                    .default_filter_selectivity;
884                Arc::new(filter.with_default_selectivity(selectivity)?)
885            }
886            LogicalPlan::Repartition(Repartition {
887                input,
888                partitioning_scheme,
889            }) => {
890                let physical_input = children.one()?;
891                let input_dfschema = input.as_ref().schema();
892                let physical_partitioning = match partitioning_scheme {
893                    LogicalPartitioning::RoundRobinBatch(n) => {
894                        Partitioning::RoundRobinBatch(*n)
895                    }
896                    LogicalPartitioning::Hash(expr, n) => {
897                        let runtime_expr = expr
898                            .iter()
899                            .map(|e| {
900                                self.create_physical_expr(
901                                    e,
902                                    input_dfschema,
903                                    session_state,
904                                )
905                            })
906                            .collect::<Result<Vec<_>>>()?;
907                        Partitioning::Hash(runtime_expr, *n)
908                    }
909                    LogicalPartitioning::DistributeBy(_) => {
910                        return not_impl_err!(
911                            "Physical plan does not support DistributeBy partitioning"
912                        );
913                    }
914                };
915                Arc::new(RepartitionExec::try_new(
916                    physical_input,
917                    physical_partitioning,
918                )?)
919            }
920            LogicalPlan::Sort(Sort {
921                expr, input, fetch, ..
922            }) => {
923                let physical_input = children.one()?;
924                let input_dfschema = input.as_ref().schema();
925                let sort_exprs = create_physical_sort_exprs(
926                    expr,
927                    input_dfschema,
928                    session_state.execution_props(),
929                )?;
930                let Some(ordering) = LexOrdering::new(sort_exprs) else {
931                    return internal_err!(
932                        "SortExec requires at least one sort expression"
933                    );
934                };
935                let new_sort = SortExec::new(ordering, physical_input).with_fetch(*fetch);
936                Arc::new(new_sort)
937            }
938            LogicalPlan::Subquery(_) => todo!(),
939            LogicalPlan::SubqueryAlias(_) => children.one()?,
940            LogicalPlan::Limit(limit) => {
941                let input = children.one()?;
942                let SkipType::Literal(skip) = limit.get_skip_type()? else {
943                    return not_impl_err!(
944                        "Unsupported OFFSET expression: {:?}",
945                        limit.skip
946                    );
947                };
948                let FetchType::Literal(fetch) = limit.get_fetch_type()? else {
949                    return not_impl_err!(
950                        "Unsupported LIMIT expression: {:?}",
951                        limit.fetch
952                    );
953                };
954
955                // GlobalLimitExec requires a single partition for input
956                let input = if input.output_partitioning().partition_count() == 1 {
957                    input
958                } else {
959                    // Apply a LocalLimitExec to each partition. The optimizer will also insert
960                    // a CoalescePartitionsExec between the GlobalLimitExec and LocalLimitExec
961                    if let Some(fetch) = fetch {
962                        Arc::new(LocalLimitExec::new(input, fetch + skip))
963                    } else {
964                        input
965                    }
966                };
967
968                Arc::new(GlobalLimitExec::new(input, skip, fetch))
969            }
970            LogicalPlan::Unnest(Unnest {
971                list_type_columns,
972                struct_type_columns,
973                schema,
974                options,
975                ..
976            }) => {
977                let input = children.one()?;
978                let schema = Arc::clone(schema.inner());
979                let list_column_indices = list_type_columns
980                    .iter()
981                    .map(|(index, unnesting)| ListUnnest {
982                        index_in_input_schema: *index,
983                        depth: unnesting.depth,
984                    })
985                    .collect();
986                Arc::new(UnnestExec::new(
987                    input,
988                    list_column_indices,
989                    struct_type_columns.clone(),
990                    schema,
991                    options.clone(),
992                )?)
993            }
994
995            // 2 Children
996            LogicalPlan::Join(Join {
997                left: original_left,
998                right: original_right,
999                on: keys,
1000                filter,
1001                join_type,
1002                null_equality,
1003                schema: join_schema,
1004                ..
1005            }) => {
1006                let [physical_left, physical_right] = children.two()?;
1007
1008                // If join has expression equijoin keys, add physical projection.
1009                let has_expr_join_key = keys.iter().any(|(l, r)| {
1010                    !(matches!(l, Expr::Column(_)) && matches!(r, Expr::Column(_)))
1011                });
1012                let (new_logical, physical_left, physical_right) = if has_expr_join_key {
1013                    // TODO: Can we extract this transformation to somewhere before physical plan
1014                    //       creation?
1015                    let (left_keys, right_keys): (Vec<_>, Vec<_>) =
1016                        keys.iter().cloned().unzip();
1017
1018                    let (left, left_col_keys, left_projected) =
1019                        wrap_projection_for_join_if_necessary(
1020                            &left_keys,
1021                            original_left.as_ref().clone(),
1022                        )?;
1023                    let (right, right_col_keys, right_projected) =
1024                        wrap_projection_for_join_if_necessary(
1025                            &right_keys,
1026                            original_right.as_ref().clone(),
1027                        )?;
1028                    let column_on = (left_col_keys, right_col_keys);
1029
1030                    let left = Arc::new(left);
1031                    let right = Arc::new(right);
1032                    let (new_join, requalified) = Join::try_new_with_project_input(
1033                        node,
1034                        Arc::clone(&left),
1035                        Arc::clone(&right),
1036                        column_on,
1037                    )?;
1038
1039                    let new_join = LogicalPlan::Join(new_join);
1040
1041                    // If inputs were projected then create ExecutionPlan for these new
1042                    // LogicalPlan nodes.
1043                    let physical_left = match (left_projected, left.as_ref()) {
1044                        // If left_projected is true we are guaranteed that left is a Projection
1045                        (
1046                            true,
1047                            LogicalPlan::Projection(Projection { input, expr, .. }),
1048                        ) => self.create_project_physical_exec(
1049                            session_state,
1050                            physical_left,
1051                            input,
1052                            expr,
1053                        )?,
1054                        _ => physical_left,
1055                    };
1056                    let physical_right = match (right_projected, right.as_ref()) {
1057                        // If right_projected is true we are guaranteed that right is a Projection
1058                        (
1059                            true,
1060                            LogicalPlan::Projection(Projection { input, expr, .. }),
1061                        ) => self.create_project_physical_exec(
1062                            session_state,
1063                            physical_right,
1064                            input,
1065                            expr,
1066                        )?,
1067                        _ => physical_right,
1068                    };
1069
1070                    // Remove temporary projected columns
1071                    if left_projected || right_projected {
1072                        // Re-qualify the join schema only if the inputs were previously requalified in
1073                        // `try_new_with_project_input`. This ensures that when building the Projection
1074                        // it can correctly resolve field nullability and data types
1075                        // by disambiguating fields from the left and right sides of the join.
1076                        let qualified_join_schema = if requalified {
1077                            Arc::new(qualify_join_schema_sides(
1078                                join_schema,
1079                                original_left,
1080                                original_right,
1081                            )?)
1082                        } else {
1083                            Arc::clone(join_schema)
1084                        };
1085
1086                        let final_join_result = qualified_join_schema
1087                            .iter()
1088                            .map(Expr::from)
1089                            .collect::<Vec<_>>();
1090                        let projection = LogicalPlan::Projection(Projection::try_new(
1091                            final_join_result,
1092                            Arc::new(new_join),
1093                        )?);
1094                        // LogicalPlan mutated
1095                        (Cow::Owned(projection), physical_left, physical_right)
1096                    } else {
1097                        // LogicalPlan mutated
1098                        (Cow::Owned(new_join), physical_left, physical_right)
1099                    }
1100                } else {
1101                    // LogicalPlan unchanged
1102                    (Cow::Borrowed(node), physical_left, physical_right)
1103                };
1104
1105                // Retrieving new left/right and join keys (in case plan was mutated above)
1106                let (left, right, keys, new_project) = match new_logical.as_ref() {
1107                    LogicalPlan::Projection(Projection { input, expr, .. }) => {
1108                        if let LogicalPlan::Join(Join {
1109                            left, right, on, ..
1110                        }) = input.as_ref()
1111                        {
1112                            (left, right, on, Some((input, expr)))
1113                        } else {
1114                            unreachable!()
1115                        }
1116                    }
1117                    LogicalPlan::Join(Join {
1118                        left, right, on, ..
1119                    }) => (left, right, on, None),
1120                    // Should either be the original Join, or Join with a Projection on top
1121                    _ => unreachable!(),
1122                };
1123
1124                // All equi-join keys are columns now, create physical join plan
1125                let left_df_schema = left.schema();
1126                let right_df_schema = right.schema();
1127                let execution_props = session_state.execution_props();
1128                let join_on = keys
1129                    .iter()
1130                    .map(|(l, r)| {
1131                        let l = create_physical_expr(l, left_df_schema, execution_props)?;
1132                        let r =
1133                            create_physical_expr(r, right_df_schema, execution_props)?;
1134                        Ok((l, r))
1135                    })
1136                    .collect::<Result<join_utils::JoinOn>>()?;
1137
1138                // TODO: `num_range_filters` can be used later on for ASOF joins (`num_range_filters > 1`)
1139                let mut num_range_filters = 0;
1140                let mut range_filters: Vec<Expr> = Vec::new();
1141                let mut total_filters = 0;
1142
1143                let join_filter = match filter {
1144                    Some(expr) => {
1145                        let split_expr = split_conjunction(expr);
1146                        for expr in split_expr.iter() {
1147                            match *expr {
1148                                Expr::BinaryExpr(BinaryExpr {
1149                                    left: _,
1150                                    right: _,
1151                                    op,
1152                                }) => {
1153                                    if matches!(
1154                                        op,
1155                                        Operator::Lt
1156                                            | Operator::LtEq
1157                                            | Operator::Gt
1158                                            | Operator::GtEq
1159                                    ) {
1160                                        range_filters.push((**expr).clone());
1161                                        num_range_filters += 1;
1162                                    }
1163                                    total_filters += 1;
1164                                }
1165                                // TODO: Want to deal with `Expr::Between` for IEJoins, it counts as two range predicates
1166                                // which is why it is not dealt with in PWMJ
1167                                // Expr::Between(_) => {},
1168                                _ => {
1169                                    total_filters += 1;
1170                                }
1171                            }
1172                        }
1173
                        // Extract the columns referenced by the filter expression into a HashSet
1175                        let cols = expr.column_refs();
1176
1177                        // Collect left & right field indices, the field indices are sorted in ascending order
1178                        let left_field_indices = cols
1179                            .iter()
1180                            .filter_map(|c| left_df_schema.index_of_column(c).ok())
1181                            .sorted()
1182                            .collect::<Vec<_>>();
1183                        let right_field_indices = cols
1184                            .iter()
1185                            .filter_map(|c| right_df_schema.index_of_column(c).ok())
1186                            .sorted()
1187                            .collect::<Vec<_>>();
1188
1189                        // Collect DFFields and Fields required for intermediate schemas
1190                        let (filter_df_fields, filter_fields): (Vec<_>, Vec<_>) =
1191                            left_field_indices
1192                                .clone()
1193                                .into_iter()
1194                                .map(|i| {
1195                                    (
1196                                        left_df_schema.qualified_field(i),
1197                                        physical_left.schema().field(i).clone(),
1198                                    )
1199                                })
1200                                .chain(right_field_indices.clone().into_iter().map(|i| {
1201                                    (
1202                                        right_df_schema.qualified_field(i),
1203                                        physical_right.schema().field(i).clone(),
1204                                    )
1205                                }))
1206                                .unzip();
1207                        let filter_df_fields = filter_df_fields
1208                            .into_iter()
1209                            .map(|(qualifier, field)| {
1210                                (qualifier.cloned(), Arc::new(field.clone()))
1211                            })
1212                            .collect();
1213
1214                        let metadata: HashMap<_, _> = left_df_schema
1215                            .metadata()
1216                            .clone()
1217                            .into_iter()
1218                            .chain(right_df_schema.metadata().clone())
1219                            .collect();
1220
1221                        // Construct intermediate schemas used for filtering data and
1222                        // convert logical expression to physical according to filter schema
1223                        let filter_df_schema = DFSchema::new_with_metadata(
1224                            filter_df_fields,
1225                            metadata.clone(),
1226                        )?;
1227                        let filter_schema =
1228                            Schema::new_with_metadata(filter_fields, metadata);
1229
1230                        let filter_expr = create_physical_expr(
1231                            expr,
1232                            &filter_df_schema,
1233                            session_state.execution_props(),
1234                        )?;
1235                        let column_indices = join_utils::JoinFilter::build_column_indices(
1236                            left_field_indices,
1237                            right_field_indices,
1238                        );
1239
1240                        Some(join_utils::JoinFilter::new(
1241                            filter_expr,
1242                            column_indices,
1243                            Arc::new(filter_schema),
1244                        ))
1245                    }
1246                    _ => None,
1247                };
1248
1249                let prefer_hash_join =
1250                    session_state.config_options().optimizer.prefer_hash_join;
1251
1252                // TODO: Allow PWMJ to deal with residual equijoin conditions
1253                let join: Arc<dyn ExecutionPlan> = if join_on.is_empty() {
1254                    if join_filter.is_none() && matches!(join_type, JoinType::Inner) {
                        // Use a cross join if there are no join conditions and no join filter is set
1256                        Arc::new(CrossJoinExec::new(physical_left, physical_right))
1257                    } else if num_range_filters == 1
1258                        && total_filters == 1
1259                        && !matches!(
1260                            join_type,
1261                            JoinType::LeftSemi
1262                                | JoinType::RightSemi
1263                                | JoinType::LeftAnti
1264                                | JoinType::RightAnti
1265                                | JoinType::LeftMark
1266                                | JoinType::RightMark
1267                        )
1268                        && session_state
1269                            .config_options()
1270                            .optimizer
1271                            .enable_piecewise_merge_join
1272                    {
1273                        let Expr::BinaryExpr(be) = &range_filters[0] else {
1274                            return plan_err!(
1275                                "Unsupported expression for PWMJ: Expected `Expr::BinaryExpr`"
1276                            );
1277                        };
1278
1279                        let mut op = be.op;
1280                        if !matches!(
1281                            op,
1282                            Operator::Lt | Operator::LtEq | Operator::Gt | Operator::GtEq
1283                        ) {
1284                            return plan_err!(
1285                                "Unsupported operator for PWMJ: {:?}. Expected one of <, <=, >, >=",
1286                                op
1287                            );
1288                        }
1289
1290                        fn reverse_ineq(op: Operator) -> Operator {
1291                            match op {
1292                                Operator::Lt => Operator::Gt,
1293                                Operator::LtEq => Operator::GtEq,
1294                                Operator::Gt => Operator::Lt,
1295                                Operator::GtEq => Operator::LtEq,
1296                                _ => op,
1297                            }
1298                        }
1299
1300                        #[derive(Clone, Copy, Debug, PartialEq, Eq)]
1301                        enum Side {
1302                            Left,
1303                            Right,
1304                            Both,
1305                        }
1306
1307                        let side_of = |e: &Expr| -> Result<Side> {
1308                            let cols = e.column_refs();
1309                            let any_left = cols
1310                                .iter()
1311                                .any(|c| left_df_schema.index_of_column(c).is_ok());
1312                            let any_right = cols
1313                                .iter()
1314                                .any(|c| right_df_schema.index_of_column(c).is_ok());
1315
1316                            Ok(match (any_left, any_right) {
1317                                (true, false) => Side::Left,
1318                                (false, true) => Side::Right,
1319                                (true, true) => Side::Both,
1320                                _ => unreachable!(),
1321                            })
1322                        };
1323
1324                        let mut lhs_logical = &be.left;
1325                        let mut rhs_logical = &be.right;
1326
1327                        let left_side = side_of(lhs_logical)?;
1328                        let right_side = side_of(rhs_logical)?;
1329                        if matches!(left_side, Side::Both)
1330                            || matches!(right_side, Side::Both)
1331                        {
1332                            return Ok(Arc::new(NestedLoopJoinExec::try_new(
1333                                physical_left,
1334                                physical_right,
1335                                join_filter,
1336                                join_type,
1337                                None,
1338                            )?));
1339                        }
1340
1341                        if left_side == Side::Right && right_side == Side::Left {
1342                            std::mem::swap(&mut lhs_logical, &mut rhs_logical);
1343                            op = reverse_ineq(op);
1344                        } else if !(left_side == Side::Left && right_side == Side::Right)
1345                        {
1346                            return plan_err!(
1347                                "Unsupported operator for PWMJ: {:?}. Expected one of <, <=, >, >=",
1348                                op
1349                            );
1350                        }
1351
1352                        let on_left = create_physical_expr(
1353                            lhs_logical,
1354                            left_df_schema,
1355                            session_state.execution_props(),
1356                        )?;
1357                        let on_right = create_physical_expr(
1358                            rhs_logical,
1359                            right_df_schema,
1360                            session_state.execution_props(),
1361                        )?;
1362
1363                        Arc::new(PiecewiseMergeJoinExec::try_new(
1364                            physical_left,
1365                            physical_right,
1366                            (on_left, on_right),
1367                            op,
1368                            *join_type,
1369                            session_state.config().target_partitions(),
1370                        )?)
1371                    } else {
1372                        // there is no equal join condition, use the nested loop join
1373                        Arc::new(NestedLoopJoinExec::try_new(
1374                            physical_left,
1375                            physical_right,
1376                            join_filter,
1377                            join_type,
1378                            None,
1379                        )?)
1380                    }
1381                } else if session_state.config().target_partitions() > 1
1382                    && session_state.config().repartition_joins()
1383                    && !prefer_hash_join
1384                {
1385                    // Use SortMergeJoin if hash join is not preferred
1386                    let join_on_len = join_on.len();
1387                    Arc::new(SortMergeJoinExec::try_new(
1388                        physical_left,
1389                        physical_right,
1390                        join_on,
1391                        join_filter,
1392                        *join_type,
1393                        vec![SortOptions::default(); join_on_len],
1394                        *null_equality,
1395                    )?)
1396                } else if session_state.config().target_partitions() > 1
1397                    && session_state.config().repartition_joins()
1398                    && prefer_hash_join
1399                {
1400                    Arc::new(HashJoinExec::try_new(
1401                        physical_left,
1402                        physical_right,
1403                        join_on,
1404                        join_filter,
1405                        join_type,
1406                        None,
1407                        PartitionMode::Auto,
1408                        *null_equality,
1409                    )?)
1410                } else {
1411                    Arc::new(HashJoinExec::try_new(
1412                        physical_left,
1413                        physical_right,
1414                        join_on,
1415                        join_filter,
1416                        join_type,
1417                        None,
1418                        PartitionMode::CollectLeft,
1419                        *null_equality,
1420                    )?)
1421                };
1422
1423                // If plan was mutated previously then need to create the ExecutionPlan
1424                // for the new Projection that was applied on top.
1425                if let Some((input, expr)) = new_project {
1426                    self.create_project_physical_exec(session_state, join, input, expr)?
1427                } else {
1428                    join
1429                }
1430            }
1431            LogicalPlan::RecursiveQuery(RecursiveQuery {
1432                name, is_distinct, ..
1433            }) => {
1434                let [static_term, recursive_term] = children.two()?;
1435                Arc::new(RecursiveQueryExec::try_new(
1436                    name.clone(),
1437                    static_term,
1438                    recursive_term,
1439                    *is_distinct,
1440                )?)
1441            }
1442
1443            // N Children
1444            LogicalPlan::Union(_) => UnionExec::try_new(children.vec())?,
1445            LogicalPlan::Extension(Extension { node }) => {
1446                let mut maybe_plan = None;
1447                let children = children.vec();
1448                for planner in &self.extension_planners {
1449                    if maybe_plan.is_some() {
1450                        break;
1451                    }
1452
1453                    let logical_input = node.inputs();
1454                    maybe_plan = planner
1455                        .plan_extension(
1456                            self,
1457                            node.as_ref(),
1458                            &logical_input,
1459                            &children,
1460                            session_state,
1461                        )
1462                        .await?;
1463                }
1464
1465                let plan = match maybe_plan {
1466                        Some(v) => Ok(v),
1467                        _ => plan_err!("No installed planner was able to convert the custom node to an execution plan: {:?}", node)
1468                    }?;
1469
1470                // Ensure the ExecutionPlan's schema matches the
1471                // declared logical schema to catch and warn about
1472                // logic errors when creating user defined plans.
1473                if !node.schema().matches_arrow_schema(&plan.schema()) {
1474                    return plan_err!(
1475                            "Extension planner for {:?} created an ExecutionPlan with mismatched schema. \
1476                            LogicalPlan schema: {:?}, ExecutionPlan schema: {:?}",
1477                            node, node.schema(), plan.schema()
1478                        );
1479                } else {
1480                    plan
1481                }
1482            }
1483
1484            // Other
1485            LogicalPlan::Statement(statement) => {
1486                // DataFusion is a read-only query engine, but also a library, so consumers may implement this
1487                let name = statement.name();
1488                return not_impl_err!("Unsupported logical plan: Statement({name})");
1489            }
1490            LogicalPlan::Dml(dml) => {
1491                // DataFusion is a read-only query engine, but also a library, so consumers may implement this
1492                return not_impl_err!("Unsupported logical plan: Dml({0})", dml.op);
1493            }
1494            LogicalPlan::Ddl(ddl) => {
1495                // There is no default plan for DDl statements --
1496                // it must be handled at a higher level (so that
1497                // the appropriate table can be registered with
1498                // the context)
1499                let name = ddl.name();
1500                return not_impl_err!("Unsupported logical plan: {name}");
1501            }
1502            LogicalPlan::Explain(_) => {
1503                return internal_err!(
1504                    "Unsupported logical plan: Explain must be root of the plan"
1505                )
1506            }
1507            LogicalPlan::Distinct(_) => {
1508                return internal_err!(
1509                    "Unsupported logical plan: Distinct should be replaced to Aggregate"
1510                )
1511            }
1512            LogicalPlan::Analyze(_) => {
1513                return internal_err!(
1514                    "Unsupported logical plan: Analyze must be root of the plan"
1515                )
1516            }
1517        };
1518        Ok(exec_node)
1519    }
1520
1521    fn create_grouping_physical_expr(
1522        &self,
1523        group_expr: &[Expr],
1524        input_dfschema: &DFSchema,
1525        input_schema: &Schema,
1526        session_state: &SessionState,
1527    ) -> Result<PhysicalGroupBy> {
1528        if group_expr.len() == 1 {
1529            match &group_expr[0] {
1530                Expr::GroupingSet(GroupingSet::GroupingSets(grouping_sets)) => {
1531                    merge_grouping_set_physical_expr(
1532                        grouping_sets,
1533                        input_dfschema,
1534                        input_schema,
1535                        session_state,
1536                    )
1537                }
1538                Expr::GroupingSet(GroupingSet::Cube(exprs)) => create_cube_physical_expr(
1539                    exprs,
1540                    input_dfschema,
1541                    input_schema,
1542                    session_state,
1543                ),
1544                Expr::GroupingSet(GroupingSet::Rollup(exprs)) => {
1545                    create_rollup_physical_expr(
1546                        exprs,
1547                        input_dfschema,
1548                        input_schema,
1549                        session_state,
1550                    )
1551                }
1552                expr => Ok(PhysicalGroupBy::new_single(vec![tuple_err((
1553                    self.create_physical_expr(expr, input_dfschema, session_state),
1554                    physical_name(expr),
1555                ))?])),
1556            }
1557        } else if group_expr.is_empty() {
1558            // No GROUP BY clause - create empty PhysicalGroupBy
1559            Ok(PhysicalGroupBy::new(vec![], vec![], vec![]))
1560        } else {
1561            Ok(PhysicalGroupBy::new_single(
1562                group_expr
1563                    .iter()
1564                    .map(|e| {
1565                        tuple_err((
1566                            self.create_physical_expr(e, input_dfschema, session_state),
1567                            physical_name(e),
1568                        ))
1569                    })
1570                    .collect::<Result<Vec<_>>>()?,
1571            ))
1572        }
1573    }
1574}
1575
1576/// Expand and align a GROUPING SET expression.
1577/// (see <https://www.postgresql.org/docs/current/queries-table-expressions.html#QUERIES-GROUPING-SETS>)
1578///
1579/// This will take a list of grouping sets and ensure that each group is
1580/// properly aligned for the physical execution plan. We do this by
1581/// identifying all unique expression in each group and conforming each
1582/// group to the same set of expression types and ordering.
1583/// For example, if we have something like `GROUPING SETS ((a,b,c),(a),(b),(b,c))`
1584/// we would expand this to `GROUPING SETS ((a,b,c),(a,NULL,NULL),(NULL,b,NULL),(NULL,b,c))
1585/// (see <https://www.postgresql.org/docs/current/queries-table-expressions.html#QUERIES-GROUPING-SETS>)
1586fn merge_grouping_set_physical_expr(
1587    grouping_sets: &[Vec<Expr>],
1588    input_dfschema: &DFSchema,
1589    input_schema: &Schema,
1590    session_state: &SessionState,
1591) -> Result<PhysicalGroupBy> {
1592    let num_groups = grouping_sets.len();
1593    let mut all_exprs: Vec<Expr> = vec![];
1594    let mut grouping_set_expr: Vec<(Arc<dyn PhysicalExpr>, String)> = vec![];
1595    let mut null_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> = vec![];
1596
1597    for expr in grouping_sets.iter().flatten() {
1598        if !all_exprs.contains(expr) {
1599            all_exprs.push(expr.clone());
1600
1601            grouping_set_expr.push(get_physical_expr_pair(
1602                expr,
1603                input_dfschema,
1604                session_state,
1605            )?);
1606
1607            null_exprs.push(get_null_physical_expr_pair(
1608                expr,
1609                input_dfschema,
1610                input_schema,
1611                session_state,
1612            )?);
1613        }
1614    }
1615
1616    let mut merged_sets: Vec<Vec<bool>> = Vec::with_capacity(num_groups);
1617
1618    for expr_group in grouping_sets.iter() {
1619        let group: Vec<bool> = all_exprs
1620            .iter()
1621            .map(|expr| !expr_group.contains(expr))
1622            .collect();
1623
1624        merged_sets.push(group)
1625    }
1626
1627    Ok(PhysicalGroupBy::new(
1628        grouping_set_expr,
1629        null_exprs,
1630        merged_sets,
1631    ))
1632}
1633
1634/// Expand and align a CUBE expression. This is a special case of GROUPING SETS
1635/// (see <https://www.postgresql.org/docs/current/queries-table-expressions.html#QUERIES-GROUPING-SETS>)
1636fn create_cube_physical_expr(
1637    exprs: &[Expr],
1638    input_dfschema: &DFSchema,
1639    input_schema: &Schema,
1640    session_state: &SessionState,
1641) -> Result<PhysicalGroupBy> {
1642    let num_of_exprs = exprs.len();
1643    let num_groups = num_of_exprs * num_of_exprs;
1644
1645    let mut null_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> =
1646        Vec::with_capacity(num_of_exprs);
1647    let mut all_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> =
1648        Vec::with_capacity(num_of_exprs);
1649
1650    for expr in exprs {
1651        null_exprs.push(get_null_physical_expr_pair(
1652            expr,
1653            input_dfschema,
1654            input_schema,
1655            session_state,
1656        )?);
1657
1658        all_exprs.push(get_physical_expr_pair(expr, input_dfschema, session_state)?)
1659    }
1660
1661    let mut groups: Vec<Vec<bool>> = Vec::with_capacity(num_groups);
1662
1663    groups.push(vec![false; num_of_exprs]);
1664
1665    for null_count in 1..=num_of_exprs {
1666        for null_idx in (0..num_of_exprs).combinations(null_count) {
1667            let mut next_group: Vec<bool> = vec![false; num_of_exprs];
1668            null_idx.into_iter().for_each(|i| next_group[i] = true);
1669            groups.push(next_group);
1670        }
1671    }
1672
1673    Ok(PhysicalGroupBy::new(all_exprs, null_exprs, groups))
1674}
1675
1676/// Expand and align a ROLLUP expression. This is a special case of GROUPING SETS
1677/// (see <https://www.postgresql.org/docs/current/queries-table-expressions.html#QUERIES-GROUPING-SETS>)
1678fn create_rollup_physical_expr(
1679    exprs: &[Expr],
1680    input_dfschema: &DFSchema,
1681    input_schema: &Schema,
1682    session_state: &SessionState,
1683) -> Result<PhysicalGroupBy> {
1684    let num_of_exprs = exprs.len();
1685
1686    let mut null_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> =
1687        Vec::with_capacity(num_of_exprs);
1688    let mut all_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> =
1689        Vec::with_capacity(num_of_exprs);
1690
1691    let mut groups: Vec<Vec<bool>> = Vec::with_capacity(num_of_exprs + 1);
1692
1693    for expr in exprs {
1694        null_exprs.push(get_null_physical_expr_pair(
1695            expr,
1696            input_dfschema,
1697            input_schema,
1698            session_state,
1699        )?);
1700
1701        all_exprs.push(get_physical_expr_pair(expr, input_dfschema, session_state)?)
1702    }
1703
1704    for total in 0..=num_of_exprs {
1705        let mut group: Vec<bool> = Vec::with_capacity(num_of_exprs);
1706
1707        for index in 0..num_of_exprs {
1708            if index < total {
1709                group.push(false);
1710            } else {
1711                group.push(true);
1712            }
1713        }
1714
1715        groups.push(group)
1716    }
1717
1718    Ok(PhysicalGroupBy::new(all_exprs, null_exprs, groups))
1719}
1720
1721/// For a given logical expr, get a properly typed NULL ScalarValue physical expression
1722fn get_null_physical_expr_pair(
1723    expr: &Expr,
1724    input_dfschema: &DFSchema,
1725    input_schema: &Schema,
1726    session_state: &SessionState,
1727) -> Result<(Arc<dyn PhysicalExpr>, String)> {
1728    let physical_expr =
1729        create_physical_expr(expr, input_dfschema, session_state.execution_props())?;
1730    let physical_name = physical_name(&expr.clone())?;
1731
1732    let data_type = physical_expr.data_type(input_schema)?;
1733    let null_value: ScalarValue = (&data_type).try_into()?;
1734
1735    let null_value = Literal::new(null_value);
1736    Ok((Arc::new(null_value), physical_name))
1737}
1738
1739/// Qualifies the fields in a join schema with "left" and "right" qualifiers
1740/// without mutating the original schema. This function should only be used when
1741/// the join inputs have already been requalified earlier in `try_new_with_project_input`.
1742///
1743/// The purpose is to avoid ambiguity errors later in planning (e.g., in nullability or data type resolution)
1744/// when converting expressions to fields.
1745fn qualify_join_schema_sides(
1746    join_schema: &DFSchema,
1747    left: &LogicalPlan,
1748    right: &LogicalPlan,
1749) -> Result<DFSchema> {
1750    let left_fields = left.schema().fields();
1751    let right_fields = right.schema().fields();
1752    let join_fields = join_schema.fields();
1753
1754    // Validate lengths
1755    if join_fields.len() != left_fields.len() + right_fields.len() {
1756        return internal_err!(
1757            "Join schema field count must match left and right field count."
1758        );
1759    }
1760
1761    // Validate field names match
1762    for (i, (field, expected)) in join_fields
1763        .iter()
1764        .zip(left_fields.iter().chain(right_fields.iter()))
1765        .enumerate()
1766    {
1767        if field.name() != expected.name() {
1768            return internal_err!(
1769                "Field name mismatch at index {}: expected '{}', found '{}'",
1770                i,
1771                expected.name(),
1772                field.name()
1773            );
1774        }
1775    }
1776
1777    // qualify sides
1778    let qualifiers = join_fields
1779        .iter()
1780        .enumerate()
1781        .map(|(i, _)| {
1782            if i < left_fields.len() {
1783                Some(TableReference::Bare {
1784                    table: Arc::from("left"),
1785                })
1786            } else {
1787                Some(TableReference::Bare {
1788                    table: Arc::from("right"),
1789                })
1790            }
1791        })
1792        .collect();
1793
1794    join_schema.with_field_specific_qualified_schema(qualifiers)
1795}
1796
1797fn get_physical_expr_pair(
1798    expr: &Expr,
1799    input_dfschema: &DFSchema,
1800    session_state: &SessionState,
1801) -> Result<(Arc<dyn PhysicalExpr>, String)> {
1802    let physical_expr =
1803        create_physical_expr(expr, input_dfschema, session_state.execution_props())?;
1804    let physical_name = physical_name(expr)?;
1805    Ok((physical_expr, physical_name))
1806}
1807
1808/// Check if window bounds are valid after schema information is available, and
1809/// window_frame bounds are casted to the corresponding column type.
1810/// queries like:
1811/// OVER (ORDER BY a RANGES BETWEEN 3 PRECEDING AND 5 PRECEDING)
1812/// OVER (ORDER BY a RANGES BETWEEN INTERVAL '3 DAY' PRECEDING AND '5 DAY' PRECEDING)  are rejected
1813pub fn is_window_frame_bound_valid(window_frame: &WindowFrame) -> bool {
1814    match (&window_frame.start_bound, &window_frame.end_bound) {
1815        (WindowFrameBound::Following(_), WindowFrameBound::Preceding(_))
1816        | (WindowFrameBound::Following(_), WindowFrameBound::CurrentRow)
1817        | (WindowFrameBound::CurrentRow, WindowFrameBound::Preceding(_)) => false,
1818        (WindowFrameBound::Preceding(lhs), WindowFrameBound::Preceding(rhs)) => {
1819            !rhs.is_null() && (lhs.is_null() || (lhs >= rhs))
1820        }
1821        (WindowFrameBound::Following(lhs), WindowFrameBound::Following(rhs)) => {
1822            !lhs.is_null() && (rhs.is_null() || (lhs <= rhs))
1823        }
1824        _ => true,
1825    }
1826}
1827
1828/// Create a window expression with a name from a logical expression
1829pub fn create_window_expr_with_name(
1830    e: &Expr,
1831    name: impl Into<String>,
1832    logical_schema: &DFSchema,
1833    execution_props: &ExecutionProps,
1834) -> Result<Arc<dyn WindowExpr>> {
1835    let name = name.into();
1836    let physical_schema = Arc::clone(logical_schema.inner());
1837    match e {
1838        Expr::WindowFunction(window_fun) => {
1839            let WindowFunction {
1840                fun,
1841                params:
1842                    WindowFunctionParams {
1843                        args,
1844                        partition_by,
1845                        order_by,
1846                        window_frame,
1847                        null_treatment,
1848                        distinct,
1849                        filter,
1850                    },
1851            } = window_fun.as_ref();
1852            let physical_args =
1853                create_physical_exprs(args, logical_schema, execution_props)?;
1854            let partition_by =
1855                create_physical_exprs(partition_by, logical_schema, execution_props)?;
1856            let order_by =
1857                create_physical_sort_exprs(order_by, logical_schema, execution_props)?;
1858
1859            if !is_window_frame_bound_valid(window_frame) {
1860                return plan_err!(
1861                        "Invalid window frame: start bound ({}) cannot be larger than end bound ({})",
1862                        window_frame.start_bound, window_frame.end_bound
1863                    );
1864            }
1865
1866            let window_frame = Arc::new(window_frame.clone());
1867            let ignore_nulls = null_treatment.unwrap_or(NullTreatment::RespectNulls)
1868                == NullTreatment::IgnoreNulls;
1869            let physical_filter = filter
1870                .as_ref()
1871                .map(|f| create_physical_expr(f, logical_schema, execution_props))
1872                .transpose()?;
1873
1874            windows::create_window_expr(
1875                fun,
1876                name,
1877                &physical_args,
1878                &partition_by,
1879                &order_by,
1880                window_frame,
1881                physical_schema,
1882                ignore_nulls,
1883                *distinct,
1884                physical_filter,
1885            )
1886        }
1887        other => plan_err!("Invalid window expression '{other:?}'"),
1888    }
1889}
1890
1891/// Create a window expression from a logical expression or an alias
1892pub fn create_window_expr(
1893    e: &Expr,
1894    logical_schema: &DFSchema,
1895    execution_props: &ExecutionProps,
1896) -> Result<Arc<dyn WindowExpr>> {
1897    // unpack aliased logical expressions, e.g. "sum(col) over () as total"
1898    let (name, e) = match e {
1899        Expr::Alias(Alias { expr, name, .. }) => (name.clone(), expr.as_ref()),
1900        _ => (e.schema_name().to_string(), e),
1901    };
1902    create_window_expr_with_name(e, name, logical_schema, execution_props)
1903}
1904
/// The pieces produced when planning a single aggregate expression: the
/// physical aggregate function, its optional filter expression, and the
/// physical sort expressions of its ORDER BY clause.
type AggregateExprWithOptionalArgs = (
    // The physical aggregate function expression
    Arc<AggregateFunctionExpr>,
    // The filter clause, if any
    Option<Arc<dyn PhysicalExpr>>,
    // Expressions in the ORDER BY clause
    Vec<PhysicalSortExpr>,
);
1912
1913/// Create an aggregate expression with a name from a logical expression
1914pub fn create_aggregate_expr_with_name_and_maybe_filter(
1915    e: &Expr,
1916    name: Option<String>,
1917    human_displan: String,
1918    logical_input_schema: &DFSchema,
1919    physical_input_schema: &Schema,
1920    execution_props: &ExecutionProps,
1921) -> Result<AggregateExprWithOptionalArgs> {
1922    match e {
1923        Expr::AggregateFunction(AggregateFunction {
1924            func,
1925            params:
1926                AggregateFunctionParams {
1927                    args,
1928                    distinct,
1929                    filter,
1930                    order_by,
1931                    null_treatment,
1932                },
1933        }) => {
1934            let name = if let Some(name) = name {
1935                name
1936            } else {
1937                physical_name(e)?
1938            };
1939
1940            let physical_args =
1941                create_physical_exprs(args, logical_input_schema, execution_props)?;
1942            let filter = match filter {
1943                Some(e) => Some(create_physical_expr(
1944                    e,
1945                    logical_input_schema,
1946                    execution_props,
1947                )?),
1948                None => None,
1949            };
1950
1951            let ignore_nulls = null_treatment.unwrap_or(NullTreatment::RespectNulls)
1952                == NullTreatment::IgnoreNulls;
1953
1954            let (agg_expr, filter, order_bys) = {
1955                let order_bys = create_physical_sort_exprs(
1956                    order_by,
1957                    logical_input_schema,
1958                    execution_props,
1959                )?;
1960
1961                let agg_expr =
1962                    AggregateExprBuilder::new(func.to_owned(), physical_args.to_vec())
1963                        .order_by(order_bys.clone())
1964                        .schema(Arc::new(physical_input_schema.to_owned()))
1965                        .alias(name)
1966                        .human_display(human_displan)
1967                        .with_ignore_nulls(ignore_nulls)
1968                        .with_distinct(*distinct)
1969                        .build()
1970                        .map(Arc::new)?;
1971
1972                (agg_expr, filter, order_bys)
1973            };
1974
1975            Ok((agg_expr, filter, order_bys))
1976        }
1977        other => internal_err!("Invalid aggregate expression '{other:?}'"),
1978    }
1979}
1980
1981/// Create an aggregate expression from a logical expression or an alias
1982pub fn create_aggregate_expr_and_maybe_filter(
1983    e: &Expr,
1984    logical_input_schema: &DFSchema,
1985    physical_input_schema: &Schema,
1986    execution_props: &ExecutionProps,
1987) -> Result<AggregateExprWithOptionalArgs> {
1988    // Unpack (potentially nested) aliased logical expressions, e.g. "sum(col) as total"
1989    // Some functions like `count_all()` create internal aliases,
1990    // Unwrap all alias layers to get to the underlying aggregate function
1991    let (name, human_display, e) = match e {
1992        Expr::Alias(Alias { name, .. }) => {
1993            let unaliased = e.clone().unalias_nested().data;
1994            (Some(name.clone()), e.human_display().to_string(), unaliased)
1995        }
1996        Expr::AggregateFunction(_) => (
1997            Some(e.schema_name().to_string()),
1998            e.human_display().to_string(),
1999            e.clone(),
2000        ),
2001        _ => (None, String::default(), e.clone()),
2002    };
2003
2004    create_aggregate_expr_with_name_and_maybe_filter(
2005        &e,
2006        name,
2007        human_display,
2008        logical_input_schema,
2009        physical_input_schema,
2010        execution_props,
2011    )
2012}
2013
2014impl DefaultPhysicalPlanner {
2015    /// Handles capturing the various plans for EXPLAIN queries
2016    ///
2017    /// Returns
2018    /// Some(plan) if optimized, and None if logical_plan was not an
2019    /// explain (and thus needs to be optimized as normal)
2020    async fn handle_explain_or_analyze(
2021        &self,
2022        logical_plan: &LogicalPlan,
2023        session_state: &SessionState,
2024    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
2025        let execution_plan = match logical_plan {
2026            LogicalPlan::Explain(e) => self.handle_explain(e, session_state).await?,
2027            LogicalPlan::Analyze(a) => self.handle_analyze(a, session_state).await?,
2028            _ => return Ok(None),
2029        };
2030        Ok(Some(execution_plan))
2031    }
2032
2033    /// Planner for `LogicalPlan::Explain`
2034    async fn handle_explain(
2035        &self,
2036        e: &Explain,
2037        session_state: &SessionState,
2038    ) -> Result<Arc<dyn ExecutionPlan>> {
2039        use PlanType::*;
2040        let mut stringified_plans = vec![];
2041
2042        let config = &session_state.config_options().explain;
2043        let explain_format = &e.explain_format;
2044
2045        if !e.logical_optimization_succeeded {
2046            return Ok(Arc::new(ExplainExec::new(
2047                Arc::clone(e.schema.inner()),
2048                e.stringified_plans.clone(),
2049                true,
2050            )));
2051        }
2052
2053        match explain_format {
2054            ExplainFormat::Indent => { /* fall through */ }
2055            ExplainFormat::Tree => {
2056                // Tree render does not try to explain errors,
2057                let physical_plan = self
2058                    .create_initial_plan(e.plan.as_ref(), session_state)
2059                    .await?;
2060
2061                let optimized_plan = self.optimize_physical_plan(
2062                    physical_plan,
2063                    session_state,
2064                    |_plan, _optimizer| {},
2065                )?;
2066
2067                stringified_plans.push(StringifiedPlan::new(
2068                    FinalPhysicalPlan,
2069                    displayable(optimized_plan.as_ref())
2070                        .set_tree_maximum_render_width(config.tree_maximum_render_width)
2071                        .tree_render()
2072                        .to_string(),
2073                ));
2074            }
2075            ExplainFormat::PostgresJSON => {
2076                stringified_plans.push(StringifiedPlan::new(
2077                    FinalLogicalPlan,
2078                    e.plan.display_pg_json().to_string(),
2079                ));
2080            }
2081            ExplainFormat::Graphviz => {
2082                stringified_plans.push(StringifiedPlan::new(
2083                    FinalLogicalPlan,
2084                    e.plan.display_graphviz().to_string(),
2085                ));
2086            }
2087        };
2088
2089        if !stringified_plans.is_empty() {
2090            return Ok(Arc::new(ExplainExec::new(
2091                Arc::clone(e.schema.inner()),
2092                stringified_plans,
2093                e.verbose,
2094            )));
2095        }
2096
2097        // The indent mode is quite sophisticated, and handles quite a few
2098        // different cases / options for displaying the plan.
2099        if !config.physical_plan_only {
2100            stringified_plans.clone_from(&e.stringified_plans);
2101            if e.logical_optimization_succeeded {
2102                stringified_plans.push(e.plan.to_stringified(FinalLogicalPlan));
2103            }
2104        }
2105
2106        if !config.logical_plan_only && e.logical_optimization_succeeded {
2107            match self
2108                .create_initial_plan(e.plan.as_ref(), session_state)
2109                .await
2110            {
2111                Ok(input) => {
2112                    // Include statistics / schema if enabled
2113                    stringified_plans.push(StringifiedPlan::new(
2114                        InitialPhysicalPlan,
2115                        displayable(input.as_ref())
2116                            .set_show_statistics(config.show_statistics)
2117                            .set_show_schema(config.show_schema)
2118                            .indent(e.verbose)
2119                            .to_string(),
2120                    ));
2121
2122                    // Show statistics + schema in verbose output even if not
2123                    // explicitly requested
2124                    if e.verbose {
2125                        if !config.show_statistics {
2126                            stringified_plans.push(StringifiedPlan::new(
2127                                InitialPhysicalPlanWithStats,
2128                                displayable(input.as_ref())
2129                                    .set_show_statistics(true)
2130                                    .indent(e.verbose)
2131                                    .to_string(),
2132                            ));
2133                        }
2134                        if !config.show_schema {
2135                            stringified_plans.push(StringifiedPlan::new(
2136                                InitialPhysicalPlanWithSchema,
2137                                displayable(input.as_ref())
2138                                    .set_show_schema(true)
2139                                    .indent(e.verbose)
2140                                    .to_string(),
2141                            ));
2142                        }
2143                    }
2144
2145                    let optimized_plan = self.optimize_physical_plan(
2146                        input,
2147                        session_state,
2148                        |plan, optimizer| {
2149                            let optimizer_name = optimizer.name().to_string();
2150                            let plan_type = OptimizedPhysicalPlan { optimizer_name };
2151                            stringified_plans.push(StringifiedPlan::new(
2152                                plan_type,
2153                                displayable(plan)
2154                                    .set_show_statistics(config.show_statistics)
2155                                    .set_show_schema(config.show_schema)
2156                                    .indent(e.verbose)
2157                                    .to_string(),
2158                            ));
2159                        },
2160                    );
2161                    match optimized_plan {
2162                        Ok(input) => {
2163                            // This plan will includes statistics if show_statistics is on
2164                            stringified_plans.push(StringifiedPlan::new(
2165                                FinalPhysicalPlan,
2166                                displayable(input.as_ref())
2167                                    .set_show_statistics(config.show_statistics)
2168                                    .set_show_schema(config.show_schema)
2169                                    .indent(e.verbose)
2170                                    .to_string(),
2171                            ));
2172
2173                            // Show statistics + schema in verbose output even if not
2174                            // explicitly requested
2175                            if e.verbose {
2176                                if !config.show_statistics {
2177                                    stringified_plans.push(StringifiedPlan::new(
2178                                        FinalPhysicalPlanWithStats,
2179                                        displayable(input.as_ref())
2180                                            .set_show_statistics(true)
2181                                            .indent(e.verbose)
2182                                            .to_string(),
2183                                    ));
2184                                }
2185                                if !config.show_schema {
2186                                    stringified_plans.push(StringifiedPlan::new(
2187                                        FinalPhysicalPlanWithSchema,
2188                                        // This will include schema if show_schema is on
2189                                        // and will be set to true if verbose is on
2190                                        displayable(input.as_ref())
2191                                            .set_show_schema(true)
2192                                            .indent(e.verbose)
2193                                            .to_string(),
2194                                    ));
2195                                }
2196                            }
2197                        }
2198                        Err(DataFusionError::Context(optimizer_name, e)) => {
2199                            let plan_type = OptimizedPhysicalPlan { optimizer_name };
2200                            stringified_plans
2201                                .push(StringifiedPlan::new(plan_type, e.to_string()))
2202                        }
2203                        Err(e) => return Err(e),
2204                    }
2205                }
2206                Err(err) => {
2207                    stringified_plans.push(StringifiedPlan::new(
2208                        PhysicalPlanError,
2209                        err.strip_backtrace(),
2210                    ));
2211                }
2212            }
2213        }
2214
2215        Ok(Arc::new(ExplainExec::new(
2216            Arc::clone(e.schema.inner()),
2217            stringified_plans,
2218            e.verbose,
2219        )))
2220    }
2221
2222    async fn handle_analyze(
2223        &self,
2224        a: &Analyze,
2225        session_state: &SessionState,
2226    ) -> Result<Arc<dyn ExecutionPlan>> {
2227        let input = self.create_physical_plan(&a.input, session_state).await?;
2228        let schema = Arc::clone(a.schema.inner());
2229        let show_statistics = session_state.config_options().explain.show_statistics;
2230        let analyze_level = session_state.config_options().explain.analyze_level;
2231        let metric_types = match analyze_level {
2232            ExplainAnalyzeLevel::Summary => vec![MetricType::SUMMARY],
2233            ExplainAnalyzeLevel::Dev => vec![MetricType::SUMMARY, MetricType::DEV],
2234        };
2235        Ok(Arc::new(AnalyzeExec::new(
2236            a.verbose,
2237            show_statistics,
2238            metric_types,
2239            input,
2240            schema,
2241        )))
2242    }
2243
2244    /// Optimize a physical plan by applying each physical optimizer,
2245    /// calling observer(plan, optimizer after each one)
2246    pub fn optimize_physical_plan<F>(
2247        &self,
2248        plan: Arc<dyn ExecutionPlan>,
2249        session_state: &SessionState,
2250        mut observer: F,
2251    ) -> Result<Arc<dyn ExecutionPlan>>
2252    where
2253        F: FnMut(&dyn ExecutionPlan, &dyn PhysicalOptimizerRule),
2254    {
2255        let optimizers = session_state.physical_optimizers();
2256        debug!(
2257            "Input physical plan:\n{}\n",
2258            displayable(plan.as_ref()).indent(false)
2259        );
2260        debug!(
2261            "Detailed input physical plan:\n{}",
2262            displayable(plan.as_ref()).indent(true)
2263        );
2264
2265        // This runs once before any optimization,
2266        // to verify that the plan fulfills the base requirements.
2267        InvariantChecker(InvariantLevel::Always).check(&plan)?;
2268
2269        let mut new_plan = Arc::clone(&plan);
2270        for optimizer in optimizers {
2271            let before_schema = new_plan.schema();
2272            new_plan = optimizer
2273                .optimize(new_plan, session_state.config_options())
2274                .map_err(|e| {
2275                    DataFusionError::Context(optimizer.name().to_string(), Box::new(e))
2276                })?;
2277
2278            // This only checks the schema in release build, and performs additional checks in debug mode.
2279            OptimizationInvariantChecker::new(optimizer)
2280                .check(&new_plan, before_schema)?;
2281
2282            debug!(
2283                "Optimized physical plan by {}:\n{}\n",
2284                optimizer.name(),
2285                displayable(new_plan.as_ref()).indent(false)
2286            );
2287            observer(new_plan.as_ref(), optimizer.as_ref())
2288        }
2289
2290        // This runs once after all optimizer runs are complete,
2291        // to verify that the plan is executable.
2292        InvariantChecker(InvariantLevel::Executable).check(&new_plan)?;
2293
2294        debug!(
2295            "Optimized physical plan:\n{}\n",
2296            displayable(new_plan.as_ref()).indent(false)
2297        );
2298
2299        // Don't print new_plan directly, as that may overflow the stack.
2300        // For example:
2301        // thread 'tokio-runtime-worker' has overflowed its stack
2302        // fatal runtime error: stack overflow, aborting
2303        debug!(
2304            "Detailed optimized physical plan:\n{}\n",
2305            displayable(new_plan.as_ref()).indent(true)
2306        );
2307        Ok(new_plan)
2308    }
2309
2310    // return an record_batch which describes a table's schema.
2311    fn plan_describe(
2312        &self,
2313        table_schema: Arc<Schema>,
2314        output_schema: Arc<Schema>,
2315    ) -> Result<Arc<dyn ExecutionPlan>> {
2316        let mut column_names = StringBuilder::new();
2317        let mut data_types = StringBuilder::new();
2318        let mut is_nullables = StringBuilder::new();
2319        for field in table_schema.fields() {
2320            column_names.append_value(field.name());
2321
2322            // "System supplied type" --> Use debug format of the datatype
2323            let data_type = field.data_type();
2324            data_types.append_value(format!("{data_type}"));
2325
2326            // "YES if the column is possibly nullable, NO if it is known not nullable. "
2327            let nullable_str = if field.is_nullable() { "YES" } else { "NO" };
2328            is_nullables.append_value(nullable_str);
2329        }
2330
2331        let record_batch = RecordBatch::try_new(
2332            output_schema,
2333            vec![
2334                Arc::new(column_names.finish()),
2335                Arc::new(data_types.finish()),
2336                Arc::new(is_nullables.finish()),
2337            ],
2338        )?;
2339
2340        let schema = record_batch.schema();
2341        let partitions = vec![vec![record_batch]];
2342        let projection = None;
2343        let mem_exec = MemorySourceConfig::try_new_exec(&partitions, schema, projection)?;
2344        Ok(mem_exec)
2345    }
2346
2347    fn create_project_physical_exec(
2348        &self,
2349        session_state: &SessionState,
2350        input_exec: Arc<dyn ExecutionPlan>,
2351        input: &Arc<LogicalPlan>,
2352        expr: &[Expr],
2353    ) -> Result<Arc<dyn ExecutionPlan>> {
2354        let input_logical_schema = input.as_ref().schema();
2355        let input_physical_schema = input_exec.schema();
2356        let physical_exprs = expr
2357            .iter()
2358            .map(|e| {
2359                // For projections, SQL planner and logical plan builder may convert user
2360                // provided expressions into logical Column expressions if their results
2361                // are already provided from the input plans. Because we work with
2362                // qualified columns in logical plane, derived columns involve operators or
2363                // functions will contain qualifiers as well. This will result in logical
2364                // columns with names like `SUM(t1.c1)`, `t1.c1 + t1.c2`, etc.
2365                //
2366                // If we run these logical columns through physical_name function, we will
2367                // get physical names with column qualifiers, which violates DataFusion's
2368                // field name semantics. To account for this, we need to derive the
2369                // physical name from physical input instead.
2370                //
2371                // This depends on the invariant that logical schema field index MUST match
2372                // with physical schema field index.
2373                let physical_name = if let Expr::Column(col) = e {
2374                    match input_logical_schema.index_of_column(col) {
2375                        Ok(idx) => {
2376                            // index physical field using logical field index
2377                            Ok(input_exec.schema().field(idx).name().to_string())
2378                        }
2379                        // logical column is not a derived column, safe to pass along to
2380                        // physical_name
2381                        Err(_) => physical_name(e),
2382                    }
2383                } else {
2384                    physical_name(e)
2385                };
2386
2387                let physical_expr =
2388                    self.create_physical_expr(e, input_logical_schema, session_state);
2389
2390                tuple_err((physical_expr, physical_name))
2391            })
2392            .collect::<Result<Vec<_>>>()?;
2393
2394        let num_input_columns = input_exec.schema().fields().len();
2395
2396        match self.try_plan_async_exprs(
2397            num_input_columns,
2398            PlannedExprResult::ExprWithName(physical_exprs),
2399            input_physical_schema.as_ref(),
2400        )? {
2401            PlanAsyncExpr::Sync(PlannedExprResult::ExprWithName(physical_exprs)) => {
2402                let proj_exprs: Vec<ProjectionExpr> = physical_exprs
2403                    .into_iter()
2404                    .map(|(expr, alias)| ProjectionExpr { expr, alias })
2405                    .collect();
2406                Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input_exec)?))
2407            }
2408            PlanAsyncExpr::Async(
2409                async_map,
2410                PlannedExprResult::ExprWithName(physical_exprs),
2411            ) => {
2412                let async_exec =
2413                    AsyncFuncExec::try_new(async_map.async_exprs, input_exec)?;
2414                let proj_exprs: Vec<ProjectionExpr> = physical_exprs
2415                    .into_iter()
2416                    .map(|(expr, alias)| ProjectionExpr { expr, alias })
2417                    .collect();
2418                let new_proj_exec =
2419                    ProjectionExec::try_new(proj_exprs, Arc::new(async_exec))?;
2420                Ok(Arc::new(new_proj_exec))
2421            }
2422            _ => internal_err!("Unexpected PlanAsyncExpressions variant"),
2423        }
2424    }
2425
2426    fn try_plan_async_exprs(
2427        &self,
2428        num_input_columns: usize,
2429        physical_expr: PlannedExprResult,
2430        schema: &Schema,
2431    ) -> Result<PlanAsyncExpr> {
2432        let mut async_map = AsyncMapper::new(num_input_columns);
2433        match &physical_expr {
2434            PlannedExprResult::ExprWithName(exprs) => {
2435                exprs
2436                    .iter()
2437                    .try_for_each(|(expr, _)| async_map.find_references(expr, schema))?;
2438            }
2439            PlannedExprResult::Expr(exprs) => {
2440                exprs
2441                    .iter()
2442                    .try_for_each(|expr| async_map.find_references(expr, schema))?;
2443            }
2444        }
2445
2446        if async_map.is_empty() {
2447            return Ok(PlanAsyncExpr::Sync(physical_expr));
2448        }
2449
2450        let new_exprs = match physical_expr {
2451            PlannedExprResult::ExprWithName(exprs) => PlannedExprResult::ExprWithName(
2452                exprs
2453                    .iter()
2454                    .map(|(expr, column_name)| {
2455                        let new_expr = Arc::clone(expr)
2456                            .transform_up(|e| Ok(async_map.map_expr(e)))?;
2457                        Ok((new_expr.data, column_name.to_string()))
2458                    })
2459                    .collect::<Result<_>>()?,
2460            ),
2461            PlannedExprResult::Expr(exprs) => PlannedExprResult::Expr(
2462                exprs
2463                    .iter()
2464                    .map(|expr| {
2465                        let new_expr = Arc::clone(expr)
2466                            .transform_up(|e| Ok(async_map.map_expr(e)))?;
2467                        Ok(new_expr.data)
2468                    })
2469                    .collect::<Result<_>>()?,
2470            ),
2471        };
2472        // rewrite the projection's expressions in terms of the columns with the result of async evaluation
2473        Ok(PlanAsyncExpr::Async(async_map, new_exprs))
2474    }
2475}
2476
/// Physical expressions passed into, and returned from, async-expression
/// planning (`try_plan_async_exprs`).
#[derive(Debug)]
enum PlannedExprResult {
    /// Expressions paired with a name; the projection planner uses the name
    /// as the output alias of the expression.
    ExprWithName(Vec<(Arc<dyn PhysicalExpr>, String)>),
    /// Expressions without an associated name.
    Expr(Vec<Arc<dyn PhysicalExpr>>),
}
2482
/// Outcome of async-expression planning (`try_plan_async_exprs`).
#[derive(Debug)]
enum PlanAsyncExpr {
    /// The async mapper was empty after inspecting the expressions; they are
    /// returned unchanged.
    Sync(PlannedExprResult),
    /// The async mapper is non-empty; carries the mapper together with the
    /// expressions rewritten through it.
    Async(AsyncMapper, PlannedExprResult),
}
2488
/// Turns a pair of results into a result of a pair.
///
/// Yields `Ok((a, b))` only when both halves are `Ok`. Otherwise the error
/// is returned; when both halves failed, the error of the first half wins.
fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
    let (first, second) = value;
    Ok((first?, second?))
}
2497
2498struct OptimizationInvariantChecker<'a> {
2499    rule: &'a Arc<dyn PhysicalOptimizerRule + Send + Sync>,
2500}
2501
2502impl<'a> OptimizationInvariantChecker<'a> {
2503    /// Create an [`OptimizationInvariantChecker`] that performs checking per tule.
2504    pub fn new(rule: &'a Arc<dyn PhysicalOptimizerRule + Send + Sync>) -> Self {
2505        Self { rule }
2506    }
2507
2508    /// Checks that the plan change is permitted, returning an Error if not.
2509    ///
2510    /// Conditionally performs schema checks per [PhysicalOptimizerRule::schema_check].
2511    /// In debug mode, this recursively walks the entire physical plan
2512    /// and performs [`ExecutionPlan::check_invariants`].
2513    pub fn check(
2514        &mut self,
2515        plan: &Arc<dyn ExecutionPlan>,
2516        previous_schema: Arc<Schema>,
2517    ) -> Result<()> {
2518        // if the rule is not permitted to change the schema, confirm that it did not change.
2519        if self.rule.schema_check() && plan.schema() != previous_schema {
2520            internal_err!("PhysicalOptimizer rule '{}' failed. Schema mismatch. Expected original schema: {:?}, got new schema: {:?}",
2521                self.rule.name(),
2522                previous_schema,
2523                plan.schema()
2524            )?
2525        }
2526
2527        // check invariants per each ExecutionPlan node
2528        #[cfg(debug_assertions)]
2529        plan.visit(self)?;
2530
2531        Ok(())
2532    }
2533}
2534
2535impl<'n> TreeNodeVisitor<'n> for OptimizationInvariantChecker<'_> {
2536    type Node = Arc<dyn ExecutionPlan>;
2537
2538    fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
2539        // Checks for the more permissive `InvariantLevel::Always`.
2540        // Plans are not guaranteed to be executable after each physical optimizer run.
2541        node.check_invariants(InvariantLevel::Always).map_err(|e|
2542            e.context(format!("Invariant for ExecutionPlan node '{}' failed for PhysicalOptimizer rule '{}'", node.name(), self.rule.name()))
2543        )?;
2544        Ok(TreeNodeRecursion::Continue)
2545    }
2546}
2547
2548/// Check [`ExecutionPlan`] invariants per [`InvariantLevel`].
2549struct InvariantChecker(InvariantLevel);
2550
2551impl InvariantChecker {
2552    /// Checks that the plan is executable, returning an Error if not.
2553    pub fn check(&mut self, plan: &Arc<dyn ExecutionPlan>) -> Result<()> {
2554        // check invariants per each ExecutionPlan node
2555        plan.visit(self)?;
2556
2557        Ok(())
2558    }
2559}
2560
2561impl<'n> TreeNodeVisitor<'n> for InvariantChecker {
2562    type Node = Arc<dyn ExecutionPlan>;
2563
2564    fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
2565        node.check_invariants(self.0).map_err(|e| {
2566            e.context(format!(
2567                "Invariant for ExecutionPlan node '{}' failed",
2568                node.name()
2569            ))
2570        })?;
2571        Ok(TreeNodeRecursion::Continue)
2572    }
2573}
2574
2575#[cfg(test)]
2576mod tests {
2577    use std::any::Any;
2578    use std::cmp::Ordering;
2579    use std::fmt::{self, Debug};
2580    use std::ops::{BitAnd, Not};
2581
2582    use super::*;
2583    use crate::datasource::file_format::options::CsvReadOptions;
2584    use crate::datasource::MemTable;
2585    use crate::physical_plan::{
2586        expressions, DisplayAs, DisplayFormatType, PlanProperties,
2587        SendableRecordBatchStream,
2588    };
2589    use crate::prelude::{SessionConfig, SessionContext};
2590    use crate::test_util::{scan_empty, scan_empty_with_partitions};
2591
2592    use crate::execution::session_state::SessionStateBuilder;
2593    use arrow::array::{ArrayRef, DictionaryArray, Int32Array};
2594    use arrow::datatypes::{DataType, Field, Int32Type};
2595    use arrow_schema::SchemaRef;
2596    use datafusion_common::config::ConfigOptions;
2597    use datafusion_common::{
2598        assert_contains, DFSchemaRef, TableReference, ToDFSchema as _,
2599    };
2600    use datafusion_execution::runtime_env::RuntimeEnv;
2601    use datafusion_execution::TaskContext;
2602    use datafusion_expr::builder::subquery_alias;
2603    use datafusion_expr::{col, lit, LogicalPlanBuilder, UserDefinedLogicalNodeCore};
2604    use datafusion_functions_aggregate::count::count_all;
2605    use datafusion_functions_aggregate::expr_fn::sum;
2606    use datafusion_physical_expr::EquivalenceProperties;
2607    use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
2608
2609    fn make_session_state() -> SessionState {
2610        let runtime = Arc::new(RuntimeEnv::default());
2611        let config = SessionConfig::new().with_target_partitions(4);
2612        let config = config.set_bool("datafusion.optimizer.skip_failed_rules", false);
2613        SessionStateBuilder::new()
2614            .with_config(config)
2615            .with_runtime_env(runtime)
2616            .with_default_features()
2617            .build()
2618    }
2619
2620    async fn plan(logical_plan: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> {
2621        let session_state = make_session_state();
2622        // optimize the logical plan
2623        let logical_plan = session_state.optimize(logical_plan)?;
2624        let planner = DefaultPhysicalPlanner::default();
2625        planner
2626            .create_physical_plan(&logical_plan, &session_state)
2627            .await
2628    }
2629
2630    #[tokio::test]
2631    async fn test_all_operators() -> Result<()> {
2632        let logical_plan = test_csv_scan()
2633            .await?
2634            // filter clause needs the type coercion rule applied
2635            .filter(col("c7").lt(lit(5_u8)))?
2636            .project(vec![col("c1"), col("c2")])?
2637            .aggregate(vec![col("c1")], vec![sum(col("c2"))])?
2638            .sort(vec![col("c1").sort(true, true)])?
2639            .limit(3, Some(10))?
2640            .build()?;
2641
2642        let exec_plan = plan(&logical_plan).await?;
2643
2644        // verify that the plan correctly casts u8 to i64
2645        // the cast from u8 to i64 for literal will be simplified, and get lit(int64(5))
2646        // the cast here is implicit so has CastOptions with safe=true
2647        let expected = r#"BinaryExpr { left: Column { name: "c7", index: 2 }, op: Lt, right: Literal { value: Int64(5), field: Field { name: "lit", data_type: Int64 } }, fail_on_overflow: false"#;
2648
2649        assert_contains!(format!("{exec_plan:?}"), expected);
2650        Ok(())
2651    }
2652
2653    #[tokio::test]
2654    async fn test_create_cube_expr() -> Result<()> {
2655        let logical_plan = test_csv_scan().await?.build()?;
2656
2657        let plan = plan(&logical_plan).await?;
2658
2659        let exprs = vec![col("c1"), col("c2"), col("c3")];
2660
2661        let physical_input_schema = plan.schema();
2662        let physical_input_schema = physical_input_schema.as_ref();
2663        let logical_input_schema = logical_plan.schema();
2664        let session_state = make_session_state();
2665
2666        let cube = create_cube_physical_expr(
2667            &exprs,
2668            logical_input_schema,
2669            physical_input_schema,
2670            &session_state,
2671        );
2672
2673        insta::assert_debug_snapshot!(cube, @r#"
2674        Ok(
2675            PhysicalGroupBy {
2676                expr: [
2677                    (
2678                        Column {
2679                            name: "c1",
2680                            index: 0,
2681                        },
2682                        "c1",
2683                    ),
2684                    (
2685                        Column {
2686                            name: "c2",
2687                            index: 1,
2688                        },
2689                        "c2",
2690                    ),
2691                    (
2692                        Column {
2693                            name: "c3",
2694                            index: 2,
2695                        },
2696                        "c3",
2697                    ),
2698                ],
2699                null_expr: [
2700                    (
2701                        Literal {
2702                            value: Utf8(NULL),
2703                            field: Field {
2704                                name: "lit",
2705                                data_type: Utf8,
2706                                nullable: true,
2707                            },
2708                        },
2709                        "c1",
2710                    ),
2711                    (
2712                        Literal {
2713                            value: Int64(NULL),
2714                            field: Field {
2715                                name: "lit",
2716                                data_type: Int64,
2717                                nullable: true,
2718                            },
2719                        },
2720                        "c2",
2721                    ),
2722                    (
2723                        Literal {
2724                            value: Int64(NULL),
2725                            field: Field {
2726                                name: "lit",
2727                                data_type: Int64,
2728                                nullable: true,
2729                            },
2730                        },
2731                        "c3",
2732                    ),
2733                ],
2734                groups: [
2735                    [
2736                        false,
2737                        false,
2738                        false,
2739                    ],
2740                    [
2741                        true,
2742                        false,
2743                        false,
2744                    ],
2745                    [
2746                        false,
2747                        true,
2748                        false,
2749                    ],
2750                    [
2751                        false,
2752                        false,
2753                        true,
2754                    ],
2755                    [
2756                        true,
2757                        true,
2758                        false,
2759                    ],
2760                    [
2761                        true,
2762                        false,
2763                        true,
2764                    ],
2765                    [
2766                        false,
2767                        true,
2768                        true,
2769                    ],
2770                    [
2771                        true,
2772                        true,
2773                        true,
2774                    ],
2775                ],
2776            },
2777        )
2778        "#);
2779
2780        Ok(())
2781    }
2782
2783    #[tokio::test]
2784    async fn test_create_rollup_expr() -> Result<()> {
2785        let logical_plan = test_csv_scan().await?.build()?;
2786
2787        let plan = plan(&logical_plan).await?;
2788
2789        let exprs = vec![col("c1"), col("c2"), col("c3")];
2790
2791        let physical_input_schema = plan.schema();
2792        let physical_input_schema = physical_input_schema.as_ref();
2793        let logical_input_schema = logical_plan.schema();
2794        let session_state = make_session_state();
2795
2796        let rollup = create_rollup_physical_expr(
2797            &exprs,
2798            logical_input_schema,
2799            physical_input_schema,
2800            &session_state,
2801        );
2802
2803        insta::assert_debug_snapshot!(rollup, @r#"
2804        Ok(
2805            PhysicalGroupBy {
2806                expr: [
2807                    (
2808                        Column {
2809                            name: "c1",
2810                            index: 0,
2811                        },
2812                        "c1",
2813                    ),
2814                    (
2815                        Column {
2816                            name: "c2",
2817                            index: 1,
2818                        },
2819                        "c2",
2820                    ),
2821                    (
2822                        Column {
2823                            name: "c3",
2824                            index: 2,
2825                        },
2826                        "c3",
2827                    ),
2828                ],
2829                null_expr: [
2830                    (
2831                        Literal {
2832                            value: Utf8(NULL),
2833                            field: Field {
2834                                name: "lit",
2835                                data_type: Utf8,
2836                                nullable: true,
2837                            },
2838                        },
2839                        "c1",
2840                    ),
2841                    (
2842                        Literal {
2843                            value: Int64(NULL),
2844                            field: Field {
2845                                name: "lit",
2846                                data_type: Int64,
2847                                nullable: true,
2848                            },
2849                        },
2850                        "c2",
2851                    ),
2852                    (
2853                        Literal {
2854                            value: Int64(NULL),
2855                            field: Field {
2856                                name: "lit",
2857                                data_type: Int64,
2858                                nullable: true,
2859                            },
2860                        },
2861                        "c3",
2862                    ),
2863                ],
2864                groups: [
2865                    [
2866                        true,
2867                        true,
2868                        true,
2869                    ],
2870                    [
2871                        false,
2872                        true,
2873                        true,
2874                    ],
2875                    [
2876                        false,
2877                        false,
2878                        true,
2879                    ],
2880                    [
2881                        false,
2882                        false,
2883                        false,
2884                    ],
2885                ],
2886            },
2887        )
2888        "#);
2889
2890        Ok(())
2891    }
2892
2893    #[tokio::test]
2894    async fn test_create_not() -> Result<()> {
2895        let schema = Schema::new(vec![Field::new("a", DataType::Boolean, true)]);
2896        let dfschema = DFSchema::try_from(schema.clone())?;
2897
2898        let planner = DefaultPhysicalPlanner::default();
2899
2900        let expr = planner.create_physical_expr(
2901            &col("a").not(),
2902            &dfschema,
2903            &make_session_state(),
2904        )?;
2905        let expected = expressions::not(expressions::col("a", &schema)?)?;
2906
2907        assert_eq!(format!("{expr:?}"), format!("{expected:?}"));
2908
2909        Ok(())
2910    }
2911
2912    #[tokio::test]
2913    async fn test_with_csv_plan() -> Result<()> {
2914        let logical_plan = test_csv_scan()
2915            .await?
2916            .filter(col("c7").lt(col("c12")))?
2917            .limit(3, None)?
2918            .build()?;
2919
2920        let plan = plan(&logical_plan).await?;
2921
2922        // c12 is f64, c7 is u8 -> cast c7 to f64
2923        // the cast here is implicit so has CastOptions with safe=true
2924        let _expected = "predicate: BinaryExpr { left: TryCastExpr { expr: Column { name: \"c7\", index: 6 }, cast_type: Float64 }, op: Lt, right: Column { name: \"c12\", index: 11 } }";
2925        let plan_debug_str = format!("{plan:?}");
2926        assert!(plan_debug_str.contains("GlobalLimitExec"));
2927        assert!(plan_debug_str.contains("skip: 3"));
2928        Ok(())
2929    }
2930
2931    #[tokio::test]
2932    async fn error_during_extension_planning() {
2933        let session_state = make_session_state();
2934        let planner = DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new(
2935            ErrorExtensionPlanner {},
2936        )]);
2937
2938        let logical_plan = LogicalPlan::Extension(Extension {
2939            node: Arc::new(NoOpExtensionNode::default()),
2940        });
2941        match planner
2942            .create_physical_plan(&logical_plan, &session_state)
2943            .await
2944        {
2945            Ok(_) => panic!("Expected planning failure"),
2946            Err(e) => assert!(e.to_string().contains("BOOM"),),
2947        }
2948    }
2949
2950    #[tokio::test]
2951    async fn test_with_zero_offset_plan() -> Result<()> {
2952        let logical_plan = test_csv_scan().await?.limit(0, None)?.build()?;
2953        let plan = plan(&logical_plan).await?;
2954        assert!(!format!("{plan:?}").contains("limit="));
2955        Ok(())
2956    }
2957
2958    #[tokio::test]
2959    async fn test_limit_with_partitions() -> Result<()> {
2960        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
2961
2962        let logical_plan = scan_empty_with_partitions(Some("test"), &schema, None, 2)?
2963            .limit(3, Some(5))?
2964            .build()?;
2965        let plan = plan(&logical_plan).await?;
2966
2967        assert!(format!("{plan:?}").contains("GlobalLimitExec"));
2968        assert!(format!("{plan:?}").contains("skip: 3, fetch: Some(5)"));
2969
2970        Ok(())
2971    }
2972
2973    #[tokio::test]
2974    async fn errors() -> Result<()> {
2975        let bool_expr = col("c1").eq(col("c1"));
2976        let cases = vec![
2977            // utf8 = utf8
2978            col("c1").eq(col("c1")),
2979            // u8 AND u8
2980            col("c3").bitand(col("c3")),
2981            // utf8 = u8
2982            col("c1").eq(col("c3")),
2983            // bool AND bool
2984            bool_expr.clone().and(bool_expr),
2985        ];
2986        for case in cases {
2987            test_csv_scan().await?.project(vec![case.clone()]).unwrap();
2988        }
2989        Ok(())
2990    }
2991
2992    #[tokio::test]
2993    async fn default_extension_planner() {
2994        let session_state = make_session_state();
2995        let planner = DefaultPhysicalPlanner::default();
2996        let logical_plan = LogicalPlan::Extension(Extension {
2997            node: Arc::new(NoOpExtensionNode::default()),
2998        });
2999        let plan = planner
3000            .create_physical_plan(&logical_plan, &session_state)
3001            .await;
3002
3003        let expected_error =
3004            "No installed planner was able to convert the custom node to an execution plan: NoOp";
3005        match plan {
3006            Ok(_) => panic!("Expected planning failure"),
3007            Err(e) => assert!(
3008                e.to_string().contains(expected_error),
3009                "Error '{e}' did not contain expected error '{expected_error}'"
3010            ),
3011        }
3012    }
3013
3014    #[tokio::test]
3015    async fn bad_extension_planner() {
3016        // Test that creating an execution plan whose schema doesn't
3017        // match the logical plan's schema generates an error.
3018        let session_state = make_session_state();
3019        let planner = DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new(
3020            BadExtensionPlanner {},
3021        )]);
3022
3023        let logical_plan = LogicalPlan::Extension(Extension {
3024            node: Arc::new(NoOpExtensionNode::default()),
3025        });
3026        let e = planner
3027            .create_physical_plan(&logical_plan, &session_state)
3028            .await
3029            .expect_err("planning error")
3030            .strip_backtrace();
3031
3032        insta::assert_snapshot!(e, @r#"Error during planning: Extension planner for NoOp created an ExecutionPlan with mismatched schema. LogicalPlan schema: DFSchema { inner: Schema { fields: [Field { name: "a", data_type: Int32 }], metadata: {} }, field_qualifiers: [None], functional_dependencies: FunctionalDependencies { deps: [] } }, ExecutionPlan schema: Schema { fields: [Field { name: "b", data_type: Int32 }], metadata: {} }"#);
3033    }
3034
3035    #[tokio::test]
3036    async fn in_list_types() -> Result<()> {
3037        // expression: "a in ('a', 1)"
3038        let list = vec![lit("a"), lit(1i64)];
3039        let logical_plan = test_csv_scan()
3040            .await?
3041            // filter clause needs the type coercion rule applied
3042            .filter(col("c12").lt(lit(0.05)))?
3043            .project(vec![col("c1").in_list(list, false)])?
3044            .build()?;
3045        let execution_plan = plan(&logical_plan).await?;
3046        // verify that the plan correctly adds cast from Int64(1) to Utf8, and the const will be evaluated.
3047
3048        let expected = r#"expr: BinaryExpr { left: BinaryExpr { left: Column { name: "c1", index: 0 }, op: Eq, right: Literal { value: Utf8("a"), field: Field { name: "lit", data_type: Utf8 } }, fail_on_overflow: false }"#;
3049
3050        assert_contains!(format!("{execution_plan:?}"), expected);
3051
3052        Ok(())
3053    }
3054
3055    #[tokio::test]
3056    async fn in_list_types_struct_literal() -> Result<()> {
3057        // expression: "a in (struct::null, 'a')"
3058        let list = vec![struct_literal(), lit("a")];
3059
3060        let logical_plan = test_csv_scan()
3061            .await?
3062            // filter clause needs the type coercion rule applied
3063            .filter(col("c12").lt(lit(0.05)))?
3064            .project(vec![col("c12").lt_eq(lit(0.025)).in_list(list, false)])?
3065            .build()?;
3066        let e = plan(&logical_plan).await.unwrap_err().to_string();
3067
3068        assert_contains!(
3069            &e,
3070            r#"Error during planning: Can not find compatible types to compare Boolean with [Struct("foo": Boolean), Utf8]"#
3071        );
3072
3073        Ok(())
3074    }
3075
3076    /// Return a `null` literal representing a struct type like: `{ a: bool }`
3077    fn struct_literal() -> Expr {
3078        let struct_literal = ScalarValue::try_from(DataType::Struct(
3079            vec![Field::new("foo", DataType::Boolean, false)].into(),
3080        ))
3081        .unwrap();
3082
3083        lit(struct_literal)
3084    }
3085
3086    #[tokio::test]
3087    async fn hash_agg_input_schema() -> Result<()> {
3088        let logical_plan = test_csv_scan_with_name("aggregate_test_100")
3089            .await?
3090            .aggregate(vec![col("c1")], vec![sum(col("c2"))])?
3091            .build()?;
3092
3093        let execution_plan = plan(&logical_plan).await?;
3094        let final_hash_agg = execution_plan
3095            .as_any()
3096            .downcast_ref::<AggregateExec>()
3097            .expect("hash aggregate");
3098        assert_eq!(
3099            "sum(aggregate_test_100.c2)",
3100            final_hash_agg.schema().field(1).name()
3101        );
3102        // we need access to the input to the partial aggregate so that other projects can
3103        // implement serde
3104        assert_eq!("c2", final_hash_agg.input_schema().field(1).name());
3105
3106        Ok(())
3107    }
3108
3109    #[tokio::test]
3110    async fn hash_agg_grouping_set_input_schema() -> Result<()> {
3111        let grouping_set_expr = Expr::GroupingSet(GroupingSet::GroupingSets(vec![
3112            vec![col("c1")],
3113            vec![col("c2")],
3114            vec![col("c1"), col("c2")],
3115        ]));
3116        let logical_plan = test_csv_scan_with_name("aggregate_test_100")
3117            .await?
3118            .aggregate(vec![grouping_set_expr], vec![sum(col("c3"))])?
3119            .build()?;
3120
3121        let execution_plan = plan(&logical_plan).await?;
3122        let final_hash_agg = execution_plan
3123            .as_any()
3124            .downcast_ref::<AggregateExec>()
3125            .expect("hash aggregate");
3126        assert_eq!(
3127            "sum(aggregate_test_100.c3)",
3128            final_hash_agg.schema().field(3).name()
3129        );
3130        // we need access to the input to the partial aggregate so that other projects can
3131        // implement serde
3132        assert_eq!("c3", final_hash_agg.input_schema().field(2).name());
3133
3134        Ok(())
3135    }
3136
3137    #[tokio::test]
3138    async fn hash_agg_group_by_partitioned() -> Result<()> {
3139        let logical_plan = test_csv_scan()
3140            .await?
3141            .aggregate(vec![col("c1")], vec![sum(col("c2"))])?
3142            .build()?;
3143
3144        let execution_plan = plan(&logical_plan).await?;
3145        let formatted = format!("{execution_plan:?}");
3146
3147        // Make sure the plan contains a FinalPartitioned, which means it will not use the Final
3148        // mode in Aggregate (which is slower)
3149        assert!(formatted.contains("FinalPartitioned"));
3150
3151        Ok(())
3152    }
3153
3154    #[tokio::test]
3155    async fn hash_agg_group_by_partitioned_on_dicts() -> Result<()> {
3156        let dict_array: DictionaryArray<Int32Type> =
3157            vec!["A", "B", "A", "A", "C", "A"].into_iter().collect();
3158        let val_array: Int32Array = vec![1, 2, 2, 4, 1, 1].into();
3159
3160        let batch = RecordBatch::try_from_iter(vec![
3161            ("d1", Arc::new(dict_array) as ArrayRef),
3162            ("d2", Arc::new(val_array) as ArrayRef),
3163        ])
3164        .unwrap();
3165
3166        let table = MemTable::try_new(batch.schema(), vec![vec![batch]])?;
3167        let ctx = SessionContext::new();
3168
3169        let logical_plan = LogicalPlanBuilder::from(
3170            ctx.read_table(Arc::new(table))?.into_optimized_plan()?,
3171        )
3172        .aggregate(vec![col("d1")], vec![sum(col("d2"))])?
3173        .build()?;
3174
3175        let execution_plan = plan(&logical_plan).await?;
3176        let formatted = format!("{execution_plan:?}");
3177
3178        // Make sure the plan contains a FinalPartitioned, which means it will not use the Final
3179        // mode in Aggregate (which is slower)
3180        assert!(formatted.contains("FinalPartitioned"));
3181        Ok(())
3182    }
3183
3184    #[tokio::test]
3185    async fn hash_agg_grouping_set_by_partitioned() -> Result<()> {
3186        let grouping_set_expr = Expr::GroupingSet(GroupingSet::GroupingSets(vec![
3187            vec![col("c1")],
3188            vec![col("c2")],
3189            vec![col("c1"), col("c2")],
3190        ]));
3191        let logical_plan = test_csv_scan()
3192            .await?
3193            .aggregate(vec![grouping_set_expr], vec![sum(col("c3"))])?
3194            .build()?;
3195
3196        let execution_plan = plan(&logical_plan).await?;
3197        let formatted = format!("{execution_plan:?}");
3198
3199        // Make sure the plan contains a FinalPartitioned, which means it will not use the Final
3200        // mode in Aggregate (which is slower)
3201        assert!(formatted.contains("FinalPartitioned"));
3202
3203        Ok(())
3204    }
3205
3206    #[tokio::test]
3207    async fn aggregate_with_alias() -> Result<()> {
3208        let schema = Arc::new(Schema::new(vec![
3209            Field::new("c1", DataType::Utf8, false),
3210            Field::new("c2", DataType::UInt32, false),
3211        ]));
3212
3213        let logical_plan = scan_empty(None, schema.as_ref(), None)?
3214            .aggregate(vec![col("c1")], vec![sum(col("c2"))])?
3215            .project(vec![col("c1"), sum(col("c2")).alias("total_salary")])?
3216            .build()?;
3217
3218        let physical_plan = plan(&logical_plan).await?;
3219        assert_eq!("c1", physical_plan.schema().field(0).name().as_str());
3220        assert_eq!(
3221            "total_salary",
3222            physical_plan.schema().field(1).name().as_str()
3223        );
3224        Ok(())
3225    }
3226
3227    #[tokio::test]
3228    async fn test_aggregate_count_all_with_alias() -> Result<()> {
3229        let schema = Arc::new(Schema::new(vec![
3230            Field::new("c1", DataType::Utf8, false),
3231            Field::new("c2", DataType::UInt32, false),
3232        ]));
3233
3234        let logical_plan = scan_empty(None, schema.as_ref(), None)?
3235            .aggregate(Vec::<Expr>::new(), vec![count_all().alias("total_rows")])?
3236            .build()?;
3237
3238        let physical_plan = plan(&logical_plan).await?;
3239        assert_eq!(
3240            "total_rows",
3241            physical_plan.schema().field(0).name().as_str()
3242        );
3243        Ok(())
3244    }
3245
3246    #[tokio::test]
3247    async fn test_explain() {
3248        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
3249
3250        let logical_plan = scan_empty(Some("employee"), &schema, None)
3251            .unwrap()
3252            .explain(true, false)
3253            .unwrap()
3254            .build()
3255            .unwrap();
3256
3257        let plan = plan(&logical_plan).await.unwrap();
3258        if let Some(plan) = plan.as_any().downcast_ref::<ExplainExec>() {
3259            let stringified_plans = plan.stringified_plans();
3260            assert!(stringified_plans.len() >= 4);
3261            assert!(stringified_plans
3262                .iter()
3263                .any(|p| matches!(p.plan_type, PlanType::FinalLogicalPlan)));
3264            assert!(stringified_plans
3265                .iter()
3266                .any(|p| matches!(p.plan_type, PlanType::InitialPhysicalPlan)));
3267            assert!(stringified_plans
3268                .iter()
3269                .any(|p| matches!(p.plan_type, PlanType::OptimizedPhysicalPlan { .. })));
3270            assert!(stringified_plans
3271                .iter()
3272                .any(|p| matches!(p.plan_type, PlanType::FinalPhysicalPlan)));
3273        } else {
3274            panic!(
3275                "Plan was not an explain plan: {}",
3276                displayable(plan.as_ref()).indent(true)
3277            );
3278        }
3279    }
3280
3281    #[tokio::test]
3282    async fn test_explain_indent_err() {
3283        let planner = DefaultPhysicalPlanner::default();
3284        let ctx = SessionContext::new();
3285        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
3286        let plan = Arc::new(
3287            scan_empty(Some("employee"), &schema, None)
3288                .unwrap()
3289                .explain(true, false)
3290                .unwrap()
3291                .build()
3292                .unwrap(),
3293        );
3294
3295        // Create a schema
3296        let schema = Arc::new(Schema::new(vec![
3297            Field::new("plan_type", DataType::Utf8, false),
3298            Field::new("plan", DataType::Utf8, false),
3299        ]));
3300
3301        // Create invalid indentation in the plan
3302        let stringified_plans =
3303            vec![StringifiedPlan::new(PlanType::FinalLogicalPlan, "Test Err")];
3304
3305        let explain = Explain {
3306            verbose: false,
3307            explain_format: ExplainFormat::Indent,
3308            plan,
3309            stringified_plans,
3310            schema: schema.to_dfschema_ref().unwrap(),
3311            logical_optimization_succeeded: false,
3312        };
3313        let plan = planner
3314            .handle_explain(&explain, &ctx.state())
3315            .await
3316            .unwrap();
3317        if let Some(plan) = plan.as_any().downcast_ref::<ExplainExec>() {
3318            let stringified_plans = plan.stringified_plans();
3319            assert_eq!(stringified_plans.len(), 1);
3320            assert_eq!(stringified_plans[0].plan.as_str(), "Test Err");
3321        } else {
3322            panic!(
3323                "Plan was not an explain plan: {}",
3324                displayable(plan.as_ref()).indent(true)
3325            );
3326        }
3327    }
3328
    /// Extension planner used by the tests that fails for every node it is asked
    /// to plan.
    struct ErrorExtensionPlanner {}

    #[async_trait]
    impl ExtensionPlanner for ErrorExtensionPlanner {
        /// Create a physical plan for an extension node
        ///
        /// This implementation ignores all of its arguments and always returns the
        /// error produced by `internal_err!("BOOM")`.
        async fn plan_extension(
            &self,
            _planner: &dyn PhysicalPlanner,
            _node: &dyn UserDefinedLogicalNode,
            _logical_inputs: &[&LogicalPlan],
            _physical_inputs: &[Arc<dyn ExecutionPlan>],
            _session_state: &SessionState,
        ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
            internal_err!("BOOM")
        }
    }
    /// An example extension node that doesn't do anything
    ///
    /// Its only state is the schema it reports.
    #[derive(PartialEq, Eq, Hash)]
    struct NoOpExtensionNode {
        /// Schema returned by this node's `schema()` accessor.
        schema: DFSchemaRef,
    }
3350
3351    impl Default for NoOpExtensionNode {
3352        fn default() -> Self {
3353            Self {
3354                schema: DFSchemaRef::new(
3355                    DFSchema::from_unqualified_fields(
3356                        vec![Field::new("a", DataType::Int32, false)].into(),
3357                        HashMap::new(),
3358                    )
3359                    .unwrap(),
3360                ),
3361            }
3362        }
3363    }
3364
3365    impl Debug for NoOpExtensionNode {
3366        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
3367            write!(f, "NoOp")
3368        }
3369    }
3370
    // This implementation is needed for `UserDefinedLogicalNodeCore`. Because the
    // only field is a schema, `PartialOrd` cannot be derived, and two nodes cannot
    // be compared.
    impl PartialOrd for NoOpExtensionNode {
        /// Always returns `None`: no ordering is defined between two nodes.
        fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
            None
        }
    }
3378
3379    impl UserDefinedLogicalNodeCore for NoOpExtensionNode {
3380        fn name(&self) -> &str {
3381            "NoOp"
3382        }
3383
3384        fn inputs(&self) -> Vec<&LogicalPlan> {
3385            vec![]
3386        }
3387
3388        fn schema(&self) -> &DFSchemaRef {
3389            &self.schema
3390        }
3391
3392        fn expressions(&self) -> Vec<Expr> {
3393            vec![]
3394        }
3395
3396        fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result {
3397            write!(f, "NoOp")
3398        }
3399
3400        fn with_exprs_and_inputs(
3401            &self,
3402            _exprs: Vec<Expr>,
3403            _inputs: Vec<LogicalPlan>,
3404        ) -> Result<Self> {
3405            unimplemented!("NoOp");
3406        }
3407
3408        fn supports_limit_pushdown(&self) -> bool {
3409            false // Disallow limit push-down by default
3410        }
3411    }
3412
    /// Execution plan used by the tests; it has no children and its `execute`
    /// and `with_new_children` methods are not implemented.
    #[derive(Debug)]
    struct NoOpExecutionPlan {
        /// Plan properties computed once in `new` and returned by `properties()`.
        cache: PlanProperties,
    }
3417
3418    impl NoOpExecutionPlan {
3419        fn new(schema: SchemaRef) -> Self {
3420            let cache = Self::compute_properties(schema);
3421            Self { cache }
3422        }
3423
3424        /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
3425        fn compute_properties(schema: SchemaRef) -> PlanProperties {
3426            PlanProperties::new(
3427                EquivalenceProperties::new(schema),
3428                Partitioning::UnknownPartitioning(1),
3429                EmissionType::Incremental,
3430                Boundedness::Bounded,
3431            )
3432        }
3433    }
3434
3435    impl DisplayAs for NoOpExecutionPlan {
3436        fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
3437            match t {
3438                DisplayFormatType::Default | DisplayFormatType::Verbose => {
3439                    write!(f, "NoOpExecutionPlan")
3440                }
3441                DisplayFormatType::TreeRender => {
3442                    // TODO: collect info
3443                    write!(f, "")
3444                }
3445            }
3446        }
3447    }
3448
3449    impl ExecutionPlan for NoOpExecutionPlan {
3450        fn name(&self) -> &'static str {
3451            "NoOpExecutionPlan"
3452        }
3453
3454        /// Return a reference to Any that can be used for downcasting
3455        fn as_any(&self) -> &dyn Any {
3456            self
3457        }
3458
3459        fn properties(&self) -> &PlanProperties {
3460            &self.cache
3461        }
3462
3463        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
3464            vec![]
3465        }
3466
3467        fn with_new_children(
3468            self: Arc<Self>,
3469            _children: Vec<Arc<dyn ExecutionPlan>>,
3470        ) -> Result<Arc<dyn ExecutionPlan>> {
3471            unimplemented!("NoOpExecutionPlan::with_new_children");
3472        }
3473
3474        fn execute(
3475            &self,
3476            _partition: usize,
3477            _context: Arc<TaskContext>,
3478        ) -> Result<SendableRecordBatchStream> {
3479            unimplemented!("NoOpExecutionPlan::execute");
3480        }
3481    }
3482
3483    //  Produces an execution plan where the schema is mismatched from
3484    //  the logical plan node.
3485    struct BadExtensionPlanner {}
3486
3487    #[async_trait]
3488    impl ExtensionPlanner for BadExtensionPlanner {
3489        /// Create a physical plan for an extension node
3490        async fn plan_extension(
3491            &self,
3492            _planner: &dyn PhysicalPlanner,
3493            _node: &dyn UserDefinedLogicalNode,
3494            _logical_inputs: &[&LogicalPlan],
3495            _physical_inputs: &[Arc<dyn ExecutionPlan>],
3496            _session_state: &SessionState,
3497        ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
3498            Ok(Some(Arc::new(NoOpExecutionPlan::new(SchemaRef::new(
3499                Schema::new(vec![Field::new("b", DataType::Int32, false)]),
3500            )))))
3501        }
3502    }
3503
3504    async fn test_csv_scan_with_name(name: &str) -> Result<LogicalPlanBuilder> {
3505        let ctx = SessionContext::new();
3506        let testdata = crate::test_util::arrow_test_data();
3507        let path = format!("{testdata}/csv/aggregate_test_100.csv");
3508        let options = CsvReadOptions::new().schema_infer_max_records(100);
3509        let logical_plan =
3510            match ctx.read_csv(path, options).await?.into_optimized_plan()? {
3511                LogicalPlan::TableScan(ref scan) => {
3512                    let mut scan = scan.clone();
3513                    let table_reference = TableReference::from(name);
3514                    scan.table_name = table_reference;
3515                    let new_schema = scan
3516                        .projected_schema
3517                        .as_ref()
3518                        .clone()
3519                        .replace_qualifier(name.to_string());
3520                    scan.projected_schema = Arc::new(new_schema);
3521                    LogicalPlan::TableScan(scan)
3522                }
3523                _ => unimplemented!(),
3524            };
3525        Ok(LogicalPlanBuilder::from(logical_plan))
3526    }
3527
3528    async fn test_csv_scan() -> Result<LogicalPlanBuilder> {
3529        let ctx = SessionContext::new();
3530        let testdata = crate::test_util::arrow_test_data();
3531        let path = format!("{testdata}/csv/aggregate_test_100.csv");
3532        let options = CsvReadOptions::new().schema_infer_max_records(100);
3533        Ok(LogicalPlanBuilder::from(
3534            ctx.read_csv(path, options).await?.into_optimized_plan()?,
3535        ))
3536    }
3537
3538    #[tokio::test]
3539    async fn test_display_plan_in_graphviz_format() {
3540        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
3541
3542        let logical_plan = scan_empty(Some("employee"), &schema, None)
3543            .unwrap()
3544            .project(vec![col("id") + lit(2)])
3545            .unwrap()
3546            .build()
3547            .unwrap();
3548
3549        let plan = plan(&logical_plan).await.unwrap();
3550
3551        let expected_graph = r#"
3552// Begin DataFusion GraphViz Plan,
3553// display it online here: https://dreampuf.github.io/GraphvizOnline
3554
3555digraph {
3556    1[shape=box label="ProjectionExec: expr=[id@0 + 2 as employee.id + Int32(2)]", tooltip=""]
3557    2[shape=box label="EmptyExec", tooltip=""]
3558    1 -> 2 [arrowhead=none, arrowtail=normal, dir=back]
3559}
3560// End DataFusion GraphViz Plan
3561"#;
3562
3563        let generated_graph = format!("{}", displayable(&*plan).graphviz());
3564
3565        assert_eq!(expected_graph, generated_graph);
3566    }
3567
3568    #[tokio::test]
3569    async fn test_display_graphviz_with_statistics() {
3570        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
3571
3572        let logical_plan = scan_empty(Some("employee"), &schema, None)
3573            .unwrap()
3574            .project(vec![col("id") + lit(2)])
3575            .unwrap()
3576            .build()
3577            .unwrap();
3578
3579        let plan = plan(&logical_plan).await.unwrap();
3580
3581        let expected_tooltip = ", tooltip=\"statistics=[";
3582
3583        let generated_graph = format!(
3584            "{}",
3585            displayable(&*plan).set_show_statistics(true).graphviz()
3586        );
3587
3588        assert_contains!(generated_graph, expected_tooltip);
3589    }
3590
3591    /// Extension Node which passes invariant checks
3592    #[derive(Debug)]
3593    struct OkExtensionNode(Vec<Arc<dyn ExecutionPlan>>);
3594    impl ExecutionPlan for OkExtensionNode {
3595        fn name(&self) -> &str {
3596            "always ok"
3597        }
3598        fn with_new_children(
3599            self: Arc<Self>,
3600            children: Vec<Arc<dyn ExecutionPlan>>,
3601        ) -> Result<Arc<dyn ExecutionPlan>> {
3602            Ok(Arc::new(Self(children)))
3603        }
3604        fn schema(&self) -> SchemaRef {
3605            Arc::new(Schema::empty())
3606        }
3607        fn as_any(&self) -> &dyn Any {
3608            unimplemented!()
3609        }
3610        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
3611            self.0.iter().collect::<Vec<_>>()
3612        }
3613        fn properties(&self) -> &PlanProperties {
3614            unimplemented!()
3615        }
3616        fn execute(
3617            &self,
3618            _partition: usize,
3619            _context: Arc<TaskContext>,
3620        ) -> Result<SendableRecordBatchStream> {
3621            unimplemented!()
3622        }
3623    }
3624    impl DisplayAs for OkExtensionNode {
3625        fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
3626            write!(f, "{}", self.name())
3627        }
3628    }
3629
3630    /// Extension Node which fails the [`OptimizationInvariantChecker`].
3631    #[derive(Debug)]
3632    struct InvariantFailsExtensionNode;
3633    impl ExecutionPlan for InvariantFailsExtensionNode {
3634        fn name(&self) -> &str {
3635            "InvariantFailsExtensionNode"
3636        }
3637        fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
3638            match check {
3639                InvariantLevel::Always => plan_err!("extension node failed it's user-defined always-invariant check"),
3640                InvariantLevel::Executable => panic!("the OptimizationInvariantChecker should not be checking for executableness"),
3641            }
3642        }
3643        fn schema(&self) -> SchemaRef {
3644            Arc::new(Schema::empty())
3645        }
3646        fn with_new_children(
3647            self: Arc<Self>,
3648            _children: Vec<Arc<dyn ExecutionPlan>>,
3649        ) -> Result<Arc<dyn ExecutionPlan>> {
3650            unimplemented!()
3651        }
3652        fn as_any(&self) -> &dyn Any {
3653            unimplemented!()
3654        }
3655        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
3656            unimplemented!()
3657        }
3658        fn properties(&self) -> &PlanProperties {
3659            unimplemented!()
3660        }
3661        fn execute(
3662            &self,
3663            _partition: usize,
3664            _context: Arc<TaskContext>,
3665        ) -> Result<SendableRecordBatchStream> {
3666            unimplemented!()
3667        }
3668    }
3669    impl DisplayAs for InvariantFailsExtensionNode {
3670        fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
3671            write!(f, "{}", self.name())
3672        }
3673    }
3674
3675    /// Extension Optimizer rule that requires the schema check
3676    #[derive(Debug)]
3677    struct OptimizerRuleWithSchemaCheck;
3678    impl PhysicalOptimizerRule for OptimizerRuleWithSchemaCheck {
3679        fn optimize(
3680            &self,
3681            plan: Arc<dyn ExecutionPlan>,
3682            _config: &ConfigOptions,
3683        ) -> Result<Arc<dyn ExecutionPlan>> {
3684            Ok(plan)
3685        }
3686        fn name(&self) -> &str {
3687            "OptimizerRuleWithSchemaCheck"
3688        }
3689        fn schema_check(&self) -> bool {
3690            true
3691        }
3692    }
3693
3694    #[test]
3695    fn test_optimization_invariant_checker() -> Result<()> {
3696        let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
3697            Arc::new(OptimizerRuleWithSchemaCheck);
3698
3699        // ok plan
3700        let ok_node: Arc<dyn ExecutionPlan> = Arc::new(OkExtensionNode(vec![]));
3701        let child = Arc::clone(&ok_node);
3702        let ok_plan = Arc::clone(&ok_node).with_new_children(vec![
3703            Arc::clone(&child).with_new_children(vec![Arc::clone(&child)])?,
3704            Arc::clone(&child),
3705        ])?;
3706
3707        // Test: check should pass with same schema
3708        let equal_schema = ok_plan.schema();
3709        OptimizationInvariantChecker::new(&rule).check(&ok_plan, equal_schema)?;
3710
3711        // Test: should fail with schema changed
3712        let different_schema =
3713            Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)]));
3714        let expected_err = OptimizationInvariantChecker::new(&rule)
3715            .check(&ok_plan, different_schema)
3716            .unwrap_err();
3717        assert!(expected_err.to_string().contains("PhysicalOptimizer rule 'OptimizerRuleWithSchemaCheck' failed. Schema mismatch. Expected original schema"));
3718
3719        // Test: should fail when extension node fails it's own invariant check
3720        let failing_node: Arc<dyn ExecutionPlan> = Arc::new(InvariantFailsExtensionNode);
3721        let expected_err = OptimizationInvariantChecker::new(&rule)
3722            .check(&failing_node, ok_plan.schema())
3723            .unwrap_err();
3724        assert!(expected_err
3725            .to_string()
3726            .contains("extension node failed it's user-defined always-invariant check"));
3727
3728        // Test: should fail when descendent extension node fails
3729        let failing_node: Arc<dyn ExecutionPlan> = Arc::new(InvariantFailsExtensionNode);
3730        let invalid_plan = ok_node.with_new_children(vec![
3731            Arc::clone(&child).with_new_children(vec![Arc::clone(&failing_node)])?,
3732            Arc::clone(&child),
3733        ])?;
3734        let expected_err = OptimizationInvariantChecker::new(&rule)
3735            .check(&invalid_plan, ok_plan.schema())
3736            .unwrap_err();
3737        assert!(expected_err
3738            .to_string()
3739            .contains("extension node failed it's user-defined always-invariant check"));
3740
3741        Ok(())
3742    }
3743
3744    /// Extension Node which fails the [`InvariantChecker`]
3745    /// if, and only if, [`InvariantLevel::Executable`]
3746    #[derive(Debug)]
3747    struct ExecutableInvariantFails;
3748    impl ExecutionPlan for ExecutableInvariantFails {
3749        fn name(&self) -> &str {
3750            "ExecutableInvariantFails"
3751        }
3752        fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
3753            match check {
3754                InvariantLevel::Always => Ok(()),
3755                InvariantLevel::Executable => plan_err!(
3756                    "extension node failed it's user-defined executable-invariant check"
3757                ),
3758            }
3759        }
3760        fn schema(&self) -> SchemaRef {
3761            Arc::new(Schema::empty())
3762        }
3763        fn with_new_children(
3764            self: Arc<Self>,
3765            _children: Vec<Arc<dyn ExecutionPlan>>,
3766        ) -> Result<Arc<dyn ExecutionPlan>> {
3767            unimplemented!()
3768        }
3769        fn as_any(&self) -> &dyn Any {
3770            unimplemented!()
3771        }
3772        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
3773            vec![]
3774        }
3775        fn properties(&self) -> &PlanProperties {
3776            unimplemented!()
3777        }
3778        fn execute(
3779            &self,
3780            _partition: usize,
3781            _context: Arc<TaskContext>,
3782        ) -> Result<SendableRecordBatchStream> {
3783            unimplemented!()
3784        }
3785    }
3786    impl DisplayAs for ExecutableInvariantFails {
3787        fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
3788            write!(f, "{}", self.name())
3789        }
3790    }
3791
3792    #[test]
3793    fn test_invariant_checker_levels() -> Result<()> {
3794        // plan that passes the always-invariant, but fails the executable check
3795        let plan: Arc<dyn ExecutionPlan> = Arc::new(ExecutableInvariantFails);
3796
3797        // Test: check should pass with less stringent Always check
3798        InvariantChecker(InvariantLevel::Always).check(&plan)?;
3799
3800        // Test: should fail the executable check
3801        let expected_err = InvariantChecker(InvariantLevel::Executable)
3802            .check(&plan)
3803            .unwrap_err();
3804        assert!(expected_err.to_string().contains(
3805            "extension node failed it's user-defined executable-invariant check"
3806        ));
3807
3808        // Test: should fail when descendent extension node fails
3809        let failing_node: Arc<dyn ExecutionPlan> = Arc::new(ExecutableInvariantFails);
3810        let ok_node: Arc<dyn ExecutionPlan> = Arc::new(OkExtensionNode(vec![]));
3811        let child = Arc::clone(&ok_node);
3812        let plan = ok_node.with_new_children(vec![
3813            Arc::clone(&child).with_new_children(vec![Arc::clone(&failing_node)])?,
3814            Arc::clone(&child),
3815        ])?;
3816        let expected_err = InvariantChecker(InvariantLevel::Executable)
3817            .check(&plan)
3818            .unwrap_err();
3819        assert!(expected_err.to_string().contains(
3820            "extension node failed it's user-defined executable-invariant check"
3821        ));
3822
3823        Ok(())
3824    }
3825
3826    // Reproducer for DataFusion issue #17405:
3827    //
3828    // The following SQL is semantically invalid. Notably, the `SELECT left_table.a, right_table.a`
3829    // clause is missing from the explicit logical plan:
3830    //
3831    // SELECT a FROM (
3832    //       -- SELECT left_table.a, right_table.a
3833    //       FROM left_table
3834    //       FULL JOIN right_table ON left_table.a = right_table.a
3835    // ) AS alias
3836    // GROUP BY a;
3837    //
3838    // As a result, the variables within `alias` subquery are not properly distinguished, which
3839    // leads to a bug for logical and physical planning.
3840    //
3841    // The fix is to implicitly insert a Projection node to represent the missing SELECT clause to
3842    // ensure each field is correctly aliased to a unique name when the SubqueryAlias node is added.
3843    #[tokio::test]
3844    async fn subquery_alias_confusing_the_optimizer() -> Result<()> {
3845        let state = make_session_state();
3846
3847        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
3848        let schema = Arc::new(schema);
3849
3850        let table = MemTable::try_new(schema.clone(), vec![vec![]])?;
3851        let table = Arc::new(table);
3852
3853        let source = DefaultTableSource::new(table);
3854        let source = Arc::new(source);
3855
3856        let left = LogicalPlanBuilder::scan("left", source.clone(), None)?;
3857        let right = LogicalPlanBuilder::scan("right", source, None)?.build()?;
3858
3859        let join_keys = (
3860            vec![datafusion_common::Column::new(Some("left"), "a")],
3861            vec![datafusion_common::Column::new(Some("right"), "a")],
3862        );
3863
3864        let join = left.join(right, JoinType::Full, join_keys, None)?.build()?;
3865
3866        let alias = subquery_alias(join, "alias")?;
3867
3868        let planner = DefaultPhysicalPlanner::default();
3869
3870        let logical_plan = LogicalPlanBuilder::new(alias)
3871            .aggregate(vec![col("a:1")], Vec::<Expr>::new())?
3872            .build()?;
3873        let _physical_plan = planner.create_physical_plan(&logical_plan, &state).await?;
3874
3875        let optimized_logical_plan = state.optimize(&logical_plan)?;
3876        let _optimized_physical_plan = planner
3877            .create_physical_plan(&optimized_logical_plan, &state)
3878            .await?;
3879
3880        Ok(())
3881    }
3882}