datafusion_physical_plan/
projection.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Defines the projection execution plan. A projection determines which columns or expressions
19//! are returned from a query. The SQL statement `SELECT a, b, a+b FROM t1` is an example
20//! of a projection on table `t1` where the expressions `a`, `b`, and `a+b` are the
21//! projection expressions. `SELECT` without `FROM` will only evaluate expressions.
22
23use super::expressions::{Column, Literal};
24use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
25use super::{
26    DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream,
27    SendableRecordBatchStream, Statistics,
28};
29use crate::execution_plan::CardinalityEffect;
30use crate::filter_pushdown::{
31    ChildPushdownResult, FilterDescription, FilterPushdownPhase,
32    FilterPushdownPropagation,
33};
34use crate::joins::utils::{ColumnIndex, JoinFilter, JoinOn, JoinOnRef};
35use crate::{DisplayFormatType, ExecutionPlan, PhysicalExpr};
36use std::any::Any;
37use std::collections::HashMap;
38use std::pin::Pin;
39use std::sync::Arc;
40use std::task::{Context, Poll};
41
42use arrow::datatypes::SchemaRef;
43use arrow::record_batch::{RecordBatch, RecordBatchOptions};
44use datafusion_common::config::ConfigOptions;
45use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNodeRecursion};
46use datafusion_common::{internal_err, JoinSide, Result};
47use datafusion_execution::TaskContext;
48use datafusion_physical_expr::equivalence::ProjectionMapping;
49use datafusion_physical_expr::utils::collect_columns;
50use datafusion_physical_expr::{PhysicalExprExt, PhysicalExprRef};
51use datafusion_physical_expr_common::physical_expr::fmt_sql;
52use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
53// Re-exported from datafusion-physical-expr for backwards compatibility
54// We recommend updating your imports to use datafusion-physical-expr directly
55pub use datafusion_physical_expr::projection::{
56    update_expr, ProjectionExpr, ProjectionExprs,
57};
58
59use futures::stream::{Stream, StreamExt};
60use log::trace;
61
62/// [`ExecutionPlan`] for a projection
63///
64/// Computes a set of scalar value expressions for each input row, producing one
65/// output row for each input row.
66#[derive(Debug, Clone)]
67pub struct ProjectionExec {
68    /// The projection expressions stored as tuples of (expression, output column name)
69    projection: ProjectionExprs,
70    /// The schema once the projection has been applied to the input
71    schema: SchemaRef,
72    /// The input plan
73    input: Arc<dyn ExecutionPlan>,
74    /// Execution metrics
75    metrics: ExecutionPlanMetricsSet,
76    /// Cache holding plan properties like equivalences, output partitioning etc.
77    cache: PlanProperties,
78}
79
80impl ProjectionExec {
81    /// Create a projection on an input
82    ///
83    /// # Example:
84    /// Create a `ProjectionExec` to crate `SELECT a, a+b AS sum_ab FROM t1`:
85    ///
86    /// ```
87    /// # use std::sync::Arc;
88    /// # use arrow_schema::{Schema, Field, DataType};
89    /// # use datafusion_expr::Operator;
90    /// # use datafusion_physical_plan::ExecutionPlan;
91    /// # use datafusion_physical_expr::expressions::{col, binary};
92    /// # use datafusion_physical_plan::empty::EmptyExec;
93    /// # use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
94    /// # fn schema() -> Arc<Schema> {
95    /// #  Arc::new(Schema::new(vec![
96    /// #   Field::new("a", DataType::Int32, false),
97    /// #   Field::new("b", DataType::Int32, false),
98    /// # ]))
99    /// # }
100    /// #
101    /// # fn input() -> Arc<dyn ExecutionPlan> {
102    /// #  Arc::new(EmptyExec::new(schema()))
103    /// # }
104    /// #
105    /// # fn main() {
106    /// let schema = schema();
107    /// // Create PhysicalExprs
108    /// let a = col("a", &schema).unwrap();
109    /// let b = col("b", &schema).unwrap();
110    /// let a_plus_b = binary(Arc::clone(&a), Operator::Plus, b, &schema).unwrap();
111    /// // create ProjectionExec
112    /// let proj = ProjectionExec::try_new(
113    ///     [
114    ///         ProjectionExpr {
115    ///             // expr a produces the column named "a"
116    ///             expr: a,
117    ///             alias: "a".to_string(),
118    ///         },
119    ///         ProjectionExpr {
120    ///             // expr: a + b produces the column named "sum_ab"
121    ///             expr: a_plus_b,
122    ///             alias: "sum_ab".to_string(),
123    ///         },
124    ///     ],
125    ///     input(),
126    /// )
127    /// .unwrap();
128    /// # }
129    /// ```
130    pub fn try_new<I, E>(expr: I, input: Arc<dyn ExecutionPlan>) -> Result<Self>
131    where
132        I: IntoIterator<Item = E>,
133        E: Into<ProjectionExpr>,
134    {
135        let input_schema = input.schema();
136        // convert argument to Vec<ProjectionExpr>
137        let expr_vec = expr.into_iter().map(Into::into).collect::<Vec<_>>();
138        let projection = ProjectionExprs::new(expr_vec);
139
140        let schema = Arc::new(projection.project_schema(&input_schema)?);
141
142        // Construct a map from the input expressions to the output expression of the Projection
143        let projection_mapping = projection.projection_mapping(&input_schema)?;
144        let cache =
145            Self::compute_properties(&input, &projection_mapping, Arc::clone(&schema))?;
146        Ok(Self {
147            projection,
148            schema,
149            input,
150            metrics: ExecutionPlanMetricsSet::new(),
151            cache,
152        })
153    }
154
155    /// The projection expressions stored as tuples of (expression, output column name)
156    pub fn expr(&self) -> &[ProjectionExpr] {
157        self.projection.as_ref()
158    }
159
160    /// The input plan
161    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
162        &self.input
163    }
164
165    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
166    fn compute_properties(
167        input: &Arc<dyn ExecutionPlan>,
168        projection_mapping: &ProjectionMapping,
169        schema: SchemaRef,
170    ) -> Result<PlanProperties> {
171        // Calculate equivalence properties:
172        let input_eq_properties = input.equivalence_properties();
173        let eq_properties = input_eq_properties.project(projection_mapping, schema);
174        // Calculate output partitioning, which needs to respect aliases:
175        let output_partitioning = input
176            .output_partitioning()
177            .project(projection_mapping, input_eq_properties);
178
179        Ok(PlanProperties::new(
180            eq_properties,
181            output_partitioning,
182            input.pipeline_behavior(),
183            input.boundedness(),
184        ))
185    }
186}
187
188impl DisplayAs for ProjectionExec {
189    fn fmt_as(
190        &self,
191        t: DisplayFormatType,
192        f: &mut std::fmt::Formatter,
193    ) -> std::fmt::Result {
194        match t {
195            DisplayFormatType::Default | DisplayFormatType::Verbose => {
196                let expr: Vec<String> = self
197                    .projection
198                    .as_ref()
199                    .iter()
200                    .map(|proj_expr| {
201                        let e = proj_expr.expr.to_string();
202                        if e != proj_expr.alias {
203                            format!("{e} as {}", proj_expr.alias)
204                        } else {
205                            e
206                        }
207                    })
208                    .collect();
209
210                write!(f, "ProjectionExec: expr=[{}]", expr.join(", "))
211            }
212            DisplayFormatType::TreeRender => {
213                for (i, proj_expr) in self.expr().iter().enumerate() {
214                    let expr_sql = fmt_sql(proj_expr.expr.as_ref());
215                    if proj_expr.expr.to_string() == proj_expr.alias {
216                        writeln!(f, "expr{i}={expr_sql}")?;
217                    } else {
218                        writeln!(f, "{}={expr_sql}", proj_expr.alias)?;
219                    }
220                }
221
222                Ok(())
223            }
224        }
225    }
226}
227
228impl ExecutionPlan for ProjectionExec {
229    fn name(&self) -> &'static str {
230        "ProjectionExec"
231    }
232
233    /// Return a reference to Any that can be used for downcasting
234    fn as_any(&self) -> &dyn Any {
235        self
236    }
237
238    fn properties(&self) -> &PlanProperties {
239        &self.cache
240    }
241
242    fn maintains_input_order(&self) -> Vec<bool> {
243        // Tell optimizer this operator doesn't reorder its input
244        vec![true]
245    }
246
247    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
248        let all_simple_exprs = self.projection.iter().all(|proj_expr| {
249            proj_expr.expr.as_any().is::<Column>()
250                || proj_expr.expr.as_any().is::<Literal>()
251        });
252        // If expressions are all either column_expr or Literal, then all computations in this projection are reorder or rename,
253        // and projection would not benefit from the repartition, benefits_from_input_partitioning will return false.
254        vec![!all_simple_exprs]
255    }
256
257    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
258        vec![&self.input]
259    }
260
261    fn with_new_children(
262        self: Arc<Self>,
263        mut children: Vec<Arc<dyn ExecutionPlan>>,
264    ) -> Result<Arc<dyn ExecutionPlan>> {
265        ProjectionExec::try_new(self.projection.clone(), children.swap_remove(0))
266            .map(|p| Arc::new(p) as _)
267    }
268
269    fn execute(
270        &self,
271        partition: usize,
272        context: Arc<TaskContext>,
273    ) -> Result<SendableRecordBatchStream> {
274        trace!("Start ProjectionExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
275        Ok(Box::pin(ProjectionStream::new(
276            Arc::clone(&self.schema),
277            self.projection.expr_iter().collect(),
278            self.input.execute(partition, context)?,
279            BaselineMetrics::new(&self.metrics, partition),
280        )))
281    }
282
283    fn metrics(&self) -> Option<MetricsSet> {
284        Some(self.metrics.clone_inner())
285    }
286
287    fn statistics(&self) -> Result<Statistics> {
288        self.partition_statistics(None)
289    }
290
291    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
292        let input_stats = self.input.partition_statistics(partition)?;
293        self.projection
294            .project_statistics(input_stats, &self.input.schema())
295    }
296
297    fn supports_limit_pushdown(&self) -> bool {
298        true
299    }
300
301    fn cardinality_effect(&self) -> CardinalityEffect {
302        CardinalityEffect::Equal
303    }
304
305    fn try_swapping_with_projection(
306        &self,
307        projection: &ProjectionExec,
308    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
309        let maybe_unified = try_unifying_projections(projection, self)?;
310        if let Some(new_plan) = maybe_unified {
311            // To unify 3 or more sequential projections:
312            remove_unnecessary_projections(new_plan).data().map(Some)
313        } else {
314            Ok(Some(Arc::new(projection.clone())))
315        }
316    }
317
318    fn gather_filters_for_pushdown(
319        &self,
320        _phase: FilterPushdownPhase,
321        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
322        _config: &ConfigOptions,
323    ) -> Result<FilterDescription> {
324        // TODO: In future, we can try to handle inverting aliases here.
325        // For the time being, we pass through untransformed filters, so filters on aliases are not handled.
326        // https://github.com/apache/datafusion/issues/17246
327        FilterDescription::from_children(parent_filters, &self.children())
328    }
329
330    fn handle_child_pushdown_result(
331        &self,
332        _phase: FilterPushdownPhase,
333        child_pushdown_result: ChildPushdownResult,
334        _config: &ConfigOptions,
335    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
336        Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
337    }
338}
339
340impl ProjectionStream {
341    /// Create a new projection stream
342    fn new(
343        schema: SchemaRef,
344        expr: Vec<Arc<dyn PhysicalExpr>>,
345        input: SendableRecordBatchStream,
346        baseline_metrics: BaselineMetrics,
347    ) -> Self {
348        Self {
349            schema,
350            expr,
351            input,
352            baseline_metrics,
353        }
354    }
355
356    fn batch_project(&self, batch: &RecordBatch) -> Result<RecordBatch> {
357        // Records time on drop
358        let _timer = self.baseline_metrics.elapsed_compute().timer();
359        let arrays = self
360            .expr
361            .iter()
362            .map(|expr| {
363                expr.evaluate(batch)
364                    .and_then(|v| v.into_array(batch.num_rows()))
365            })
366            .collect::<Result<Vec<_>>>()?;
367
368        if arrays.is_empty() {
369            let options =
370                RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
371            RecordBatch::try_new_with_options(Arc::clone(&self.schema), arrays, &options)
372                .map_err(Into::into)
373        } else {
374            RecordBatch::try_new(Arc::clone(&self.schema), arrays).map_err(Into::into)
375        }
376    }
377}
378
379/// Projection iterator
380struct ProjectionStream {
381    schema: SchemaRef,
382    expr: Vec<Arc<dyn PhysicalExpr>>,
383    input: SendableRecordBatchStream,
384    baseline_metrics: BaselineMetrics,
385}
386
387impl Stream for ProjectionStream {
388    type Item = Result<RecordBatch>;
389
390    fn poll_next(
391        mut self: Pin<&mut Self>,
392        cx: &mut Context<'_>,
393    ) -> Poll<Option<Self::Item>> {
394        let poll = self.input.poll_next_unpin(cx).map(|x| match x {
395            Some(Ok(batch)) => Some(self.batch_project(&batch)),
396            other => other,
397        });
398
399        self.baseline_metrics.record_poll(poll)
400    }
401
402    fn size_hint(&self) -> (usize, Option<usize>) {
403        // Same number of record batches
404        self.input.size_hint()
405    }
406}
407
408impl RecordBatchStream for ProjectionStream {
409    /// Get the schema
410    fn schema(&self) -> SchemaRef {
411        Arc::clone(&self.schema)
412    }
413}
414
415pub trait EmbeddedProjection: ExecutionPlan + Sized {
416    fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self>;
417}
418
419/// Some projection can't be pushed down left input or right input of hash join because filter or on need may need some columns that won't be used in later.
420/// By embed those projection to hash join, we can reduce the cost of build_batch_from_indices in hash join (build_batch_from_indices need to can compute::take() for each column) and avoid unnecessary output creation.
421pub fn try_embed_projection<Exec: EmbeddedProjection + 'static>(
422    projection: &ProjectionExec,
423    execution_plan: &Exec,
424) -> Result<Option<Arc<dyn ExecutionPlan>>> {
425    // Collect all column indices from the given projection expressions.
426    let projection_index = collect_column_indices(projection.expr());
427
428    if projection_index.is_empty() {
429        return Ok(None);
430    };
431
432    // If the projection indices is the same as the input columns, we don't need to embed the projection to hash join.
433    // Check the projection_index is 0..n-1 and the length of projection_index is the same as the length of execution_plan schema fields.
434    if projection_index.len() == projection_index.last().unwrap() + 1
435        && projection_index.len() == execution_plan.schema().fields().len()
436    {
437        return Ok(None);
438    }
439
440    let new_execution_plan =
441        Arc::new(execution_plan.with_projection(Some(projection_index.to_vec()))?);
442
443    // Build projection expressions for update_expr. Zip the projection_index with the new_execution_plan output schema fields.
444    let embed_project_exprs = projection_index
445        .iter()
446        .zip(new_execution_plan.schema().fields())
447        .map(|(index, field)| ProjectionExpr {
448            expr: Arc::new(Column::new(field.name(), *index)) as Arc<dyn PhysicalExpr>,
449            alias: field.name().to_owned(),
450        })
451        .collect::<Vec<_>>();
452
453    let mut new_projection_exprs = Vec::with_capacity(projection.expr().len());
454
455    for proj_expr in projection.expr() {
456        // update column index for projection expression since the input schema has been changed.
457        let Some(expr) =
458            update_expr(&proj_expr.expr, embed_project_exprs.as_slice(), false)?
459        else {
460            return Ok(None);
461        };
462        new_projection_exprs.push(ProjectionExpr {
463            expr,
464            alias: proj_expr.alias.clone(),
465        });
466    }
467    // Old projection may contain some alias or expression such as `a + 1` and `CAST('true' AS BOOLEAN)`, but our projection_exprs in hash join just contain column, so we need to create the new projection to keep the original projection.
468    let new_projection = Arc::new(ProjectionExec::try_new(
469        new_projection_exprs,
470        Arc::clone(&new_execution_plan) as _,
471    )?);
472    if is_projection_removable(&new_projection) {
473        Ok(Some(new_execution_plan))
474    } else {
475        Ok(Some(new_projection))
476    }
477}
478
479pub struct JoinData {
480    pub projected_left_child: ProjectionExec,
481    pub projected_right_child: ProjectionExec,
482    pub join_filter: Option<JoinFilter>,
483    pub join_on: JoinOn,
484}
485
486pub fn try_pushdown_through_join(
487    projection: &ProjectionExec,
488    join_left: &Arc<dyn ExecutionPlan>,
489    join_right: &Arc<dyn ExecutionPlan>,
490    join_on: JoinOnRef,
491    schema: SchemaRef,
492    filter: Option<&JoinFilter>,
493) -> Result<Option<JoinData>> {
494    // Convert projected expressions to columns. We can not proceed if this is not possible.
495    let Some(projection_as_columns) = physical_to_column_exprs(projection.expr()) else {
496        return Ok(None);
497    };
498
499    let (far_right_left_col_ind, far_left_right_col_ind) =
500        join_table_borders(join_left.schema().fields().len(), &projection_as_columns);
501
502    if !join_allows_pushdown(
503        &projection_as_columns,
504        &schema,
505        far_right_left_col_ind,
506        far_left_right_col_ind,
507    ) {
508        return Ok(None);
509    }
510
511    let new_filter = if let Some(filter) = filter {
512        match update_join_filter(
513            &projection_as_columns[0..=far_right_left_col_ind as _],
514            &projection_as_columns[far_left_right_col_ind as _..],
515            filter,
516            join_left.schema().fields().len(),
517        ) {
518            Some(updated_filter) => Some(updated_filter),
519            None => return Ok(None),
520        }
521    } else {
522        None
523    };
524
525    let Some(new_on) = update_join_on(
526        &projection_as_columns[0..=far_right_left_col_ind as _],
527        &projection_as_columns[far_left_right_col_ind as _..],
528        join_on,
529        join_left.schema().fields().len(),
530    ) else {
531        return Ok(None);
532    };
533
534    let (new_left, new_right) = new_join_children(
535        &projection_as_columns,
536        far_right_left_col_ind,
537        far_left_right_col_ind,
538        join_left,
539        join_right,
540    )?;
541
542    Ok(Some(JoinData {
543        projected_left_child: new_left,
544        projected_right_child: new_right,
545        join_filter: new_filter,
546        join_on: new_on,
547    }))
548}
549
550/// This function checks if `plan` is a [`ProjectionExec`], and inspects its
551/// input(s) to test whether it can push `plan` under its input(s). This function
552/// will operate on the entire tree and may ultimately remove `plan` entirely
553/// by leveraging source providers with built-in projection capabilities.
554pub fn remove_unnecessary_projections(
555    plan: Arc<dyn ExecutionPlan>,
556) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
557    let maybe_modified =
558        if let Some(projection) = plan.as_any().downcast_ref::<ProjectionExec>() {
559            // If the projection does not cause any change on the input, we can
560            // safely remove it:
561            if is_projection_removable(projection) {
562                return Ok(Transformed::yes(Arc::clone(projection.input())));
563            }
564            // If it does, check if we can push it under its child(ren):
565            projection
566                .input()
567                .try_swapping_with_projection(projection)?
568        } else {
569            return Ok(Transformed::no(plan));
570        };
571    Ok(maybe_modified.map_or_else(|| Transformed::no(plan), Transformed::yes))
572}
573
574/// Compare the inputs and outputs of the projection. All expressions must be
575/// columns without alias, and projection does not change the order of fields.
576/// For example, if the input schema is `a, b`, `SELECT a, b` is removable,
577/// but `SELECT b, a` and `SELECT a+1, b` and `SELECT a AS c, b` are not.
578fn is_projection_removable(projection: &ProjectionExec) -> bool {
579    let exprs = projection.expr();
580    exprs.iter().enumerate().all(|(idx, proj_expr)| {
581        let Some(col) = proj_expr.expr.as_any().downcast_ref::<Column>() else {
582            return false;
583        };
584        col.name() == proj_expr.alias && col.index() == idx
585    }) && exprs.len() == projection.input().schema().fields().len()
586}
587
588/// Given the expression set of a projection, checks if the projection causes
589/// any renaming or constructs a non-`Column` physical expression.
590pub fn all_alias_free_columns(exprs: &[ProjectionExpr]) -> bool {
591    exprs.iter().all(|proj_expr| {
592        proj_expr
593            .expr
594            .as_any()
595            .downcast_ref::<Column>()
596            .map(|column| column.name() == proj_expr.alias)
597            .unwrap_or(false)
598    })
599}
600
601/// Updates a source provider's projected columns according to the given
602/// projection operator's expressions. To use this function safely, one must
603/// ensure that all expressions are `Column` expressions without aliases.
604pub fn new_projections_for_columns(
605    projection: &[ProjectionExpr],
606    source: &[usize],
607) -> Vec<usize> {
608    projection
609        .iter()
610        .filter_map(|proj_expr| {
611            proj_expr
612                .expr
613                .as_any()
614                .downcast_ref::<Column>()
615                .map(|expr| source[expr.index()])
616        })
617        .collect()
618}
619
620/// Creates a new [`ProjectionExec`] instance with the given child plan and
621/// projected expressions.
622pub fn make_with_child(
623    projection: &ProjectionExec,
624    child: &Arc<dyn ExecutionPlan>,
625) -> Result<Arc<dyn ExecutionPlan>> {
626    ProjectionExec::try_new(projection.expr().to_vec(), Arc::clone(child))
627        .map(|e| Arc::new(e) as _)
628}
629
630/// Returns `true` if all the expressions in the argument are `Column`s.
631pub fn all_columns(exprs: &[ProjectionExpr]) -> bool {
632    exprs
633        .iter()
634        .all(|proj_expr| proj_expr.expr.as_any().is::<Column>())
635}
636
637/// Updates the given lexicographic ordering according to given projected
638/// expressions using the [`update_expr`] function.
639pub fn update_ordering(
640    ordering: LexOrdering,
641    projected_exprs: &[ProjectionExpr],
642) -> Result<Option<LexOrdering>> {
643    let mut updated_exprs = vec![];
644    for mut sort_expr in ordering.into_iter() {
645        let Some(updated_expr) = update_expr(&sort_expr.expr, projected_exprs, false)?
646        else {
647            return Ok(None);
648        };
649        sort_expr.expr = updated_expr;
650        updated_exprs.push(sort_expr);
651    }
652    Ok(LexOrdering::new(updated_exprs))
653}
654
655/// Updates the given lexicographic requirement according to given projected
656/// expressions using the [`update_expr`] function.
657pub fn update_ordering_requirement(
658    reqs: LexRequirement,
659    projected_exprs: &[ProjectionExpr],
660) -> Result<Option<LexRequirement>> {
661    let mut updated_exprs = vec![];
662    for mut sort_expr in reqs.into_iter() {
663        let Some(updated_expr) = update_expr(&sort_expr.expr, projected_exprs, false)?
664        else {
665            return Ok(None);
666        };
667        sort_expr.expr = updated_expr;
668        updated_exprs.push(sort_expr);
669    }
670    Ok(LexRequirement::new(updated_exprs))
671}
672
673/// Downcasts all the expressions in `exprs` to `Column`s. If any of the given
674/// expressions is not a `Column`, returns `None`.
675pub fn physical_to_column_exprs(
676    exprs: &[ProjectionExpr],
677) -> Option<Vec<(Column, String)>> {
678    exprs
679        .iter()
680        .map(|proj_expr| {
681            proj_expr
682                .expr
683                .as_any()
684                .downcast_ref::<Column>()
685                .map(|col| (col.clone(), proj_expr.alias.clone()))
686        })
687        .collect()
688}
689
690/// If pushing down the projection over this join's children seems possible,
691/// this function constructs the new [`ProjectionExec`]s that will come on top
692/// of the original children of the join.
693pub fn new_join_children(
694    projection_as_columns: &[(Column, String)],
695    far_right_left_col_ind: i32,
696    far_left_right_col_ind: i32,
697    left_child: &Arc<dyn ExecutionPlan>,
698    right_child: &Arc<dyn ExecutionPlan>,
699) -> Result<(ProjectionExec, ProjectionExec)> {
700    let new_left = ProjectionExec::try_new(
701        projection_as_columns[0..=far_right_left_col_ind as _]
702            .iter()
703            .map(|(col, alias)| ProjectionExpr {
704                expr: Arc::new(Column::new(col.name(), col.index())) as _,
705                alias: alias.clone(),
706            }),
707        Arc::clone(left_child),
708    )?;
709    let left_size = left_child.schema().fields().len() as i32;
710    let new_right = ProjectionExec::try_new(
711        projection_as_columns[far_left_right_col_ind as _..]
712            .iter()
713            .map(|(col, alias)| {
714                ProjectionExpr {
715                    expr: Arc::new(Column::new(
716                        col.name(),
717                        // Align projected expressions coming from the right
718                        // table with the new right child projection:
719                        (col.index() as i32 - left_size) as _,
720                    )) as _,
721                    alias: alias.clone(),
722                }
723            }),
724        Arc::clone(right_child),
725    )?;
726
727    Ok((new_left, new_right))
728}
729
730/// Checks three conditions for pushing a projection down through a join:
731/// - Projection must narrow the join output schema.
732/// - Columns coming from left/right tables must be collected at the left/right
733///   sides of the output table.
734/// - Left or right table is not lost after the projection.
735pub fn join_allows_pushdown(
736    projection_as_columns: &[(Column, String)],
737    join_schema: &SchemaRef,
738    far_right_left_col_ind: i32,
739    far_left_right_col_ind: i32,
740) -> bool {
741    // Projection must narrow the join output:
742    projection_as_columns.len() < join_schema.fields().len()
743    // Are the columns from different tables mixed?
744    && (far_right_left_col_ind + 1 == far_left_right_col_ind)
745    // Left or right table is not lost after the projection.
746    && far_right_left_col_ind >= 0
747    && far_left_right_col_ind < projection_as_columns.len() as i32
748}
749
750/// Returns the last index before encountering a column coming from the right table when traveling
751/// through the projection from left to right, and the last index before encountering a column
752/// coming from the left table when traveling through the projection from right to left.
753/// If there is no column in the projection coming from the left side, it returns (-1, ...),
754/// if there is no column in the projection coming from the right side, it returns (..., projection length).
755pub fn join_table_borders(
756    left_table_column_count: usize,
757    projection_as_columns: &[(Column, String)],
758) -> (i32, i32) {
759    let far_right_left_col_ind = projection_as_columns
760        .iter()
761        .enumerate()
762        .take_while(|(_, (projection_column, _))| {
763            projection_column.index() < left_table_column_count
764        })
765        .last()
766        .map(|(index, _)| index as i32)
767        .unwrap_or(-1);
768
769    let far_left_right_col_ind = projection_as_columns
770        .iter()
771        .enumerate()
772        .rev()
773        .take_while(|(_, (projection_column, _))| {
774            projection_column.index() >= left_table_column_count
775        })
776        .last()
777        .map(|(index, _)| index as i32)
778        .unwrap_or(projection_as_columns.len() as i32);
779
780    (far_right_left_col_ind, far_left_right_col_ind)
781}
782
783/// Tries to update the equi-join `Column`'s of a join as if the input of
784/// the join was replaced by a projection.
785pub fn update_join_on(
786    proj_left_exprs: &[(Column, String)],
787    proj_right_exprs: &[(Column, String)],
788    hash_join_on: &[(PhysicalExprRef, PhysicalExprRef)],
789    left_field_size: usize,
790) -> Option<Vec<(PhysicalExprRef, PhysicalExprRef)>> {
791    // TODO: Clippy wants the "map" call removed, but doing so generates
792    //       a compilation error. Remove the clippy directive once this
793    //       issue is fixed.
794    #[allow(clippy::map_identity)]
795    let (left_idx, right_idx): (Vec<_>, Vec<_>) = hash_join_on
796        .iter()
797        .map(|(left, right)| (left, right))
798        .unzip();
799
800    let new_left_columns = new_columns_for_join_on(&left_idx, proj_left_exprs, 0);
801    let new_right_columns =
802        new_columns_for_join_on(&right_idx, proj_right_exprs, left_field_size);
803
804    match (new_left_columns, new_right_columns) {
805        (Some(left), Some(right)) => Some(left.into_iter().zip(right).collect()),
806        _ => None,
807    }
808}
809
810/// Tries to update the column indices of a [`JoinFilter`] as if the input of
811/// the join was replaced by a projection.
812pub fn update_join_filter(
813    projection_left_exprs: &[(Column, String)],
814    projection_right_exprs: &[(Column, String)],
815    join_filter: &JoinFilter,
816    left_field_size: usize,
817) -> Option<JoinFilter> {
818    let mut new_left_indices = new_indices_for_join_filter(
819        join_filter,
820        JoinSide::Left,
821        projection_left_exprs,
822        0,
823    )
824    .into_iter();
825    let mut new_right_indices = new_indices_for_join_filter(
826        join_filter,
827        JoinSide::Right,
828        projection_right_exprs,
829        left_field_size,
830    )
831    .into_iter();
832
833    // Check if all columns match:
834    (new_right_indices.len() + new_left_indices.len()
835        == join_filter.column_indices().len())
836    .then(|| {
837        JoinFilter::new(
838            Arc::clone(join_filter.expression()),
839            join_filter
840                .column_indices()
841                .iter()
842                .map(|col_idx| ColumnIndex {
843                    index: if col_idx.side == JoinSide::Left {
844                        new_left_indices.next().unwrap()
845                    } else {
846                        new_right_indices.next().unwrap()
847                    },
848                    side: col_idx.side,
849                })
850                .collect(),
851            Arc::clone(join_filter.schema()),
852        )
853    })
854}
855
856/// Unifies `projection` with its input (which is also a [`ProjectionExec`]).
857fn try_unifying_projections(
858    projection: &ProjectionExec,
859    child: &ProjectionExec,
860) -> Result<Option<Arc<dyn ExecutionPlan>>> {
861    let mut projected_exprs = vec![];
862    let mut column_ref_map: HashMap<Column, usize> = HashMap::new();
863
864    // Collect the column references usage in the outer projection.
865    projection.expr().iter().for_each(|proj_expr| {
866        proj_expr
867            .expr
868            .apply_with_lambdas_params(|expr, lambdas_params| {
869                Ok({
870                    if let Some(column) = expr.as_any().downcast_ref::<Column>() {
871                        if !lambdas_params.contains(column.name()) {
872                            *column_ref_map.entry(column.clone()).or_default() += 1;
873                        }
874                    }
875                    TreeNodeRecursion::Continue
876                })
877            })
878            .unwrap();
879    });
880    // Merging these projections is not beneficial, e.g
881    // If an expression is not trivial and it is referred more than 1, unifies projections will be
882    // beneficial as caching mechanism for non-trivial computations.
883    // See discussion in: https://github.com/apache/datafusion/issues/8296
884    if column_ref_map.iter().any(|(column, count)| {
885        *count > 1 && !is_expr_trivial(&Arc::clone(&child.expr()[column.index()].expr))
886    }) {
887        return Ok(None);
888    }
889    for proj_expr in projection.expr() {
890        // If there is no match in the input projection, we cannot unify these
891        // projections. This case will arise if the projection expression contains
892        // a `PhysicalExpr` variant `update_expr` doesn't support.
893        let Some(expr) = update_expr(&proj_expr.expr, child.expr(), true)? else {
894            return Ok(None);
895        };
896        projected_exprs.push(ProjectionExpr {
897            expr,
898            alias: proj_expr.alias.clone(),
899        });
900    }
901    ProjectionExec::try_new(projected_exprs, Arc::clone(child.input()))
902        .map(|e| Some(Arc::new(e) as _))
903}
904
905/// Collect all column indices from the given projection expressions.
906fn collect_column_indices(exprs: &[ProjectionExpr]) -> Vec<usize> {
907    // Collect indices and remove duplicates.
908    let mut indices = exprs
909        .iter()
910        .flat_map(|proj_expr| collect_columns(&proj_expr.expr))
911        .map(|x| x.index())
912        .collect::<std::collections::HashSet<_>>()
913        .into_iter()
914        .collect::<Vec<_>>();
915    indices.sort();
916    indices
917}
918
919/// This function determines and returns a vector of indices representing the
920/// positions of columns in `projection_exprs` that are involved in `join_filter`,
921/// and correspond to a particular side (`join_side`) of the join operation.
922///
923/// Notes: Column indices in the projection expressions are based on the join schema,
924/// whereas the join filter is based on the join child schema. `column_index_offset`
925/// represents the offset between them.
926fn new_indices_for_join_filter(
927    join_filter: &JoinFilter,
928    join_side: JoinSide,
929    projection_exprs: &[(Column, String)],
930    column_index_offset: usize,
931) -> Vec<usize> {
932    join_filter
933        .column_indices()
934        .iter()
935        .filter(|col_idx| col_idx.side == join_side)
936        .filter_map(|col_idx| {
937            projection_exprs
938                .iter()
939                .position(|(col, _)| col_idx.index + column_index_offset == col.index())
940        })
941        .collect()
942}
943
944/// This function generates a new set of columns to be used in a hash join
945/// operation based on a set of equi-join conditions (`hash_join_on`) and a
946/// list of projection expressions (`projection_exprs`).
947///
948/// Notes: Column indices in the projection expressions are based on the join schema,
949/// whereas the join on expressions are based on the join child schema. `column_index_offset`
950/// represents the offset between them.
951fn new_columns_for_join_on(
952    hash_join_on: &[&PhysicalExprRef],
953    projection_exprs: &[(Column, String)],
954    column_index_offset: usize,
955) -> Option<Vec<PhysicalExprRef>> {
956    let new_columns = hash_join_on
957        .iter()
958        .filter_map(|on| {
959            // Rewrite all columns in `on`
960            Arc::clone(*on)
961                .transform_with_lambdas_params(|expr, lambdas_params| {
962                    match expr.as_any().downcast_ref::<Column>() {
963                        Some(column) if !lambdas_params.contains(column.name()) => {
964                            let new_column = projection_exprs
965                                .iter()
966                                .enumerate()
967                                .find(|(_, (proj_column, _))| {
968                                    column.name() == proj_column.name()
969                                        && column.index() + column_index_offset
970                                            == proj_column.index()
971                                })
972                                .map(|(index, (_, alias))| Column::new(alias, index));
973                            if let Some(new_column) = new_column {
974                                Ok(Transformed::yes(Arc::new(new_column)))
975                            } else {
976                                // If the column is not found in the projection expressions,
977                                // it means that the column is not projected. In this case,
978                                // we cannot push the projection down.
979                                internal_err!(
980                                    "Column {:?} not found in projection expressions",
981                                    column
982                                )
983                            }
984                        }
985                        _ => Ok(Transformed::no(expr)),
986                    }
987                })
988                .data()
989                .ok()
990        })
991        .collect::<Vec<_>>();
992    (new_columns.len() == hash_join_on.len()).then_some(new_columns)
993}
994
995/// Checks if the given expression is trivial.
996/// An expression is considered trivial if it is either a `Column` or a `Literal`.
997fn is_expr_trivial(expr: &Arc<dyn PhysicalExpr>) -> bool {
998    expr.as_any().downcast_ref::<Column>().is_some()
999        || expr.as_any().downcast_ref::<Literal>().is_some()
1000}
1001
// Unit tests for the projection helpers and `ProjectionExec` execution/statistics.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    use crate::common::collect;
    use crate::test;
    use crate::test::exec::StatisticsExec;

    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::stats::{ColumnStatistics, Precision, Statistics};
    use datafusion_common::ScalarValue;

    use datafusion_expr::Operator;
    use datafusion_physical_expr::expressions::{col, BinaryExpr, Column, Literal};

    // `b - (1 + a)` references columns 7 and 1; the literal contributes nothing,
    // and the result is expected in ascending order.
    #[test]
    fn test_collect_column_indices() -> Result<()> {
        let expr = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("b", 7)),
            Operator::Minus,
            Arc::new(BinaryExpr::new(
                Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
                Operator::Plus,
                Arc::new(Column::new("a", 1)),
            )),
        ));
        let column_indices = collect_column_indices(&[ProjectionExpr {
            expr,
            alias: "b-(1+a)".to_string(),
        }]);
        assert_eq!(column_indices, vec![1, 7]);
        Ok(())
    }

    // `join_table_borders` returns a pair of positions in the projection list:
    // - first: the last position of the leading run of columns with index
    //   < left_table_column_count (-1 if the first column is not such a column);
    // - second: the first position of the trailing run of columns with index
    //   >= left_table_column_count (the projection length if the last column is
    //   not such a column).
    #[test]
    fn test_join_table_borders() -> Result<()> {
        // Indices by position: [1, 2, 4, 3, 2, 5, 7, 6].
        let projections = vec![
            (Column::new("b", 1), "b".to_owned()),
            (Column::new("c", 2), "c".to_owned()),
            (Column::new("e", 4), "e".to_owned()),
            (Column::new("d", 3), "d".to_owned()),
            (Column::new("c", 2), "c".to_owned()),
            (Column::new("f", 5), "f".to_owned()),
            (Column::new("h", 7), "h".to_owned()),
            (Column::new("g", 6), "g".to_owned()),
        ];
        // Positions 0..=4 are < 5; positions 5..=7 are >= 5.
        let left_table_column_count = 5;
        assert_eq!(
            join_table_borders(left_table_column_count, &projections),
            (4, 5)
        );

        // Every index is < 8: left run covers all 8 positions, right run is empty.
        let left_table_column_count = 8;
        assert_eq!(
            join_table_borders(left_table_column_count, &projections),
            (7, 8)
        );

        // No index is < 1: left run is empty, right run starts at position 0.
        let left_table_column_count = 1;
        assert_eq!(
            join_table_borders(left_table_column_count, &projections),
            (-1, 0)
        );

        // Indices by position: [0, 1, 3, 6, 4, 5, 4, 7] — left and right columns
        // are interleaved in the middle.
        let projections = vec![
            (Column::new("a", 0), "a".to_owned()),
            (Column::new("b", 1), "b".to_owned()),
            (Column::new("d", 3), "d".to_owned()),
            (Column::new("g", 6), "g".to_owned()),
            (Column::new("e", 4), "e".to_owned()),
            (Column::new("f", 5), "f".to_owned()),
            (Column::new("e", 4), "e".to_owned()),
            (Column::new("h", 7), "h".to_owned()),
        ];
        // Leading run stops at index 6 (position 3); trailing run stops at
        // index 4 (position 6), so only position 7 is in it.
        let left_table_column_count = 5;
        assert_eq!(
            join_table_borders(left_table_column_count, &projections),
            (2, 7)
        );

        // Leading run covers positions 0..=6; only index 7 is >= 7.
        let left_table_column_count = 7;
        assert_eq!(
            join_table_borders(left_table_column_count, &projections),
            (6, 7)
        );

        Ok(())
    }

    // A projection with no expressions must still produce as many batches as
    // its input.
    #[tokio::test]
    async fn project_no_column() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());

        let exec = test::scan_partitioned(1);
        let expected = collect(exec.execute(0, Arc::clone(&task_ctx))?).await?;

        let projection = ProjectionExec::try_new(vec![] as Vec<ProjectionExpr>, exec)?;
        let stream = projection.execute(0, Arc::clone(&task_ctx))?;
        let output = collect(stream).await?;
        assert_eq!(output.len(), expected.len());

        Ok(())
    }

    // `ProjectionExec::try_new` must still accept `(expr, alias)` tuples.
    #[tokio::test]
    async fn project_old_syntax() {
        let exec = test::scan_partitioned(1);
        let schema = exec.schema();
        let expr = col("i", &schema).unwrap();
        ProjectionExec::try_new(
            vec![
                // use From impl of ProjectionExpr to create ProjectionExpr
                // to test old syntax
                (expr, "c".to_string()),
            ],
            exec,
        )
        // expect this to succeed
        .unwrap();
    }

    // Regression test: projection statistics must be computed against the
    // input schema (6 columns), not the narrower 2-column output schema.
    #[test]
    fn test_projection_statistics_uses_input_schema() {
        let input_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
            Field::new("c", DataType::Int32, false),
            Field::new("d", DataType::Int32, false),
            Field::new("e", DataType::Int32, false),
            Field::new("f", DataType::Int32, false),
        ]);

        let input_statistics = Statistics {
            num_rows: Precision::Exact(10),
            column_statistics: vec![
                ColumnStatistics {
                    min_value: Precision::Exact(ScalarValue::Int32(Some(1))),
                    max_value: Precision::Exact(ScalarValue::Int32(Some(100))),
                    ..Default::default()
                },
                ColumnStatistics {
                    min_value: Precision::Exact(ScalarValue::Int32(Some(5))),
                    max_value: Precision::Exact(ScalarValue::Int32(Some(50))),
                    ..Default::default()
                },
                ColumnStatistics {
                    min_value: Precision::Exact(ScalarValue::Int32(Some(10))),
                    max_value: Precision::Exact(ScalarValue::Int32(Some(40))),
                    ..Default::default()
                },
                ColumnStatistics {
                    min_value: Precision::Exact(ScalarValue::Int32(Some(20))),
                    max_value: Precision::Exact(ScalarValue::Int32(Some(30))),
                    ..Default::default()
                },
                ColumnStatistics {
                    min_value: Precision::Exact(ScalarValue::Int32(Some(21))),
                    max_value: Precision::Exact(ScalarValue::Int32(Some(29))),
                    ..Default::default()
                },
                ColumnStatistics {
                    min_value: Precision::Exact(ScalarValue::Int32(Some(24))),
                    max_value: Precision::Exact(ScalarValue::Int32(Some(26))),
                    ..Default::default()
                },
            ],
            ..Default::default()
        };

        let input = Arc::new(StatisticsExec::new(input_statistics, input_schema));

        // Create projection expressions that reference columns from the input schema and the length
        // of output schema columns < input schema columns and hence if we use the last few columns
        // from the input schema in the expressions here, bounds_check would fail on them if output
        // schema is supplied to the partitions_statistics method.
        let exprs: Vec<ProjectionExpr> = vec![
            ProjectionExpr {
                expr: Arc::new(Column::new("c", 2)) as Arc<dyn PhysicalExpr>,
                alias: "c_renamed".to_string(),
            },
            ProjectionExpr {
                expr: Arc::new(BinaryExpr::new(
                    Arc::new(Column::new("e", 4)),
                    Operator::Plus,
                    Arc::new(Column::new("f", 5)),
                )) as Arc<dyn PhysicalExpr>,
                alias: "e_plus_f".to_string(),
            },
        ];

        let projection = ProjectionExec::try_new(exprs, input).unwrap();

        let stats = projection.partition_statistics(None).unwrap();

        // Row count passes through unchanged; one column statistic per output
        // expression; total byte size is expected to be reported as exact.
        assert_eq!(stats.num_rows, Precision::Exact(10));
        assert_eq!(
            stats.column_statistics.len(),
            2,
            "Expected 2 columns in projection statistics"
        );
        assert!(stats.total_byte_size.is_exact().unwrap_or(false));
    }
}