datafusion_physical_expr/
projection.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::ops::Deref;
19use std::sync::Arc;
20
21use crate::expressions::Column;
22use crate::utils::collect_columns;
23use crate::{PhysicalExpr, PhysicalExprExt};
24
25use arrow::datatypes::{Field, Schema, SchemaRef};
26use datafusion_common::stats::{ColumnStatistics, Precision};
27use datafusion_common::tree_node::{Transformed, TransformedResult};
28use datafusion_common::{internal_datafusion_err, internal_err, plan_err, Result};
29
30use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
31use indexmap::IndexMap;
32use itertools::Itertools;
33
/// A projection expression as used by projection operations.
///
/// The expression is evaluated and the result is stored in a column
/// with the name specified by `alias`.
///
/// For example, the SQL expression `a + b AS sum_ab` would be represented
/// as a `ProjectionExpr` where `expr` is the expression `a + b`
/// and `alias` is the string `sum_ab`.
#[derive(Debug, Clone)]
pub struct ProjectionExpr {
    /// The expression that will be evaluated.
    pub expr: Arc<dyn PhysicalExpr>,
    /// The name of the output column, for use in an output schema.
    pub alias: String,
}
49
50impl std::fmt::Display for ProjectionExpr {
51    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52        if self.expr.to_string() == self.alias {
53            write!(f, "{}", self.alias)
54        } else {
55            write!(f, "{} AS {}", self.expr, self.alias)
56        }
57    }
58}
59
60impl ProjectionExpr {
61    /// Create a new projection expression
62    pub fn new(expr: Arc<dyn PhysicalExpr>, alias: String) -> Self {
63        Self { expr, alias }
64    }
65
66    /// Create a new projection expression from an expression and a schema using the expression's output field name as alias.
67    pub fn new_from_expression(
68        expr: Arc<dyn PhysicalExpr>,
69        schema: &Schema,
70    ) -> Result<Self> {
71        let field = expr.return_field(schema)?;
72        Ok(Self {
73            expr,
74            alias: field.name().to_string(),
75        })
76    }
77}
78
79impl From<(Arc<dyn PhysicalExpr>, String)> for ProjectionExpr {
80    fn from(value: (Arc<dyn PhysicalExpr>, String)) -> Self {
81        Self::new(value.0, value.1)
82    }
83}
84
85impl From<&(Arc<dyn PhysicalExpr>, String)> for ProjectionExpr {
86    fn from(value: &(Arc<dyn PhysicalExpr>, String)) -> Self {
87        Self::new(Arc::clone(&value.0), value.1.clone())
88    }
89}
90
91impl From<ProjectionExpr> for (Arc<dyn PhysicalExpr>, String) {
92    fn from(value: ProjectionExpr) -> Self {
93        (value.expr, value.alias)
94    }
95}
96
/// A collection of projection expressions.
///
/// This struct encapsulates multiple `ProjectionExpr` instances,
/// representing a complete projection operation and provides
/// methods to manipulate and analyze the projection as a whole.
#[derive(Debug, Clone)]
pub struct ProjectionExprs {
    /// The projection expressions. Their order is the order of the output
    /// fields produced by `project_schema`.
    exprs: Vec<ProjectionExpr>,
}
106
107impl std::fmt::Display for ProjectionExprs {
108    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109        let exprs: Vec<String> = self.exprs.iter().map(|e| e.to_string()).collect();
110        write!(f, "Projection[{}]", exprs.join(", "))
111    }
112}
113
114impl From<Vec<ProjectionExpr>> for ProjectionExprs {
115    fn from(value: Vec<ProjectionExpr>) -> Self {
116        Self { exprs: value }
117    }
118}
119
120impl From<&[ProjectionExpr]> for ProjectionExprs {
121    fn from(value: &[ProjectionExpr]) -> Self {
122        Self {
123            exprs: value.to_vec(),
124        }
125    }
126}
127
128impl FromIterator<ProjectionExpr> for ProjectionExprs {
129    fn from_iter<T: IntoIterator<Item = ProjectionExpr>>(exprs: T) -> Self {
130        Self {
131            exprs: exprs.into_iter().collect::<Vec<_>>(),
132        }
133    }
134}
135
136impl AsRef<[ProjectionExpr]> for ProjectionExprs {
137    fn as_ref(&self) -> &[ProjectionExpr] {
138        &self.exprs
139    }
140}
141
142impl ProjectionExprs {
143    pub fn new<I>(exprs: I) -> Self
144    where
145        I: IntoIterator<Item = ProjectionExpr>,
146    {
147        Self {
148            exprs: exprs.into_iter().collect::<Vec<_>>(),
149        }
150    }
151
152    /// Creates a [`ProjectionExpr`] from a list of column indices.
153    ///
154    /// This is a convenience method for creating simple column-only projections, where each projection expression is a reference to a column
155    /// in the input schema.
156    ///
157    /// # Behavior
158    /// - Ordering: the output projection preserves the exact order of indices provided in the input slice
159    ///   For example, `[2, 0, 1]` will produce projections for columns 2, 0, then 1 in that order
160    /// - Duplicates: Duplicate indices are allowed and will create multiple projection expressions referencing the same source column
161    ///   For example, `[0, 0]` creates 2 separate projections both referencing column 0
162    ///
163    /// # Panics
164    /// Panics if any index in `indices` is out of bounds for the provided schema.
165    ///
166    /// # Example
167    ///
168    /// ```rust
169    /// use arrow::datatypes::{DataType, Field, Schema};
170    /// use datafusion_physical_expr::projection::ProjectionExprs;
171    /// use std::sync::Arc;
172    ///
173    /// // Create a schema with three columns
174    /// let schema = Arc::new(Schema::new(vec![
175    ///     Field::new("a", DataType::Int32, false),
176    ///     Field::new("b", DataType::Utf8, false),
177    ///     Field::new("c", DataType::Float64, false),
178    /// ]));
179    ///
180    /// // Project columns at indices 2 and 0 (c and a) - ordering is preserved
181    /// let projection = ProjectionExprs::from_indices(&[2, 0], &schema);
182    ///
183    /// // This creates: SELECT c@2 AS c, a@0 AS a
184    /// assert_eq!(projection.as_ref().len(), 2);
185    /// assert_eq!(projection.as_ref()[0].alias, "c");
186    /// assert_eq!(projection.as_ref()[1].alias, "a");
187    ///
188    /// // Duplicate indices are allowed
189    /// let projection_with_dups = ProjectionExprs::from_indices(&[0, 0, 1], &schema);
190    /// assert_eq!(projection_with_dups.as_ref().len(), 3);
191    /// assert_eq!(projection_with_dups.as_ref()[0].alias, "a");
192    /// assert_eq!(projection_with_dups.as_ref()[1].alias, "a"); // duplicate
193    /// assert_eq!(projection_with_dups.as_ref()[2].alias, "b");
194    /// ```
195    pub fn from_indices(indices: &[usize], schema: &SchemaRef) -> Self {
196        let projection_exprs = indices.iter().map(|&i| {
197            let field = schema.field(i);
198            ProjectionExpr {
199                expr: Arc::new(Column::new(field.name(), i)),
200                alias: field.name().clone(),
201            }
202        });
203
204        Self::from_iter(projection_exprs)
205    }
206
207    /// Returns an iterator over the projection expressions
208    pub fn iter(&self) -> impl Iterator<Item = &ProjectionExpr> {
209        self.exprs.iter()
210    }
211
212    /// Creates a ProjectionMapping from this projection
213    pub fn projection_mapping(
214        &self,
215        input_schema: &SchemaRef,
216    ) -> Result<ProjectionMapping> {
217        ProjectionMapping::try_new(
218            self.exprs
219                .iter()
220                .map(|p| (Arc::clone(&p.expr), p.alias.clone())),
221            input_schema,
222        )
223    }
224
225    /// Iterate over a clone of the projection expressions.
226    pub fn expr_iter(&self) -> impl Iterator<Item = Arc<dyn PhysicalExpr>> + '_ {
227        self.exprs.iter().map(|e| Arc::clone(&e.expr))
228    }
229
230    /// Apply another projection on top of this projection, returning the combined projection.
231    /// For example, if this projection is `SELECT c@2 AS x, b@1 AS y, a@0 as z` and the other projection is `SELECT x@0 + 1 AS c1, y@1 + z@2 as c2`,
232    /// we return a projection equivalent to `SELECT c@2 + 1 AS c1, b@1 + a@0 as c2`.
233    ///
234    /// # Example
235    ///
236    /// ```rust
237    /// use datafusion_common::{Result, ScalarValue};
238    /// use datafusion_expr::Operator;
239    /// use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
240    /// use datafusion_physical_expr::projection::{ProjectionExpr, ProjectionExprs};
241    /// use std::sync::Arc;
242    ///
243    /// fn main() -> Result<()> {
244    ///     // Example from the docstring:
245    ///     // Base projection: SELECT c@2 AS x, b@1 AS y, a@0 AS z
246    ///     let base = ProjectionExprs::new(vec![
247    ///         ProjectionExpr {
248    ///             expr: Arc::new(Column::new("c", 2)),
249    ///             alias: "x".to_string(),
250    ///         },
251    ///         ProjectionExpr {
252    ///             expr: Arc::new(Column::new("b", 1)),
253    ///             alias: "y".to_string(),
254    ///         },
255    ///         ProjectionExpr {
256    ///             expr: Arc::new(Column::new("a", 0)),
257    ///             alias: "z".to_string(),
258    ///         },
259    ///     ]);
260    ///
261    ///     // Top projection: SELECT x@0 + 1 AS c1, y@1 + z@2 AS c2
262    ///     let top = ProjectionExprs::new(vec![
263    ///         ProjectionExpr {
264    ///             expr: Arc::new(BinaryExpr::new(
265    ///                 Arc::new(Column::new("x", 0)),
266    ///                 Operator::Plus,
267    ///                 Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
268    ///             )),
269    ///             alias: "c1".to_string(),
270    ///         },
271    ///         ProjectionExpr {
272    ///             expr: Arc::new(BinaryExpr::new(
273    ///                 Arc::new(Column::new("y", 1)),
274    ///                 Operator::Plus,
275    ///                 Arc::new(Column::new("z", 2)),
276    ///             )),
277    ///             alias: "c2".to_string(),
278    ///         },
279    ///     ]);
280    ///
281    ///     // Expected result: SELECT c@2 + 1 AS c1, b@1 + a@0 AS c2
282    ///     let result = base.try_merge(&top)?;
283    ///
284    ///     assert_eq!(result.as_ref().len(), 2);
285    ///     assert_eq!(result.as_ref()[0].alias, "c1");
286    ///     assert_eq!(result.as_ref()[1].alias, "c2");
287    ///
288    ///     Ok(())
289    /// }
290    /// ```
291    ///
292    /// # Errors
293    /// This function returns an error if any expression in the `other` projection cannot be
294    /// applied on top of this projection.
295    pub fn try_merge(&self, other: &ProjectionExprs) -> Result<ProjectionExprs> {
296        let mut new_exprs = Vec::with_capacity(other.exprs.len());
297        for proj_expr in &other.exprs {
298            let new_expr = update_expr(&proj_expr.expr, &self.exprs, true)?
299                .ok_or_else(|| {
300                    internal_datafusion_err!(
301                        "Failed to combine projections: expression {} could not be applied on top of existing projections {}",
302                        proj_expr.expr,
303                        self.exprs.iter().map(|e| format!("{e}")).join(", ")
304                    )
305                })?;
306            new_exprs.push(ProjectionExpr {
307                expr: new_expr,
308                alias: proj_expr.alias.clone(),
309            });
310        }
311        Ok(ProjectionExprs::new(new_exprs))
312    }
313
314    /// Extract the column indices used in this projection.
315    /// For example, for a projection `SELECT a AS x, b + 1 AS y`, where `a` is at index 0 and `b` is at index 1,
316    /// this function would return `[0, 1]`.
317    /// Repeated indices are returned only once, and the order is ascending.
318    pub fn column_indices(&self) -> Vec<usize> {
319        self.exprs
320            .iter()
321            .flat_map(|e| collect_columns(&e.expr).into_iter().map(|col| col.index()))
322            .sorted_unstable()
323            .dedup()
324            .collect_vec()
325    }
326
327    /// Extract the ordered column indices for a column-only projection.
328    ///
329    /// This function assumes that all expressions in the projection are simple column references.
330    /// It returns the column indices in the order they appear in the projection.
331    ///
332    /// # Panics
333    ///
334    /// Panics if any expression in the projection is not a simple column reference. This includes:
335    /// - Computed expressions (e.g., `a + 1`, `CAST(a AS INT)`)
336    /// - Function calls (e.g., `UPPER(name)`, `SUM(amount)`)
337    /// - Literals (e.g., `42`, `'hello'`)
338    /// - Complex nested expressions (e.g., `CASE WHEN ... THEN ... END`)
339    ///
340    /// # Returns
341    ///
342    /// A vector of column indices in projection order. Unlike [`column_indices()`](Self::column_indices),
343    /// this function:
344    /// - Preserves the projection order (does not sort)
345    /// - Preserves duplicates (does not deduplicate)
346    ///
347    /// # Example
348    ///
349    /// For a projection `SELECT c, a, c` where `a` is at index 0 and `c` is at index 2,
350    /// this function would return `[2, 0, 2]`.
351    ///
352    /// Use [`column_indices()`](Self::column_indices) instead if the projection may contain
353    /// non-column expressions or if you need a deduplicated sorted list.
354    pub fn ordered_column_indices(&self) -> Vec<usize> {
355        self.exprs
356            .iter()
357            .map(|e| {
358                e.expr
359                    .as_any()
360                    .downcast_ref::<Column>()
361                    .expect("Expected column reference in projection")
362                    .index()
363            })
364            .collect()
365    }
366
367    /// Project a schema according to this projection.
368    /// For example, for a projection `SELECT a AS x, b + 1 AS y`, where `a` is at index 0 and `b` is at index 1,
369    /// if the input schema is `[a: Int32, b: Int32, c: Int32]`, the output schema would be `[x: Int32, y: Int32]`.
370    /// Fields' metadata are preserved from the input schema.
371    pub fn project_schema(&self, input_schema: &Schema) -> Result<Schema> {
372        let fields: Result<Vec<Field>> = self
373            .exprs
374            .iter()
375            .map(|proj_expr| {
376                let metadata = proj_expr
377                    .expr
378                    .return_field(input_schema)?
379                    .metadata()
380                    .clone();
381
382                let field = Field::new(
383                    &proj_expr.alias,
384                    proj_expr.expr.data_type(input_schema)?,
385                    proj_expr.expr.nullable(input_schema)?,
386                )
387                .with_metadata(metadata);
388
389                Ok(field)
390            })
391            .collect();
392
393        Ok(Schema::new_with_metadata(
394            fields?,
395            input_schema.metadata().clone(),
396        ))
397    }
398
399    /// Project statistics according to this projection.
400    /// For example, for a projection `SELECT a AS x, b + 1 AS y`, where `a` is at index 0 and `b` is at index 1,
401    /// if the input statistics has column statistics for columns `a`, `b`, and `c`, the output statistics would have column statistics for columns `x` and `y`.
402    pub fn project_statistics(
403        &self,
404        mut stats: datafusion_common::Statistics,
405        input_schema: &Schema,
406    ) -> Result<datafusion_common::Statistics> {
407        let mut primitive_row_size = 0;
408        let mut primitive_row_size_possible = true;
409        let mut column_statistics = vec![];
410
411        for proj_expr in &self.exprs {
412            let expr = &proj_expr.expr;
413            let col_stats = if let Some(col) = expr.as_any().downcast_ref::<Column>() {
414                stats.column_statistics[col.index()].clone()
415            } else {
416                // TODO stats: estimate more statistics from expressions
417                // (expressions should compute their statistics themselves)
418                ColumnStatistics::new_unknown()
419            };
420            column_statistics.push(col_stats);
421            let data_type = expr.data_type(input_schema)?;
422            if let Some(value) = data_type.primitive_width() {
423                primitive_row_size += value;
424                continue;
425            }
426            primitive_row_size_possible = false;
427        }
428
429        if primitive_row_size_possible {
430            stats.total_byte_size =
431                Precision::Exact(primitive_row_size).multiply(&stats.num_rows);
432        }
433        stats.column_statistics = column_statistics;
434        Ok(stats)
435    }
436}
437
438impl<'a> IntoIterator for &'a ProjectionExprs {
439    type Item = &'a ProjectionExpr;
440    type IntoIter = std::slice::Iter<'a, ProjectionExpr>;
441
442    fn into_iter(self) -> Self::IntoIter {
443        self.exprs.iter()
444    }
445}
446
447impl IntoIterator for ProjectionExprs {
448    type Item = ProjectionExpr;
449    type IntoIter = std::vec::IntoIter<ProjectionExpr>;
450
451    fn into_iter(self) -> Self::IntoIter {
452        self.exprs.into_iter()
453    }
454}
455
/// Rewrites the [`Column`] references inside `expr` against `projected_exprs`.
///
/// Returns `Ok(Some(rewritten))` only when at least one column was rewritten
/// and no column failed to be rewritten; otherwise returns `Ok(None)`. In
/// particular, an expression in which no column is rewritten (for example one
/// that contains no [`Column`] at all) yields `Ok(None)` in both modes.
/// Columns whose name is contained in the lambda parameters supplied by the
/// traversal are left untouched.
///
/// The function operates in two modes:
///
/// 1) When `sync_with_child` is `true`:
///
///    The function updates the indices of `expr` if the expression resides
///    in the input plan. For instance, given the expressions `a@1 + b@2`
///    and `c@0` with the input schema `c@2, a@0, b@1`, the expressions are
///    updated to `a@0 + b@1` and `c@2`.
///
/// 2) When `sync_with_child` is `false`:
///
///    The function determines how the expression would be updated if a projection
///    was placed before the plan associated with the expression. If the expression
///    cannot be rewritten after the projection, it returns `None`. For example,
///    given the expressions `c@0`, `a@1` and `b@2`, and the projection with
///    an output schema of `a, c_new`, then `c@0` becomes `c_new@1`, `a@1` becomes
///    `a@0`, but `b@2` results in `None` since the projection does not include `b`.
///
/// # Errors
/// This function returns an error if `sync_with_child` is `true` and if any expression references
/// an index that is out of bounds for `projected_exprs`.
/// For example:
///
/// - `expr` is `a@3`
/// - `projected_exprs` is \[`a@0`, `b@1`\]
///
/// In this case, `a@3` references index 3, which is out of bounds for `projected_exprs` (which has length 2).
pub fn update_expr(
    expr: &Arc<dyn PhysicalExpr>,
    projected_exprs: &[ProjectionExpr],
    sync_with_child: bool,
) -> Result<Option<Arc<dyn PhysicalExpr>>> {
    #[derive(Debug, PartialEq)]
    enum RewriteState {
        /// The expression is unchanged.
        Unchanged,
        /// Some part of the expression has been rewritten
        RewrittenValid,
        /// Some part of the expression has been rewritten, but some column
        /// references could not be.
        RewrittenInvalid,
    }

    // Shared across all invocations of the closure below; it records the
    // outcome of the whole traversal, not of a single node.
    let mut state = RewriteState::Unchanged;

    let new_expr = Arc::clone(expr)
        .transform_up_with_lambdas_params(|expr, lambdas_params| {
            // Once a column could not be mapped the result is discarded
            // anyway, so the remaining nodes are passed through untouched.
            if state == RewriteState::RewrittenInvalid {
                return Ok(Transformed::no(expr));
            }

            // Only columns that are not lambda parameters are rewritten.
            let column = match expr.as_any().downcast_ref::<Column>() {
                Some(column) if !lambdas_params.contains(column.name()) => column,
                _ => {
                            return Ok(Transformed::no(expr));
                        }
            };
            if sync_with_child {
                state = RewriteState::RewrittenValid;
                // Update the index of `column`:
                let projected_expr = projected_exprs.get(column.index()).ok_or_else(|| {
                    internal_datafusion_err!(
                        "Column index {} out of bounds for projected expressions of length {}",
                        column.index(),
                        projected_exprs.len()
                    )
                })?;
                // The column is replaced by the projected expression found at
                // the column's index.
                Ok(Transformed::yes(Arc::clone(&projected_expr.expr)))
            } else {
                // default to invalid, in case we can't find the relevant column
                state = RewriteState::RewrittenInvalid;
                // Determine how to update `column` to accommodate `projected_exprs`
                // Only projected expressions that are themselves plain columns
                // with the same name and index are considered a match; the
                // replacement refers to the match by its alias and position.
                projected_exprs
                    .iter()
                    .enumerate()
                    .find_map(|(index, proj_expr)| {
                        proj_expr.expr.as_any().downcast_ref::<Column>().and_then(
                            |projected_column| {
                                (column.name().eq(projected_column.name())
                                    && column.index() == projected_column.index())
                                .then(|| {
                                    state = RewriteState::RewrittenValid;
                                    Arc::new(Column::new(&proj_expr.alias, index)) as _
                                })
                            },
                        )
                    })
                    .map_or_else(
                        || Ok(Transformed::no(expr)),
                        |c| Ok(Transformed::yes(c)),
                    )
            }
        })
        .data()?;

    // `Unchanged` and `RewrittenInvalid` both map to `None`.
    Ok((state == RewriteState::RewrittenValid).then_some(new_expr))
}
553
/// Stores target expressions, along with their indices, that associate with a
/// source expression in a projection mapping.
///
/// Note that the derived `Default` produces an empty collection; a target
/// must be pushed before `first` is called, since `first` unwraps the first
/// element.
#[derive(Clone, Debug, Default)]
pub struct ProjectionTargets {
    /// A non-empty vector of pairs of target expressions and their indices.
    /// Consider using a special non-empty collection type in the future (e.g.
    /// if Rust provides one in the standard library).
    exprs_indices: Vec<(Arc<dyn PhysicalExpr>, usize)>,
}
563
564impl ProjectionTargets {
565    /// Returns the first target expression and its index.
566    pub fn first(&self) -> &(Arc<dyn PhysicalExpr>, usize) {
567        // Since the vector is non-empty, we can safely unwrap:
568        self.exprs_indices.first().unwrap()
569    }
570
571    /// Adds a target expression and its index to the list of targets.
572    pub fn push(&mut self, target: (Arc<dyn PhysicalExpr>, usize)) {
573        self.exprs_indices.push(target);
574    }
575}
576
577impl Deref for ProjectionTargets {
578    type Target = [(Arc<dyn PhysicalExpr>, usize)];
579
580    fn deref(&self) -> &Self::Target {
581        &self.exprs_indices
582    }
583}
584
585impl From<Vec<(Arc<dyn PhysicalExpr>, usize)>> for ProjectionTargets {
586    fn from(exprs_indices: Vec<(Arc<dyn PhysicalExpr>, usize)>) -> Self {
587        Self { exprs_indices }
588    }
589}
590
/// Stores the mapping between source expressions and target expressions for a
/// projection.
///
/// When built with `try_new`, projection expressions that share the same
/// source expression are grouped under a single key, each contributing one
/// entry to that key's [`ProjectionTargets`].
#[derive(Clone, Debug)]
pub struct ProjectionMapping {
    /// Mapping between source expressions and target expressions.
    /// Vector indices correspond to the indices after projection.
    map: IndexMap<Arc<dyn PhysicalExpr>, ProjectionTargets>,
}
599
600impl ProjectionMapping {
601    /// Constructs the mapping between a projection's input and output
602    /// expressions.
603    ///
604    /// For example, given the input projection expressions (`a + b`, `c + d`)
605    /// and an output schema with two columns `"c + d"` and `"a + b"`, the
606    /// projection mapping would be:
607    ///
608    /// ```text
609    ///  [0]: (c + d, [(col("c + d"), 0)])
610    ///  [1]: (a + b, [(col("a + b"), 1)])
611    /// ```
612    ///
613    /// where `col("c + d")` means the column named `"c + d"`.
614    pub fn try_new(
615        expr: impl IntoIterator<Item = (Arc<dyn PhysicalExpr>, String)>,
616        input_schema: &SchemaRef,
617    ) -> Result<Self> {
618        // Construct a map from the input expressions to the output expression of the projection:
619        let mut map = IndexMap::<_, ProjectionTargets>::new();
620        for (expr_idx, (expr, name)) in expr.into_iter().enumerate() {
621            let target_expr = Arc::new(Column::new(&name, expr_idx)) as _;
622            let source_expr = expr.transform_down_with_schema(input_schema, |e, schema| match e.as_any().downcast_ref::<Column>() {
623                Some(col) => {
624                    // Sometimes, an expression and its name in the schema
625                    // doesn't match. This can cause problems, so we make sure
626                    // that the expression name matches with the name in `schema`.
627                    // Conceptually, `source_expr` and `expression` should be the same.
628                    let idx = col.index();
629                    let matching_field = schema.field(idx);
630                    let matching_name = matching_field.name();
631                    if col.name() != matching_name {
632                        return internal_err!(
633                            "Input field name {} does not match with the projection expression {}",
634                            matching_name,
635                            col.name()
636                        );
637                    }
638                    let matching_column = Column::new(matching_name, idx);
639                    Ok(Transformed::yes(Arc::new(matching_column)))
640                }
641                None => Ok(Transformed::no(e)),
642            })
643            .data()?;
644            map.entry(source_expr)
645                .or_default()
646                .push((target_expr, expr_idx));
647        }
648        Ok(Self { map })
649    }
650
651    /// Constructs a subset mapping using the provided indices.
652    ///
653    /// This is used when the output is a subset of the input without any
654    /// other transformations. The indices are for columns in the schema.
655    pub fn from_indices(indices: &[usize], schema: &SchemaRef) -> Result<Self> {
656        let projection_exprs = indices.iter().map(|index| {
657            let field = schema.field(*index);
658            let column = Arc::new(Column::new(field.name(), *index));
659            (column as _, field.name().clone())
660        });
661        ProjectionMapping::try_new(projection_exprs, schema)
662    }
663}
664
665impl Deref for ProjectionMapping {
666    type Target = IndexMap<Arc<dyn PhysicalExpr>, ProjectionTargets>;
667
668    fn deref(&self) -> &Self::Target {
669        &self.map
670    }
671}
672
673impl FromIterator<(Arc<dyn PhysicalExpr>, ProjectionTargets)> for ProjectionMapping {
674    fn from_iter<T: IntoIterator<Item = (Arc<dyn PhysicalExpr>, ProjectionTargets)>>(
675        iter: T,
676    ) -> Self {
677        Self {
678            map: IndexMap::from_iter(iter),
679        }
680    }
681}
682
683/// Projects a slice of [LexOrdering]s onto the given schema.
684///
685/// This is a convenience wrapper that applies [project_ordering] to each
686/// input ordering and collects the successful projections:
687/// - For each input ordering, the result of [project_ordering] is appended to
688///   the output if it is `Some(...)`.
689/// - Order is preserved and no deduplication is attempted.
690/// - If none of the input orderings can be projected, an empty `Vec` is
691///   returned.
692///
693/// See [project_ordering] for the semantics of projecting a single
694/// [LexOrdering].
695pub fn project_orderings(
696    orderings: &[LexOrdering],
697    schema: &SchemaRef,
698) -> Vec<LexOrdering> {
699    let mut projected_orderings = vec![];
700
701    for ordering in orderings {
702        projected_orderings.extend(project_ordering(ordering, schema));
703    }
704
705    projected_orderings
706}
707
/// Projects a single [LexOrdering] onto the given schema.
///
/// This function attempts to rewrite every [PhysicalSortExpr] in the provided
/// [LexOrdering] so that any [Column] expressions point at the correct field
/// indices in `schema`.
///
/// Key details:
/// - Columns are matched by name, not by index. The index of each matched
///   column is looked up with [Schema::column_with_name](arrow::datatypes::Schema::column_with_name) and a new
///   [Column] with the correct [index](Column::index) is substituted.
/// - Columns whose name is contained in the lambda parameters supplied by the
///   traversal are left untouched.
/// - If an expression references a column name that does not exist in
///   `schema`, projection of the current ordering stops and only the already
///   rewritten prefix is kept. This models the fact that a lexicographical
///   ordering remains valid for any leading prefix whose expressions are
///   present in the projected schema.
/// - If no expressions can be projected (i.e. the first one is missing), the
///   function returns `None`.
///
/// Return value:
/// - `Some(LexOrdering)` if at least one sort expression could be projected.
///   The returned ordering may be a strict prefix of the input ordering.
/// - `None` if no part of the ordering can be projected onto `schema`.
///
/// Example
///
/// Suppose we have an input ordering `[col("a@0"), col("b@1")]` but the projected
/// schema only contains a (say at index 2) and not b. The result will be
/// `Some([col("a@2")])`. In other words, the column reference is reindexed to
/// match the projected schema and the ordering is cut off before b.
/// If a is not present in the projected schema, the result will be None, even
/// if b is present, because only a leading prefix of the ordering can be kept.
pub fn project_ordering(
    ordering: &LexOrdering,
    schema: &SchemaRef,
) -> Option<LexOrdering> {
    let mut projected_exprs = vec![];
    for PhysicalSortExpr { expr, options } in ordering.iter() {
        let transformed =
            Arc::clone(expr).transform_up_with_lambdas_params(|expr, lambdas_params| {
                // Only columns that are not lambda parameters are re-indexed.
                let col = match expr.as_any().downcast_ref::<Column>() {
                    Some(col) if !lambdas_params.contains(col.name()) => col,
                    _ => {
                        return Ok(Transformed::no(expr));
                    }
                };

                let name = col.name();
                if let Some((idx, _)) = schema.column_with_name(name) {
                    // Compute the new column expression (with correct index) after projection:
                    Ok(Transformed::yes(Arc::new(Column::new(name, idx))))
                } else {
                    // Cannot find expression in the projected_schema,
                    // signal this using an Err result
                    // (the error is only a marker; its message is never surfaced)
                    plan_err!("")
                }
            });

        match transformed {
            Ok(transformed) => {
                projected_exprs.push(PhysicalSortExpr::new(transformed.data, *options));
            }
            Err(_) => {
                // Err result indicates an expression could not be found in the
                // projected_schema, stop iterating since rest of the orderings are violated
                break;
            }
        }
    }

    // `LexOrdering::new` returns an `Option`, which is passed through as the result.
    LexOrdering::new(projected_exprs)
}
777
778#[cfg(test)]
779pub(crate) mod tests {
780    use std::collections::HashMap;
781
782    use super::*;
783    use crate::equivalence::{convert_to_orderings, EquivalenceProperties};
784    use crate::expressions::{col, BinaryExpr, Literal};
785    use crate::utils::tests::TestScalarUDF;
786    use crate::{PhysicalExprRef, ScalarFunctionExpr};
787
788    use arrow::compute::SortOptions;
789    use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
790    use datafusion_common::config::ConfigOptions;
791    use datafusion_common::{ScalarValue, Statistics};
792    use datafusion_expr::{Operator, ScalarUDF};
793    use insta::assert_snapshot;
794
795    pub(crate) fn output_schema(
796        mapping: &ProjectionMapping,
797        input_schema: &Arc<Schema>,
798    ) -> Result<SchemaRef> {
799        // Calculate output schema:
800        let mut fields = vec![];
801        for (source, targets) in mapping.iter() {
802            let data_type = source.data_type(input_schema)?;
803            let nullable = source.nullable(input_schema)?;
804            for (target, _) in targets.iter() {
805                let Some(column) = target.as_any().downcast_ref::<Column>() else {
806                    return plan_err!("Expects to have column");
807                };
808                fields.push(Field::new(column.name(), data_type.clone(), nullable));
809            }
810        }
811
812        let output_schema = Arc::new(Schema::new_with_metadata(
813            fields,
814            input_schema.metadata().clone(),
815        ));
816
817        Ok(output_schema)
818    }
819
    /// Table-driven test for projecting orderings through a projection.
    ///
    /// Each test case is a triple of:
    /// 1. the orderings registered on the input `EquivalenceProperties`,
    /// 2. the projection expressions together with their output aliases,
    /// 3. the orderings expected after projection, written with output column names.
    ///
    /// For every case the test builds a `ProjectionMapping`, derives the output
    /// schema with `output_schema`, projects the equivalence properties, and checks
    /// that the resulting ordering equivalence class has the expected length and
    /// contains every expected ordering.
    #[test]
    fn project_orderings() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Int32, true),
            Field::new("c", DataType::Int32, true),
            Field::new("d", DataType::Int32, true),
            Field::new("e", DataType::Int32, true),
            Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
        ]));
        // Column references resolved against the input schema.
        let col_a = &col("a", &schema)?;
        let col_b = &col("b", &schema)?;
        let col_c = &col("c", &schema)?;
        let col_d = &col("d", &schema)?;
        let col_e = &col("e", &schema)?;
        let col_ts = &col("ts", &schema)?;
        // Compound expressions used both as sort keys and as projection exprs.
        let a_plus_b = Arc::new(BinaryExpr::new(
            Arc::clone(col_a),
            Operator::Plus,
            Arc::clone(col_b),
        )) as Arc<dyn PhysicalExpr>;
        let b_plus_d = Arc::new(BinaryExpr::new(
            Arc::clone(col_b),
            Operator::Plus,
            Arc::clone(col_d),
        )) as Arc<dyn PhysicalExpr>;
        let b_plus_e = Arc::new(BinaryExpr::new(
            Arc::clone(col_b),
            Operator::Plus,
            Arc::clone(col_e),
        )) as Arc<dyn PhysicalExpr>;
        let c_plus_d = Arc::new(BinaryExpr::new(
            Arc::clone(col_c),
            Operator::Plus,
            Arc::clone(col_d),
        )) as Arc<dyn PhysicalExpr>;

        let option_asc = SortOptions {
            descending: false,
            nulls_first: false,
        };
        let option_desc = SortOptions {
            descending: true,
            nulls_first: true,
        };

        let test_cases = vec![
            // ---------- TEST CASE 1 ------------
            (
                // orderings
                vec![
                    // [b ASC]
                    vec![(col_b, option_asc)],
                ],
                // projection exprs
                vec![(col_b, "b_new".to_string()), (col_a, "a_new".to_string())],
                // expected
                vec![
                    // [b_new ASC]
                    vec![("b_new", option_asc)],
                ],
            ),
            // ---------- TEST CASE 2 ------------
            (
                // orderings
                vec![
                    // empty ordering
                ],
                // projection exprs
                vec![(col_c, "c_new".to_string()), (col_b, "b_new".to_string())],
                // expected
                vec![
                    // no ordering at the output
                ],
            ),
            // ---------- TEST CASE 3 ------------
            (
                // orderings
                vec![
                    // [ts ASC]
                    vec![(col_ts, option_asc)],
                ],
                // projection exprs
                vec![
                    (col_b, "b_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (col_ts, "ts_new".to_string()),
                ],
                // expected
                vec![
                    // [ts_new ASC]
                    vec![("ts_new", option_asc)],
                ],
            ),
            // ---------- TEST CASE 4 ------------
            (
                // orderings
                vec![
                    // [a ASC, ts ASC]
                    vec![(col_a, option_asc), (col_ts, option_asc)],
                    // [b ASC, ts ASC]
                    vec![(col_b, option_asc), (col_ts, option_asc)],
                ],
                // projection exprs
                vec![
                    (col_b, "b_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (col_ts, "ts_new".to_string()),
                ],
                // expected
                vec![
                    // [a_new ASC, ts_new ASC]
                    vec![("a_new", option_asc), ("ts_new", option_asc)],
                    // [b_new ASC, ts_new ASC]
                    vec![("b_new", option_asc), ("ts_new", option_asc)],
                ],
            ),
            // ---------- TEST CASE 5 ------------
            (
                // orderings
                vec![
                    // [a + b ASC]
                    vec![(&a_plus_b, option_asc)],
                ],
                // projection exprs
                vec![
                    (col_b, "b_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (&a_plus_b, "a+b".to_string()),
                ],
                // expected
                vec![
                    // [a + b ASC]
                    vec![("a+b", option_asc)],
                ],
            ),
            // ---------- TEST CASE 6 ------------
            (
                // orderings
                vec![
                    // [a + b ASC, c ASC]
                    vec![(&a_plus_b, option_asc), (col_c, option_asc)],
                ],
                // projection exprs
                vec![
                    (col_b, "b_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (col_c, "c_new".to_string()),
                    (&a_plus_b, "a+b".to_string()),
                ],
                // expected
                vec![
                    // [a + b ASC, c_new ASC]
                    vec![("a+b", option_asc), ("c_new", option_asc)],
                ],
            ),
            // ------- TEST CASE 7 ----------
            (
                // orderings
                vec![
                    // [a ASC, b ASC]
                    vec![(col_a, option_asc), (col_b, option_asc)],
                    // [a ASC, d ASC]
                    vec![(col_a, option_asc), (col_d, option_asc)],
                ],
                // proj exprs: b as b_new, a as a_new, d as d_new, b+d
                vec![
                    (col_b, "b_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (col_d, "d_new".to_string()),
                    (&b_plus_d, "b+d".to_string()),
                ],
                // expected
                vec![
                    // [a_new ASC, b_new ASC]
                    vec![("a_new", option_asc), ("b_new", option_asc)],
                    // [a_new ASC, d_new ASC]
                    vec![("a_new", option_asc), ("d_new", option_asc)],
                    // [a_new ASC, b+d ASC]
                    vec![("a_new", option_asc), ("b+d", option_asc)],
                ],
            ),
            // ------- TEST CASE 8 ----------
            (
                // orderings
                vec![
                    // [b+d ASC]
                    vec![(&b_plus_d, option_asc)],
                ],
                // proj exprs
                vec![
                    (col_b, "b_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (col_d, "d_new".to_string()),
                    (&b_plus_d, "b+d".to_string()),
                ],
                // expected
                vec![
                    // [b+d ASC]
                    vec![("b+d", option_asc)],
                ],
            ),
            // ------- TEST CASE 9 ----------
            (
                // orderings
                vec![
                    // [a ASC, d ASC, b ASC]
                    vec![
                        (col_a, option_asc),
                        (col_d, option_asc),
                        (col_b, option_asc),
                    ],
                    // [c ASC]
                    vec![(col_c, option_asc)],
                ],
                // proj exprs
                vec![
                    (col_b, "b_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (col_d, "d_new".to_string()),
                    (col_c, "c_new".to_string()),
                ],
                // expected
                vec![
                    // [a_new ASC, d_new ASC, b_new ASC]
                    vec![
                        ("a_new", option_asc),
                        ("d_new", option_asc),
                        ("b_new", option_asc),
                    ],
                    // [c_new ASC],
                    vec![("c_new", option_asc)],
                ],
            ),
            // ------- TEST CASE 10 ----------
            (
                // orderings
                vec![
                    // [a ASC, b ASC, c ASC]
                    vec![
                        (col_a, option_asc),
                        (col_b, option_asc),
                        (col_c, option_asc),
                    ],
                    // [a ASC, d ASC]
                    vec![(col_a, option_asc), (col_d, option_asc)],
                ],
                // proj exprs
                vec![
                    (col_b, "b_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (col_c, "c_new".to_string()),
                    (&c_plus_d, "c+d".to_string()),
                ],
                // expected
                vec![
                    // [a_new ASC, b_new ASC, c_new ASC]
                    vec![
                        ("a_new", option_asc),
                        ("b_new", option_asc),
                        ("c_new", option_asc),
                    ],
                    // [a_new ASC, b_new ASC, c+d ASC]
                    vec![
                        ("a_new", option_asc),
                        ("b_new", option_asc),
                        ("c+d", option_asc),
                    ],
                ],
            ),
            // ------- TEST CASE 11 ----------
            (
                // orderings
                vec![
                    // [a ASC, b ASC]
                    vec![(col_a, option_asc), (col_b, option_asc)],
                    // [a ASC, d ASC]
                    vec![(col_a, option_asc), (col_d, option_asc)],
                ],
                // proj exprs
                vec![
                    (col_b, "b_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (&b_plus_d, "b+d".to_string()),
                ],
                // expected
                vec![
                    // [a_new ASC, b_new ASC]
                    vec![("a_new", option_asc), ("b_new", option_asc)],
                    // [a_new ASC, b + d ASC]
                    vec![("a_new", option_asc), ("b+d", option_asc)],
                ],
            ),
            // ------- TEST CASE 12 ----------
            (
                // orderings
                vec![
                    // [a ASC, b ASC, c ASC]
                    vec![
                        (col_a, option_asc),
                        (col_b, option_asc),
                        (col_c, option_asc),
                    ],
                ],
                // proj exprs (b is not projected)
                vec![(col_c, "c_new".to_string()), (col_a, "a_new".to_string())],
                // expected
                vec![
                    // [a_new ASC]
                    vec![("a_new", option_asc)],
                ],
            ),
            // ------- TEST CASE 13 ----------
            (
                // orderings
                vec![
                    // [a ASC, b ASC, c ASC]
                    vec![
                        (col_a, option_asc),
                        (col_b, option_asc),
                        (col_c, option_asc),
                    ],
                    // [a ASC, a + b ASC, c ASC]
                    vec![
                        (col_a, option_asc),
                        (&a_plus_b, option_asc),
                        (col_c, option_asc),
                    ],
                ],
                // proj exprs
                vec![
                    (col_c, "c_new".to_string()),
                    (col_b, "b_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (&a_plus_b, "a+b".to_string()),
                ],
                // expected
                vec![
                    // [a_new ASC, b_new ASC, c_new ASC]
                    vec![
                        ("a_new", option_asc),
                        ("b_new", option_asc),
                        ("c_new", option_asc),
                    ],
                    // [a_new ASC, a+b ASC, c_new ASC]
                    vec![
                        ("a_new", option_asc),
                        ("a+b", option_asc),
                        ("c_new", option_asc),
                    ],
                ],
            ),
            // ------- TEST CASE 14 ----------
            (
                // orderings
                vec![
                    // [a ASC, b ASC]
                    vec![(col_a, option_asc), (col_b, option_asc)],
                    // [c ASC, b ASC]
                    vec![(col_c, option_asc), (col_b, option_asc)],
                    // [d ASC, e ASC]
                    vec![(col_d, option_asc), (col_e, option_asc)],
                ],
                // proj exprs
                vec![
                    (col_c, "c_new".to_string()),
                    (col_d, "d_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (&b_plus_e, "b+e".to_string()),
                ],
                // expected
                vec![
                    // [a_new ASC, d_new ASC, b+e ASC]
                    vec![
                        ("a_new", option_asc),
                        ("d_new", option_asc),
                        ("b+e", option_asc),
                    ],
                    // [d_new ASC, a_new ASC, b+e ASC]
                    vec![
                        ("d_new", option_asc),
                        ("a_new", option_asc),
                        ("b+e", option_asc),
                    ],
                    // [c_new ASC, d_new ASC, b+e ASC]
                    vec![
                        ("c_new", option_asc),
                        ("d_new", option_asc),
                        ("b+e", option_asc),
                    ],
                    // [d_new ASC, c_new ASC, b+e ASC]
                    vec![
                        ("d_new", option_asc),
                        ("c_new", option_asc),
                        ("b+e", option_asc),
                    ],
                ],
            ),
            // ------- TEST CASE 15 ----------
            (
                // orderings
                vec![
                    // [a ASC, c ASC, b ASC]
                    vec![
                        (col_a, option_asc),
                        (col_c, option_asc),
                        (col_b, option_asc),
                    ],
                ],
                // proj exprs
                vec![
                    (col_c, "c_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (&a_plus_b, "a+b".to_string()),
                ],
                // expected
                vec![
                    // [a_new ASC, c_new ASC, a+b ASC]
                    vec![
                        ("a_new", option_asc),
                        ("c_new", option_asc),
                        ("a+b", option_asc),
                    ],
                ],
            ),
            // ------- TEST CASE 16 ----------
            (
                // orderings
                vec![
                    // [a ASC, b ASC]
                    vec![(col_a, option_asc), (col_b, option_asc)],
                    // [c ASC, b DESC]
                    vec![(col_c, option_asc), (col_b, option_desc)],
                    // [e ASC]
                    vec![(col_e, option_asc)],
                ],
                // proj exprs
                vec![
                    (col_c, "c_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (col_b, "b_new".to_string()),
                    (&b_plus_e, "b+e".to_string()),
                ],
                // expected
                vec![
                    // [a_new ASC, b_new ASC]
                    vec![("a_new", option_asc), ("b_new", option_asc)],
                    // [a_new ASC, b+e ASC]
                    vec![("a_new", option_asc), ("b+e", option_asc)],
                    // [c_new ASC, b_new DESC]
                    vec![("c_new", option_asc), ("b_new", option_desc)],
                ],
            ),
        ];

        for (idx, (orderings, proj_exprs, expected)) in test_cases.into_iter().enumerate()
        {
            // Fresh equivalence properties over the input schema for every case.
            let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));

            let orderings = convert_to_orderings(&orderings);
            eq_properties.add_orderings(orderings);

            let proj_exprs = proj_exprs
                .into_iter()
                .map(|(expr, name)| (Arc::clone(expr), name));
            let projection_mapping = ProjectionMapping::try_new(proj_exprs, &schema)?;
            let output_schema = output_schema(&projection_mapping, &schema)?;

            // Expected orderings are given by output column name; resolve them to
            // column expressions against the output schema.
            let expected = expected
                .into_iter()
                .map(|ordering| {
                    ordering
                        .into_iter()
                        .map(|(name, options)| {
                            (col(name, &output_schema).unwrap(), options)
                        })
                        .collect::<Vec<_>>()
                })
                .collect::<Vec<_>>();
            let expected = convert_to_orderings(&expected);

            let projected_eq = eq_properties.project(&projection_mapping, output_schema);
            let orderings = projected_eq.oeq_class();

            let err_msg = format!(
                "test_idx: {idx:?}, actual: {orderings:?}, expected: {expected:?}, projection_mapping: {projection_mapping:?}"
            );

            // Same number of orderings and every expected one present; the order in
            // which the orderings appear is not checked.
            assert_eq!(orderings.len(), expected.len(), "{err_msg}");
            for expected_ordering in &expected {
                assert!(orderings.contains(expected_ordering), "{}", err_msg)
            }
        }

        Ok(())
    }
1314
    /// Table-driven test that projects different input orderings through one fixed
    /// projection: `b as b_new, a as a_new, c as c_new, <udf>(c) as round_c_res`.
    ///
    /// Each test case pairs the orderings registered on the input
    /// `EquivalenceProperties` with the orderings expected after projection, the
    /// latter expressed directly as expressions over the output schema. `ts` and
    /// `d` are not part of the projection.
    #[test]
    fn project_orderings2() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Int32, true),
            Field::new("c", DataType::Int32, true),
            Field::new("d", DataType::Int32, true),
            Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
        ]));
        let col_a = &col("a", &schema)?;
        let col_b = &col("b", &schema)?;
        let col_c = &col("c", &schema)?;
        let col_ts = &col("ts", &schema)?;
        let a_plus_b = Arc::new(BinaryExpr::new(
            Arc::clone(col_a),
            Operator::Plus,
            Arc::clone(col_b),
        )) as Arc<dyn PhysicalExpr>;

        let test_fun = Arc::new(ScalarUDF::new_from_impl(TestScalarUDF::new()));

        // NOTE(review): the expected orderings below include `round_c_res` wherever
        // `c` is a sort key, so `TestScalarUDF` is presumably order-preserving in
        // its argument; confirm against its definition.
        let round_c = Arc::new(ScalarFunctionExpr::try_new(
            test_fun,
            vec![Arc::clone(col_c)],
            &schema,
            Arc::new(ConfigOptions::default()),
        )?) as PhysicalExprRef;

        let option_asc = SortOptions {
            descending: false,
            nulls_first: false,
        };

        let proj_exprs = vec![
            (col_b, "b_new".to_string()),
            (col_a, "a_new".to_string()),
            (col_c, "c_new".to_string()),
            (&round_c, "round_c_res".to_string()),
        ];
        let proj_exprs = proj_exprs
            .into_iter()
            .map(|(expr, name)| (Arc::clone(expr), name));
        let projection_mapping = ProjectionMapping::try_new(proj_exprs, &schema)?;
        let output_schema = output_schema(&projection_mapping, &schema)?;

        // Column references resolved against the output schema.
        let col_a_new = &col("a_new", &output_schema)?;
        let col_b_new = &col("b_new", &output_schema)?;
        let col_c_new = &col("c_new", &output_schema)?;
        let col_round_c_res = &col("round_c_res", &output_schema)?;
        let a_new_plus_b_new = Arc::new(BinaryExpr::new(
            Arc::clone(col_a_new),
            Operator::Plus,
            Arc::clone(col_b_new),
        )) as Arc<dyn PhysicalExpr>;

        let test_cases = [
            // ---------- TEST CASE 1 ------------
            (
                // orderings
                vec![
                    // [a ASC]
                    vec![(col_a, option_asc)],
                ],
                // expected
                vec![
                    // [a_new ASC]
                    vec![(col_a_new, option_asc)],
                ],
            ),
            // ---------- TEST CASE 2 ------------
            (
                // orderings
                vec![
                    // [a+b ASC]
                    vec![(&a_plus_b, option_asc)],
                ],
                // expected
                vec![
                    // [a_new + b_new ASC]
                    vec![(&a_new_plus_b_new, option_asc)],
                ],
            ),
            // ---------- TEST CASE 3 ------------
            (
                // orderings
                vec![
                    // [a ASC, ts ASC]
                    vec![(col_a, option_asc), (col_ts, option_asc)],
                ],
                // expected
                vec![
                    // [a_new ASC] (ts is not projected)
                    vec![(col_a_new, option_asc)],
                ],
            ),
            // ---------- TEST CASE 4 ------------
            (
                // orderings
                vec![
                    // [a ASC, ts ASC, b ASC]
                    vec![
                        (col_a, option_asc),
                        (col_ts, option_asc),
                        (col_b, option_asc),
                    ],
                ],
                // expected
                vec![
                    // [a_new ASC] (ts is not projected)
                    vec![(col_a_new, option_asc)],
                ],
            ),
            // ---------- TEST CASE 5 ------------
            (
                // orderings
                vec![
                    // [a ASC, c ASC]
                    vec![(col_a, option_asc), (col_c, option_asc)],
                ],
                // expected
                vec![
                    // [a_new ASC, round_c_res ASC]
                    vec![(col_a_new, option_asc), (col_round_c_res, option_asc)],
                    // [a_new ASC, c_new ASC]
                    vec![(col_a_new, option_asc), (col_c_new, option_asc)],
                ],
            ),
            // ---------- TEST CASE 6 ------------
            (
                // orderings
                vec![
                    // [c ASC, b ASC]
                    vec![(col_c, option_asc), (col_b, option_asc)],
                ],
                // expected
                vec![
                    // [round_c_res ASC]
                    vec![(col_round_c_res, option_asc)],
                    // [c_new ASC, b_new ASC]
                    vec![(col_c_new, option_asc), (col_b_new, option_asc)],
                ],
            ),
            // ---------- TEST CASE 7 ------------
            (
                // orderings
                vec![
                    // [a+b ASC, c ASC]
                    vec![(&a_plus_b, option_asc), (col_c, option_asc)],
                ],
                // expected
                vec![
                    // [a_new + b_new ASC, round_c_res ASC]
                    vec![
                        (&a_new_plus_b_new, option_asc),
                        (col_round_c_res, option_asc),
                    ],
                    // [a_new + b_new ASC, c_new ASC]
                    vec![(&a_new_plus_b_new, option_asc), (col_c_new, option_asc)],
                ],
            ),
        ];

        for (idx, (orderings, expected)) in test_cases.iter().enumerate() {
            // Fresh equivalence properties over the input schema for every case.
            let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));

            let orderings = convert_to_orderings(orderings);
            eq_properties.add_orderings(orderings);

            let expected = convert_to_orderings(expected);

            let projected_eq =
                eq_properties.project(&projection_mapping, Arc::clone(&output_schema));
            let orderings = projected_eq.oeq_class();

            let err_msg = format!(
                "test idx: {idx:?}, actual: {orderings:?}, expected: {expected:?}, projection_mapping: {projection_mapping:?}"
            );

            // Same number of orderings and every expected one present; the order in
            // which the orderings appear is not checked.
            assert_eq!(orderings.len(), expected.len(), "{err_msg}");
            for expected_ordering in &expected {
                assert!(orderings.contains(expected_ordering), "{}", err_msg)
            }
        }
        Ok(())
    }
1500
1501    #[test]
1502    fn project_orderings3() -> Result<()> {
1503        let schema = Arc::new(Schema::new(vec![
1504            Field::new("a", DataType::Int32, true),
1505            Field::new("b", DataType::Int32, true),
1506            Field::new("c", DataType::Int32, true),
1507            Field::new("d", DataType::Int32, true),
1508            Field::new("e", DataType::Int32, true),
1509            Field::new("f", DataType::Int32, true),
1510        ]));
1511        let col_a = &col("a", &schema)?;
1512        let col_b = &col("b", &schema)?;
1513        let col_c = &col("c", &schema)?;
1514        let col_d = &col("d", &schema)?;
1515        let col_e = &col("e", &schema)?;
1516        let col_f = &col("f", &schema)?;
1517        let a_plus_b = Arc::new(BinaryExpr::new(
1518            Arc::clone(col_a),
1519            Operator::Plus,
1520            Arc::clone(col_b),
1521        )) as Arc<dyn PhysicalExpr>;
1522
1523        let option_asc = SortOptions {
1524            descending: false,
1525            nulls_first: false,
1526        };
1527
1528        let proj_exprs = vec![
1529            (col_c, "c_new".to_string()),
1530            (col_d, "d_new".to_string()),
1531            (&a_plus_b, "a+b".to_string()),
1532        ];
1533        let proj_exprs = proj_exprs
1534            .into_iter()
1535            .map(|(expr, name)| (Arc::clone(expr), name));
1536        let projection_mapping = ProjectionMapping::try_new(proj_exprs, &schema)?;
1537        let output_schema = output_schema(&projection_mapping, &schema)?;
1538
1539        let col_a_plus_b_new = &col("a+b", &output_schema)?;
1540        let col_c_new = &col("c_new", &output_schema)?;
1541        let col_d_new = &col("d_new", &output_schema)?;
1542
1543        let test_cases = vec![
1544            // ---------- TEST CASE 1 ------------
1545            (
1546                // orderings
1547                vec![
1548                    // [d ASC, b ASC]
1549                    vec![(col_d, option_asc), (col_b, option_asc)],
1550                    // [c ASC, a ASC]
1551                    vec![(col_c, option_asc), (col_a, option_asc)],
1552                ],
1553                // equal conditions
1554                vec![],
1555                // expected
1556                vec![
1557                    // [d_new ASC, c_new ASC, a+b ASC]
1558                    vec![
1559                        (col_d_new, option_asc),
1560                        (col_c_new, option_asc),
1561                        (col_a_plus_b_new, option_asc),
1562                    ],
1563                    // [c_new ASC, d_new ASC, a+b ASC]
1564                    vec![
1565                        (col_c_new, option_asc),
1566                        (col_d_new, option_asc),
1567                        (col_a_plus_b_new, option_asc),
1568                    ],
1569                ],
1570            ),
1571            // ---------- TEST CASE 2 ------------
1572            (
1573                // orderings
1574                vec![
1575                    // [d ASC, b ASC]
1576                    vec![(col_d, option_asc), (col_b, option_asc)],
1577                    // [c ASC, e ASC], Please note that a=e
1578                    vec![(col_c, option_asc), (col_e, option_asc)],
1579                ],
1580                // equal conditions
1581                vec![(col_e, col_a)],
1582                // expected
1583                vec![
1584                    // [d_new ASC, c_new ASC, a+b ASC]
1585                    vec![
1586                        (col_d_new, option_asc),
1587                        (col_c_new, option_asc),
1588                        (col_a_plus_b_new, option_asc),
1589                    ],
1590                    // [c_new ASC, d_new ASC, a+b ASC]
1591                    vec![
1592                        (col_c_new, option_asc),
1593                        (col_d_new, option_asc),
1594                        (col_a_plus_b_new, option_asc),
1595                    ],
1596                ],
1597            ),
1598            // ---------- TEST CASE 3 ------------
1599            (
1600                // orderings
1601                vec![
1602                    // [d ASC, b ASC]
1603                    vec![(col_d, option_asc), (col_b, option_asc)],
1604                    // [c ASC, e ASC], Please note that a=f
1605                    vec![(col_c, option_asc), (col_e, option_asc)],
1606                ],
1607                // equal conditions
1608                vec![(col_a, col_f)],
1609                // expected
1610                vec![
1611                    // [d_new ASC]
1612                    vec![(col_d_new, option_asc)],
1613                    // [c_new ASC]
1614                    vec![(col_c_new, option_asc)],
1615                ],
1616            ),
1617        ];
1618        for (orderings, equal_columns, expected) in test_cases {
1619            let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
1620            for (lhs, rhs) in equal_columns {
1621                eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?;
1622            }
1623
1624            let orderings = convert_to_orderings(&orderings);
1625            eq_properties.add_orderings(orderings);
1626
1627            let expected = convert_to_orderings(&expected);
1628
1629            let projected_eq =
1630                eq_properties.project(&projection_mapping, Arc::clone(&output_schema));
1631            let orderings = projected_eq.oeq_class();
1632
1633            let err_msg = format!(
1634                "actual: {orderings:?}, expected: {expected:?}, projection_mapping: {projection_mapping:?}"
1635            );
1636
1637            assert_eq!(orderings.len(), expected.len(), "{err_msg}");
1638            for expected_ordering in &expected {
1639                assert!(orderings.contains(expected_ordering), "{}", err_msg)
1640            }
1641        }
1642
1643        Ok(())
1644    }
1645
1646    fn get_stats() -> Statistics {
1647        Statistics {
1648            num_rows: Precision::Exact(5),
1649            total_byte_size: Precision::Exact(23),
1650            column_statistics: vec![
1651                ColumnStatistics {
1652                    distinct_count: Precision::Exact(5),
1653                    max_value: Precision::Exact(ScalarValue::Int64(Some(21))),
1654                    min_value: Precision::Exact(ScalarValue::Int64(Some(-4))),
1655                    sum_value: Precision::Exact(ScalarValue::Int64(Some(42))),
1656                    null_count: Precision::Exact(0),
1657                },
1658                ColumnStatistics {
1659                    distinct_count: Precision::Exact(1),
1660                    max_value: Precision::Exact(ScalarValue::from("x")),
1661                    min_value: Precision::Exact(ScalarValue::from("a")),
1662                    sum_value: Precision::Absent,
1663                    null_count: Precision::Exact(3),
1664                },
1665                ColumnStatistics {
1666                    distinct_count: Precision::Absent,
1667                    max_value: Precision::Exact(ScalarValue::Float32(Some(1.1))),
1668                    min_value: Precision::Exact(ScalarValue::Float32(Some(0.1))),
1669                    sum_value: Precision::Exact(ScalarValue::Float32(Some(5.5))),
1670                    null_count: Precision::Absent,
1671                },
1672            ],
1673        }
1674    }
1675
1676    fn get_schema() -> Schema {
1677        let field_0 = Field::new("col0", DataType::Int64, false);
1678        let field_1 = Field::new("col1", DataType::Utf8, false);
1679        let field_2 = Field::new("col2", DataType::Float32, false);
1680        Schema::new(vec![field_0, field_1, field_2])
1681    }
1682
1683    #[test]
1684    fn test_stats_projection_columns_only() {
1685        let source = get_stats();
1686        let schema = get_schema();
1687
1688        let projection = ProjectionExprs::new(vec![
1689            ProjectionExpr {
1690                expr: Arc::new(Column::new("col1", 1)),
1691                alias: "col1".to_string(),
1692            },
1693            ProjectionExpr {
1694                expr: Arc::new(Column::new("col0", 0)),
1695                alias: "col0".to_string(),
1696            },
1697        ]);
1698
1699        let result = projection.project_statistics(source, &schema).unwrap();
1700
1701        let expected = Statistics {
1702            num_rows: Precision::Exact(5),
1703            total_byte_size: Precision::Exact(23),
1704            column_statistics: vec![
1705                ColumnStatistics {
1706                    distinct_count: Precision::Exact(1),
1707                    max_value: Precision::Exact(ScalarValue::from("x")),
1708                    min_value: Precision::Exact(ScalarValue::from("a")),
1709                    sum_value: Precision::Absent,
1710                    null_count: Precision::Exact(3),
1711                },
1712                ColumnStatistics {
1713                    distinct_count: Precision::Exact(5),
1714                    max_value: Precision::Exact(ScalarValue::Int64(Some(21))),
1715                    min_value: Precision::Exact(ScalarValue::Int64(Some(-4))),
1716                    sum_value: Precision::Exact(ScalarValue::Int64(Some(42))),
1717                    null_count: Precision::Exact(0),
1718                },
1719            ],
1720        };
1721
1722        assert_eq!(result, expected);
1723    }
1724
1725    #[test]
1726    fn test_stats_projection_column_with_primitive_width_only() {
1727        let source = get_stats();
1728        let schema = get_schema();
1729
1730        let projection = ProjectionExprs::new(vec![
1731            ProjectionExpr {
1732                expr: Arc::new(Column::new("col2", 2)),
1733                alias: "col2".to_string(),
1734            },
1735            ProjectionExpr {
1736                expr: Arc::new(Column::new("col0", 0)),
1737                alias: "col0".to_string(),
1738            },
1739        ]);
1740
1741        let result = projection.project_statistics(source, &schema).unwrap();
1742
1743        let expected = Statistics {
1744            num_rows: Precision::Exact(5),
1745            total_byte_size: Precision::Exact(60),
1746            column_statistics: vec![
1747                ColumnStatistics {
1748                    distinct_count: Precision::Absent,
1749                    max_value: Precision::Exact(ScalarValue::Float32(Some(1.1))),
1750                    min_value: Precision::Exact(ScalarValue::Float32(Some(0.1))),
1751                    sum_value: Precision::Exact(ScalarValue::Float32(Some(5.5))),
1752                    null_count: Precision::Absent,
1753                },
1754                ColumnStatistics {
1755                    distinct_count: Precision::Exact(5),
1756                    max_value: Precision::Exact(ScalarValue::Int64(Some(21))),
1757                    min_value: Precision::Exact(ScalarValue::Int64(Some(-4))),
1758                    sum_value: Precision::Exact(ScalarValue::Int64(Some(42))),
1759                    null_count: Precision::Exact(0),
1760                },
1761            ],
1762        };
1763
1764        assert_eq!(result, expected);
1765    }
1766
    // Tests for the `ProjectionExprs` struct
1768
1769    #[test]
1770    fn test_projection_new() -> Result<()> {
1771        let exprs = vec![
1772            ProjectionExpr {
1773                expr: Arc::new(Column::new("a", 0)),
1774                alias: "a".to_string(),
1775            },
1776            ProjectionExpr {
1777                expr: Arc::new(Column::new("b", 1)),
1778                alias: "b".to_string(),
1779            },
1780        ];
1781        let projection = ProjectionExprs::new(exprs.clone());
1782        assert_eq!(projection.as_ref().len(), 2);
1783        Ok(())
1784    }
1785
1786    #[test]
1787    fn test_projection_from_vec() -> Result<()> {
1788        let exprs = vec![ProjectionExpr {
1789            expr: Arc::new(Column::new("x", 0)),
1790            alias: "x".to_string(),
1791        }];
1792        let projection: ProjectionExprs = exprs.clone().into();
1793        assert_eq!(projection.as_ref().len(), 1);
1794        Ok(())
1795    }
1796
1797    #[test]
1798    fn test_projection_as_ref() -> Result<()> {
1799        let exprs = vec![
1800            ProjectionExpr {
1801                expr: Arc::new(Column::new("col1", 0)),
1802                alias: "col1".to_string(),
1803            },
1804            ProjectionExpr {
1805                expr: Arc::new(Column::new("col2", 1)),
1806                alias: "col2".to_string(),
1807            },
1808        ];
1809        let projection = ProjectionExprs::new(exprs);
1810        let as_ref: &[ProjectionExpr] = projection.as_ref();
1811        assert_eq!(as_ref.len(), 2);
1812        Ok(())
1813    }
1814
1815    #[test]
1816    fn test_column_indices_multiple_columns() -> Result<()> {
1817        // Test with reversed column order to ensure proper reordering
1818        let projection = ProjectionExprs::new(vec![
1819            ProjectionExpr {
1820                expr: Arc::new(Column::new("c", 5)),
1821                alias: "c".to_string(),
1822            },
1823            ProjectionExpr {
1824                expr: Arc::new(Column::new("b", 2)),
1825                alias: "b".to_string(),
1826            },
1827            ProjectionExpr {
1828                expr: Arc::new(Column::new("a", 0)),
1829                alias: "a".to_string(),
1830            },
1831        ]);
1832        // Should return sorted indices regardless of projection order
1833        assert_eq!(projection.column_indices(), vec![0, 2, 5]);
1834        Ok(())
1835    }
1836
1837    #[test]
1838    fn test_column_indices_duplicates() -> Result<()> {
1839        // Test that duplicate column indices appear only once
1840        let projection = ProjectionExprs::new(vec![
1841            ProjectionExpr {
1842                expr: Arc::new(Column::new("a", 1)),
1843                alias: "a".to_string(),
1844            },
1845            ProjectionExpr {
1846                expr: Arc::new(Column::new("b", 3)),
1847                alias: "b".to_string(),
1848            },
1849            ProjectionExpr {
1850                expr: Arc::new(Column::new("a2", 1)), // duplicate index
1851                alias: "a2".to_string(),
1852            },
1853        ]);
1854        assert_eq!(projection.column_indices(), vec![1, 3]);
1855        Ok(())
1856    }
1857
1858    #[test]
1859    fn test_column_indices_unsorted() -> Result<()> {
1860        // Test that column indices are sorted in the output
1861        let projection = ProjectionExprs::new(vec![
1862            ProjectionExpr {
1863                expr: Arc::new(Column::new("c", 5)),
1864                alias: "c".to_string(),
1865            },
1866            ProjectionExpr {
1867                expr: Arc::new(Column::new("a", 1)),
1868                alias: "a".to_string(),
1869            },
1870            ProjectionExpr {
1871                expr: Arc::new(Column::new("b", 3)),
1872                alias: "b".to_string(),
1873            },
1874        ]);
1875        assert_eq!(projection.column_indices(), vec![1, 3, 5]);
1876        Ok(())
1877    }
1878
1879    #[test]
1880    fn test_column_indices_complex_expr() -> Result<()> {
1881        // Test with complex expressions containing multiple columns
1882        let expr = Arc::new(BinaryExpr::new(
1883            Arc::new(Column::new("a", 1)),
1884            Operator::Plus,
1885            Arc::new(Column::new("b", 4)),
1886        ));
1887        let projection = ProjectionExprs::new(vec![
1888            ProjectionExpr {
1889                expr,
1890                alias: "sum".to_string(),
1891            },
1892            ProjectionExpr {
1893                expr: Arc::new(Column::new("c", 2)),
1894                alias: "c".to_string(),
1895            },
1896        ]);
1897        // Should return [1, 2, 4] - all columns used, sorted and deduplicated
1898        assert_eq!(projection.column_indices(), vec![1, 2, 4]);
1899        Ok(())
1900    }
1901
1902    #[test]
1903    fn test_column_indices_empty() -> Result<()> {
1904        let projection = ProjectionExprs::new(vec![]);
1905        assert_eq!(projection.column_indices(), Vec::<usize>::new());
1906        Ok(())
1907    }
1908
1909    #[test]
1910    fn test_merge_simple_columns() -> Result<()> {
1911        // First projection: SELECT c@2 AS x, b@1 AS y, a@0 AS z
1912        let base_projection = ProjectionExprs::new(vec![
1913            ProjectionExpr {
1914                expr: Arc::new(Column::new("c", 2)),
1915                alias: "x".to_string(),
1916            },
1917            ProjectionExpr {
1918                expr: Arc::new(Column::new("b", 1)),
1919                alias: "y".to_string(),
1920            },
1921            ProjectionExpr {
1922                expr: Arc::new(Column::new("a", 0)),
1923                alias: "z".to_string(),
1924            },
1925        ]);
1926
1927        // Second projection: SELECT y@1 AS col2, x@0 AS col1
1928        let top_projection = ProjectionExprs::new(vec![
1929            ProjectionExpr {
1930                expr: Arc::new(Column::new("y", 1)),
1931                alias: "col2".to_string(),
1932            },
1933            ProjectionExpr {
1934                expr: Arc::new(Column::new("x", 0)),
1935                alias: "col1".to_string(),
1936            },
1937        ]);
1938
1939        // Merge should produce: SELECT b@1 AS col2, c@2 AS col1
1940        let merged = base_projection.try_merge(&top_projection)?;
1941        assert_snapshot!(format!("{merged}"), @"Projection[b@1 AS col2, c@2 AS col1]");
1942
1943        Ok(())
1944    }
1945
1946    #[test]
1947    fn test_merge_with_expressions() -> Result<()> {
1948        // First projection: SELECT c@2 AS x, b@1 AS y, a@0 AS z
1949        let base_projection = ProjectionExprs::new(vec![
1950            ProjectionExpr {
1951                expr: Arc::new(Column::new("c", 2)),
1952                alias: "x".to_string(),
1953            },
1954            ProjectionExpr {
1955                expr: Arc::new(Column::new("b", 1)),
1956                alias: "y".to_string(),
1957            },
1958            ProjectionExpr {
1959                expr: Arc::new(Column::new("a", 0)),
1960                alias: "z".to_string(),
1961            },
1962        ]);
1963
1964        // Second projection: SELECT y@1 + z@2 AS c2, x@0 + 1 AS c1
1965        let top_projection = ProjectionExprs::new(vec![
1966            ProjectionExpr {
1967                expr: Arc::new(BinaryExpr::new(
1968                    Arc::new(Column::new("y", 1)),
1969                    Operator::Plus,
1970                    Arc::new(Column::new("z", 2)),
1971                )),
1972                alias: "c2".to_string(),
1973            },
1974            ProjectionExpr {
1975                expr: Arc::new(BinaryExpr::new(
1976                    Arc::new(Column::new("x", 0)),
1977                    Operator::Plus,
1978                    Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
1979                )),
1980                alias: "c1".to_string(),
1981            },
1982        ]);
1983
1984        // Merge should produce: SELECT b@1 + a@0 AS c2, c@2 + 1 AS c1
1985        let merged = base_projection.try_merge(&top_projection)?;
1986        assert_snapshot!(format!("{merged}"), @"Projection[b@1 + a@0 AS c2, c@2 + 1 AS c1]");
1987
1988        Ok(())
1989    }
1990
1991    #[test]
1992    fn try_merge_error() {
1993        // Create a base projection
1994        let base = ProjectionExprs::new(vec![
1995            ProjectionExpr {
1996                expr: Arc::new(Column::new("a", 0)),
1997                alias: "x".to_string(),
1998            },
1999            ProjectionExpr {
2000                expr: Arc::new(Column::new("b", 1)),
2001                alias: "y".to_string(),
2002            },
2003        ]);
2004
2005        // Create a top projection that references a non-existent column index
2006        let top = ProjectionExprs::new(vec![ProjectionExpr {
2007            expr: Arc::new(Column::new("z", 5)), // Invalid index
2008            alias: "result".to_string(),
2009        }]);
2010
2011        // Attempt to merge and expect an error
2012        let err_msg = base.try_merge(&top).unwrap_err().to_string();
2013        assert!(
2014            err_msg.contains("Internal error: Column index 5 out of bounds for projected expressions of length 2"),
2015            "Unexpected error message: {err_msg}",
2016        );
2017    }
2018
2019    #[test]
2020    fn test_project_schema_simple_columns() -> Result<()> {
2021        // Input schema: [col0: Int64, col1: Utf8, col2: Float32]
2022        let input_schema = get_schema();
2023
2024        // Projection: SELECT col2 AS c, col0 AS a
2025        let projection = ProjectionExprs::new(vec![
2026            ProjectionExpr {
2027                expr: Arc::new(Column::new("col2", 2)),
2028                alias: "c".to_string(),
2029            },
2030            ProjectionExpr {
2031                expr: Arc::new(Column::new("col0", 0)),
2032                alias: "a".to_string(),
2033            },
2034        ]);
2035
2036        let output_schema = projection.project_schema(&input_schema)?;
2037
2038        // Should have 2 fields
2039        assert_eq!(output_schema.fields().len(), 2);
2040
2041        // First field should be "c" with Float32 type
2042        assert_eq!(output_schema.field(0).name(), "c");
2043        assert_eq!(output_schema.field(0).data_type(), &DataType::Float32);
2044
2045        // Second field should be "a" with Int64 type
2046        assert_eq!(output_schema.field(1).name(), "a");
2047        assert_eq!(output_schema.field(1).data_type(), &DataType::Int64);
2048
2049        Ok(())
2050    }
2051
2052    #[test]
2053    fn test_project_schema_with_expressions() -> Result<()> {
2054        // Input schema: [col0: Int64, col1: Utf8, col2: Float32]
2055        let input_schema = get_schema();
2056
2057        // Projection: SELECT col0 + 1 AS incremented
2058        let projection = ProjectionExprs::new(vec![ProjectionExpr {
2059            expr: Arc::new(BinaryExpr::new(
2060                Arc::new(Column::new("col0", 0)),
2061                Operator::Plus,
2062                Arc::new(Literal::new(ScalarValue::Int64(Some(1)))),
2063            )),
2064            alias: "incremented".to_string(),
2065        }]);
2066
2067        let output_schema = projection.project_schema(&input_schema)?;
2068
2069        // Should have 1 field
2070        assert_eq!(output_schema.fields().len(), 1);
2071
2072        // Field should be "incremented" with Int64 type
2073        assert_eq!(output_schema.field(0).name(), "incremented");
2074        assert_eq!(output_schema.field(0).data_type(), &DataType::Int64);
2075
2076        Ok(())
2077    }
2078
2079    #[test]
2080    fn test_project_schema_preserves_metadata() -> Result<()> {
2081        // Create schema with metadata
2082        let mut metadata = HashMap::new();
2083        metadata.insert("key".to_string(), "value".to_string());
2084        let field_with_metadata =
2085            Field::new("col0", DataType::Int64, false).with_metadata(metadata.clone());
2086        let input_schema = Schema::new(vec![
2087            field_with_metadata,
2088            Field::new("col1", DataType::Utf8, false),
2089        ]);
2090
2091        // Projection: SELECT col0 AS renamed
2092        let projection = ProjectionExprs::new(vec![ProjectionExpr {
2093            expr: Arc::new(Column::new("col0", 0)),
2094            alias: "renamed".to_string(),
2095        }]);
2096
2097        let output_schema = projection.project_schema(&input_schema)?;
2098
2099        // Should have 1 field
2100        assert_eq!(output_schema.fields().len(), 1);
2101
2102        // Field should be "renamed" with metadata preserved
2103        assert_eq!(output_schema.field(0).name(), "renamed");
2104        assert_eq!(output_schema.field(0).metadata(), &metadata);
2105
2106        Ok(())
2107    }
2108
2109    #[test]
2110    fn test_project_schema_empty() -> Result<()> {
2111        let input_schema = get_schema();
2112        let projection = ProjectionExprs::new(vec![]);
2113
2114        let output_schema = projection.project_schema(&input_schema)?;
2115
2116        assert_eq!(output_schema.fields().len(), 0);
2117
2118        Ok(())
2119    }
2120
2121    #[test]
2122    fn test_project_statistics_columns_only() -> Result<()> {
2123        let input_stats = get_stats();
2124        let input_schema = get_schema();
2125
2126        // Projection: SELECT col1 AS text, col0 AS num
2127        let projection = ProjectionExprs::new(vec![
2128            ProjectionExpr {
2129                expr: Arc::new(Column::new("col1", 1)),
2130                alias: "text".to_string(),
2131            },
2132            ProjectionExpr {
2133                expr: Arc::new(Column::new("col0", 0)),
2134                alias: "num".to_string(),
2135            },
2136        ]);
2137
2138        let output_stats = projection.project_statistics(input_stats, &input_schema)?;
2139
2140        // Row count should be preserved
2141        assert_eq!(output_stats.num_rows, Precision::Exact(5));
2142
2143        // Should have 2 column statistics (reordered from input)
2144        assert_eq!(output_stats.column_statistics.len(), 2);
2145
2146        // First column (col1 from input)
2147        assert_eq!(
2148            output_stats.column_statistics[0].distinct_count,
2149            Precision::Exact(1)
2150        );
2151        assert_eq!(
2152            output_stats.column_statistics[0].max_value,
2153            Precision::Exact(ScalarValue::from("x"))
2154        );
2155
2156        // Second column (col0 from input)
2157        assert_eq!(
2158            output_stats.column_statistics[1].distinct_count,
2159            Precision::Exact(5)
2160        );
2161        assert_eq!(
2162            output_stats.column_statistics[1].max_value,
2163            Precision::Exact(ScalarValue::Int64(Some(21)))
2164        );
2165
2166        Ok(())
2167    }
2168
2169    #[test]
2170    fn test_project_statistics_with_expressions() -> Result<()> {
2171        let input_stats = get_stats();
2172        let input_schema = get_schema();
2173
2174        // Projection with expression: SELECT col0 + 1 AS incremented, col1 AS text
2175        let projection = ProjectionExprs::new(vec![
2176            ProjectionExpr {
2177                expr: Arc::new(BinaryExpr::new(
2178                    Arc::new(Column::new("col0", 0)),
2179                    Operator::Plus,
2180                    Arc::new(Literal::new(ScalarValue::Int64(Some(1)))),
2181                )),
2182                alias: "incremented".to_string(),
2183            },
2184            ProjectionExpr {
2185                expr: Arc::new(Column::new("col1", 1)),
2186                alias: "text".to_string(),
2187            },
2188        ]);
2189
2190        let output_stats = projection.project_statistics(input_stats, &input_schema)?;
2191
2192        // Row count should be preserved
2193        assert_eq!(output_stats.num_rows, Precision::Exact(5));
2194
2195        // Should have 2 column statistics
2196        assert_eq!(output_stats.column_statistics.len(), 2);
2197
2198        // First column (expression) should have unknown statistics
2199        assert_eq!(
2200            output_stats.column_statistics[0].distinct_count,
2201            Precision::Absent
2202        );
2203        assert_eq!(
2204            output_stats.column_statistics[0].max_value,
2205            Precision::Absent
2206        );
2207
2208        // Second column (col1) should preserve statistics
2209        assert_eq!(
2210            output_stats.column_statistics[1].distinct_count,
2211            Precision::Exact(1)
2212        );
2213
2214        Ok(())
2215    }
2216
2217    #[test]
2218    fn test_project_statistics_primitive_width_only() -> Result<()> {
2219        let input_stats = get_stats();
2220        let input_schema = get_schema();
2221
2222        // Projection with only primitive width columns: SELECT col2 AS f, col0 AS i
2223        let projection = ProjectionExprs::new(vec![
2224            ProjectionExpr {
2225                expr: Arc::new(Column::new("col2", 2)),
2226                alias: "f".to_string(),
2227            },
2228            ProjectionExpr {
2229                expr: Arc::new(Column::new("col0", 0)),
2230                alias: "i".to_string(),
2231            },
2232        ]);
2233
2234        let output_stats = projection.project_statistics(input_stats, &input_schema)?;
2235
2236        // Row count should be preserved
2237        assert_eq!(output_stats.num_rows, Precision::Exact(5));
2238
2239        // Total byte size should be recalculated for primitive types
2240        // Float32 (4 bytes) + Int64 (8 bytes) = 12 bytes per row, 5 rows = 60 bytes
2241        assert_eq!(output_stats.total_byte_size, Precision::Exact(60));
2242
2243        // Should have 2 column statistics
2244        assert_eq!(output_stats.column_statistics.len(), 2);
2245
2246        Ok(())
2247    }
2248
2249    #[test]
2250    fn test_project_statistics_empty() -> Result<()> {
2251        let input_stats = get_stats();
2252        let input_schema = get_schema();
2253
2254        let projection = ProjectionExprs::new(vec![]);
2255
2256        let output_stats = projection.project_statistics(input_stats, &input_schema)?;
2257
2258        // Row count should be preserved
2259        assert_eq!(output_stats.num_rows, Precision::Exact(5));
2260
2261        // Should have no column statistics
2262        assert_eq!(output_stats.column_statistics.len(), 0);
2263
2264        // Total byte size should be 0 for empty projection
2265        assert_eq!(output_stats.total_byte_size, Precision::Exact(0));
2266
2267        Ok(())
2268    }
2269}