datafusion_physical_expr/equivalence/properties/
dependency.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::{self, Display};
19use std::ops::{Deref, DerefMut};
20use std::sync::Arc;
21
22use super::expr_refers;
23use crate::{LexOrdering, PhysicalSortExpr};
24
25use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
26
27use indexmap::{IndexMap, IndexSet};
28use itertools::Itertools;
29
/// A list of sort expressions that can be calculated from a known set of
/// dependencies.
///
/// The expressions are kept in an [`IndexSet`], so duplicates are dropped and
/// iteration follows insertion order.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct Dependencies {
    /// The de-duplicated sort expressions, in insertion order.
    sort_exprs: IndexSet<PhysicalSortExpr>,
}
36
37impl Display for Dependencies {
38    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39        write!(f, "[")?;
40        let mut iter = self.sort_exprs.iter();
41        if let Some(dep) = iter.next() {
42            write!(f, "{dep}")?;
43        }
44        for dep in iter {
45            write!(f, ", {dep}")?;
46        }
47        write!(f, "]")
48    }
49}
50
51impl Dependencies {
52    // Creates a new `Dependencies` instance from the given sort expressions.
53    pub fn new(sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>) -> Self {
54        Self {
55            sort_exprs: sort_exprs.into_iter().collect(),
56        }
57    }
58}
59
60impl Deref for Dependencies {
61    type Target = IndexSet<PhysicalSortExpr>;
62
63    fn deref(&self) -> &Self::Target {
64        &self.sort_exprs
65    }
66}
67
68impl DerefMut for Dependencies {
69    fn deref_mut(&mut self) -> &mut Self::Target {
70        &mut self.sort_exprs
71    }
72}
73
74impl IntoIterator for Dependencies {
75    type Item = PhysicalSortExpr;
76    type IntoIter = <IndexSet<PhysicalSortExpr> as IntoIterator>::IntoIter;
77
78    fn into_iter(self) -> Self::IntoIter {
79        self.sort_exprs.into_iter()
80    }
81}
82
/// Contains a mapping of all dependencies we have processed for each sort expr.
///
/// The enumerator only borrows the sort expressions (lifetime `'a`); it is
/// used by `construct_orderings` to avoid expanding the same
/// `(target, dependency)` pair more than once.
pub struct DependencyEnumerator<'a> {
    /// Maps `expr` --> `[exprs]` that have previously been processed
    seen: IndexMap<&'a PhysicalSortExpr, IndexSet<&'a PhysicalSortExpr>>,
}
88
89impl<'a> DependencyEnumerator<'a> {
90    pub fn new() -> Self {
91        Self {
92            seen: IndexMap::new(),
93        }
94    }
95
96    /// Insert a new dependency,
97    ///
98    /// returns false if the dependency was already in the map
99    /// returns true if the dependency was newly inserted
100    fn insert(
101        &mut self,
102        target: &'a PhysicalSortExpr,
103        dep: &'a PhysicalSortExpr,
104    ) -> bool {
105        self.seen.entry(target).or_default().insert(dep)
106    }
107
108    /// This function recursively analyzes the dependencies of the given sort
109    /// expression within the given dependency map to construct lexicographical
110    /// orderings that include the sort expression and its dependencies.
111    ///
112    /// # Parameters
113    ///
114    /// - `referred_sort_expr`: A reference to the sort expression (`PhysicalSortExpr`)
115    ///   for which lexicographical orderings satisfying its dependencies are to be
116    ///   constructed.
117    /// - `dependency_map`: A reference to the `DependencyMap` that contains
118    ///   dependencies for different `PhysicalSortExpr`s.
119    ///
120    /// # Returns
121    ///
122    /// A vector of lexicographical orderings (`Vec<LexOrdering>`) based on the given
123    /// sort expression and its dependencies.
124    pub fn construct_orderings(
125        &mut self,
126        referred_sort_expr: &'a PhysicalSortExpr,
127        dependency_map: &'a DependencyMap,
128    ) -> Vec<LexOrdering> {
129        let node = dependency_map
130            .get(referred_sort_expr)
131            .expect("`referred_sort_expr` should be inside `dependency_map`");
132        // Since we work on intermediate nodes, we are sure `node.target` exists.
133        let target = node.target.as_ref().unwrap();
134        // An empty dependency means the referred_sort_expr represents a global ordering.
135        // Return its projected version, which is the target_expression.
136        if node.dependencies.is_empty() {
137            return vec![[target.clone()].into()];
138        };
139
140        node.dependencies
141            .iter()
142            .flat_map(|dep| {
143                let mut orderings = if self.insert(target, dep) {
144                    self.construct_orderings(dep, dependency_map)
145                } else {
146                    vec![]
147                };
148
149                for ordering in orderings.iter_mut() {
150                    ordering.push(target.clone());
151                }
152                orderings
153            })
154            .collect()
155    }
156}
157
/// Maps an expression --> DependencyNode
///
/// # Debugging / displaying `DependencyMap`
///
/// This structure implements `Display` to assist debugging. For example:
///
/// ```text
/// DependencyMap: {
///   a@0 ASC --> (target: a@0 ASC, dependencies: [[]])
///   b@1 ASC --> (target: b@1 ASC, dependencies: [[a@0 ASC, c@2 ASC]])
///   c@2 ASC --> (target: c@2 ASC, dependencies: [[b@1 ASC, a@0 ASC]])
///   d@3 ASC --> (target: d@3 ASC, dependencies: [[c@2 ASC, b@1 ASC]])
/// }
/// ```
///
/// # Note on IndexMap Rationale
///
/// `IndexMap` (which preserves insertion order) is used to ensure consistent
/// results across different executions of the same query. `HashSet` and
/// `HashMap` could have been used instead without any loss of functionality.
///
/// As an example, if existing orderings are
/// 1. `[a ASC, b ASC]`
/// 2. `[c ASC]`
///
/// Then both the following output orderings are valid
/// 1. `[a ASC, b ASC, c ASC]`
/// 2. `[c ASC, a ASC, b ASC]`
///
/// These are both valid as they are concatenated versions of the alternative
/// orderings. Had we used `HashSet`/`HashMap`, we couldn't guarantee to generate
/// the same result among the possible two results in the example above.
#[derive(Debug, Default)]
pub struct DependencyMap {
    /// Dependency node of each known sort expression, in insertion order.
    map: IndexMap<PhysicalSortExpr, DependencyNode>,
}
194
195impl DependencyMap {
196    /// Insert a new dependency of `sort_expr` (i.e. `dependency`) into the map
197    /// along with its target sort expression.
198    pub fn insert(
199        &mut self,
200        sort_expr: PhysicalSortExpr,
201        target_sort_expr: Option<PhysicalSortExpr>,
202        dependency: Option<PhysicalSortExpr>,
203    ) {
204        let entry = self.map.entry(sort_expr);
205        let node = entry.or_insert_with(|| DependencyNode {
206            target: target_sort_expr,
207            dependencies: Dependencies::default(),
208        });
209        node.dependencies.extend(dependency);
210    }
211}
212
213impl Deref for DependencyMap {
214    type Target = IndexMap<PhysicalSortExpr, DependencyNode>;
215
216    fn deref(&self) -> &Self::Target {
217        &self.map
218    }
219}
220
221impl Display for DependencyMap {
222    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
223        writeln!(f, "DependencyMap: {{")?;
224        for (sort_expr, node) in self.map.iter() {
225            writeln!(f, "  {sort_expr} --> {node}")?;
226        }
227        writeln!(f, "}}")
228    }
229}
230
/// Represents a node in the dependency map used to construct projected orderings.
///
/// A `DependencyNode` contains information about a particular sort expression,
/// including its target sort expression and a set of dependencies on other sort
/// expressions.
///
/// # Fields
///
/// - `target`: An optional `PhysicalSortExpr` representing the target sort
///   expression associated with the node. It is `None` if the sort expression
///   cannot be projected.
/// - `dependencies`: A [`Dependencies`] containing dependencies on other sort
///   expressions that are referred to by the target sort expression.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DependencyNode {
    /// Target (projected) sort expression, if any.
    pub(crate) target: Option<PhysicalSortExpr>,
    /// Sort expressions this node depends on.
    pub(crate) dependencies: Dependencies,
}
249
250impl Display for DependencyNode {
251    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
252        if let Some(target) = &self.target {
253            write!(f, "(target: {target}, ")?;
254        } else {
255            write!(f, "(")?;
256        }
257        write!(f, "dependencies: [{}])", self.dependencies)
258    }
259}
260
261/// This function analyzes the dependency map to collect referred dependencies for
262/// a given source expression.
263///
264/// # Parameters
265///
266/// - `dependency_map`: A reference to the `DependencyMap` where each
267///   `PhysicalSortExpr` is associated with a `DependencyNode`.
268/// - `source`: A reference to the source expression (`Arc<dyn PhysicalExpr>`)
269///   for which relevant dependencies need to be identified.
270///
271/// # Returns
272///
273/// A `Vec<Dependencies>` containing the dependencies for the given source
274/// expression. These dependencies are expressions that are referred to by
275/// the source expression based on the provided dependency map.
276pub fn referred_dependencies(
277    dependency_map: &DependencyMap,
278    source: &Arc<dyn PhysicalExpr>,
279) -> Vec<Dependencies> {
280    // Associate `PhysicalExpr`s with `PhysicalSortExpr`s that contain them:
281    let mut expr_to_sort_exprs = IndexMap::<_, Dependencies>::new();
282    for sort_expr in dependency_map
283        .keys()
284        .filter(|sort_expr| expr_refers(source, &sort_expr.expr))
285    {
286        let key = Arc::clone(&sort_expr.expr);
287        expr_to_sort_exprs
288            .entry(key)
289            .or_default()
290            .insert(sort_expr.clone());
291    }
292
293    // Generate all valid dependencies for the source. For example, if the source
294    // is `a + b` and the map is `[a -> (a ASC, a DESC), b -> (b ASC)]`, we get
295    // `vec![HashSet(a ASC, b ASC), HashSet(a DESC, b ASC)]`.
296    expr_to_sort_exprs
297        .into_values()
298        .multi_cartesian_product()
299        .map(Dependencies::new)
300        .collect()
301}
302
303/// This function retrieves the dependencies of the given relevant sort expression
304/// from the given dependency map. It then constructs prefix orderings by recursively
305/// analyzing the dependencies and include them in the orderings.
306///
307/// # Parameters
308///
309/// - `relevant_sort_expr`: A reference to the relevant sort expression
310///   (`PhysicalSortExpr`) for which prefix orderings are to be constructed.
311/// - `dependency_map`: A reference to the `DependencyMap` containing dependencies.
312///
313/// # Returns
314///
315/// A vector of prefix orderings (`Vec<LexOrdering>`) based on the given relevant
316/// sort expression and its dependencies.
317pub fn construct_prefix_orderings(
318    relevant_sort_expr: &PhysicalSortExpr,
319    dependency_map: &DependencyMap,
320) -> Vec<LexOrdering> {
321    let mut dep_enumerator = DependencyEnumerator::new();
322    dependency_map
323        .get(relevant_sort_expr)
324        .expect("no relevant sort expr found")
325        .dependencies
326        .iter()
327        .flat_map(|dep| dep_enumerator.construct_orderings(dep, dependency_map))
328        .collect()
329}
330
331/// Generates all possible orderings where dependencies are satisfied for the
332/// current projection expression.
333///
334/// # Example
335///  If `dependencies` is `a + b ASC` and the dependency map holds dependencies
336///  * `a ASC` --> `[c ASC]`
337///  * `b ASC` --> `[d DESC]`,
338///
339/// This function generates these two sort orders
340/// * `[c ASC, d DESC, a + b ASC]`
341/// * `[d DESC, c ASC, a + b ASC]`
342///
343/// # Parameters
344///
345/// * `dependencies` - Set of relevant expressions.
346/// * `dependency_map` - Map of dependencies for expressions that may appear in
347///   `dependencies`.
348///
349/// # Returns
350///
351/// A vector of lexical orderings (`Vec<LexOrdering>`) representing all valid
352/// orderings based on the given dependencies.
353pub fn generate_dependency_orderings(
354    dependencies: &Dependencies,
355    dependency_map: &DependencyMap,
356) -> Vec<LexOrdering> {
357    // Construct all the valid prefix orderings for each expression appearing
358    // in the projection. Note that if relevant prefixes are empty, there is no
359    // dependency, meaning that dependent is a leading ordering.
360    dependencies
361        .iter()
362        .filter_map(|dep| {
363            let prefixes = construct_prefix_orderings(dep, dependency_map);
364            (!prefixes.is_empty()).then_some(prefixes)
365        })
366        // Generate all possible valid orderings:
367        .multi_cartesian_product()
368        .flat_map(|prefix_orderings| {
369            let length = prefix_orderings.len();
370            prefix_orderings
371                .into_iter()
372                .permutations(length)
373                .filter_map(|prefixes| {
374                    prefixes.into_iter().reduce(|mut acc, ordering| {
375                        acc.extend(ordering);
376                        acc
377                    })
378                })
379        })
380        .collect()
381}
382
383#[cfg(test)]
384mod tests {
385    use std::ops::Not;
386    use std::sync::Arc;
387
388    use super::*;
389    use crate::equivalence::tests::{
390        convert_to_sort_reqs, create_test_params, create_test_schema, parse_sort_expr,
391    };
392    use crate::equivalence::{convert_to_sort_exprs, ProjectionMapping};
393    use crate::expressions::{col, BinaryExpr, CastExpr, Column};
394    use crate::projection::tests::output_schema;
395    use crate::{ConstExpr, EquivalenceProperties, ScalarFunctionExpr};
396
397    use arrow::compute::SortOptions;
398    use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
399    use datafusion_common::config::ConfigOptions;
400    use datafusion_common::{Constraint, Constraints, Result};
401    use datafusion_expr::sort_properties::SortProperties;
402    use datafusion_expr::Operator;
403    use datafusion_functions::string::concat;
404    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
405    use datafusion_physical_expr_common::sort_expr::{
406        LexRequirement, PhysicalSortRequirement,
407    };
408
409    #[test]
410    fn project_equivalence_properties_test() -> Result<()> {
411        let input_schema = Arc::new(Schema::new(vec![
412            Field::new("a", DataType::Int64, true),
413            Field::new("b", DataType::Int64, true),
414            Field::new("c", DataType::Int64, true),
415        ]));
416
417        let input_properties = EquivalenceProperties::new(Arc::clone(&input_schema));
418        let col_a = col("a", &input_schema)?;
419
420        // a as a1, a as a2, a as a3, a as a3
421        let proj_exprs = vec![
422            (Arc::clone(&col_a), "a1".to_string()),
423            (Arc::clone(&col_a), "a2".to_string()),
424            (Arc::clone(&col_a), "a3".to_string()),
425            (Arc::clone(&col_a), "a4".to_string()),
426        ];
427        let projection_mapping = ProjectionMapping::try_new(proj_exprs, &input_schema)?;
428
429        let out_schema = output_schema(&projection_mapping, &input_schema)?;
430        // a as a1, a as a2, a as a3, a as a3
431        let proj_exprs = vec![
432            (Arc::clone(&col_a), "a1".to_string()),
433            (Arc::clone(&col_a), "a2".to_string()),
434            (Arc::clone(&col_a), "a3".to_string()),
435            (Arc::clone(&col_a), "a4".to_string()),
436        ];
437        let projection_mapping = ProjectionMapping::try_new(proj_exprs, &input_schema)?;
438
439        // a as a1, a as a2, a as a3, a as a3
440        let col_a1 = &col("a1", &out_schema)?;
441        let col_a2 = &col("a2", &out_schema)?;
442        let col_a3 = &col("a3", &out_schema)?;
443        let col_a4 = &col("a4", &out_schema)?;
444        let out_properties = input_properties.project(&projection_mapping, out_schema);
445
446        // At the output a1=a2=a3=a4
447        assert_eq!(out_properties.eq_group().len(), 1);
448        let eq_class = out_properties.eq_group().iter().next().unwrap();
449        assert_eq!(eq_class.len(), 4);
450        assert!(eq_class.contains(col_a1));
451        assert!(eq_class.contains(col_a2));
452        assert!(eq_class.contains(col_a3));
453        assert!(eq_class.contains(col_a4));
454
455        Ok(())
456    }
457
458    #[test]
459    fn project_equivalence_properties_test_multi() -> Result<()> {
460        // test multiple input orderings with equivalence properties
461        let input_schema = Arc::new(Schema::new(vec![
462            Field::new("a", DataType::Int64, true),
463            Field::new("b", DataType::Int64, true),
464            Field::new("c", DataType::Int64, true),
465            Field::new("d", DataType::Int64, true),
466        ]));
467
468        let mut input_properties = EquivalenceProperties::new(Arc::clone(&input_schema));
469        // add equivalent ordering [a, b, c, d]
470        input_properties.add_ordering([
471            parse_sort_expr("a", &input_schema),
472            parse_sort_expr("b", &input_schema),
473            parse_sort_expr("c", &input_schema),
474            parse_sort_expr("d", &input_schema),
475        ]);
476
477        // add equivalent ordering [a, c, b, d]
478        input_properties.add_ordering([
479            parse_sort_expr("a", &input_schema),
480            parse_sort_expr("c", &input_schema),
481            parse_sort_expr("b", &input_schema), // NB b and c are swapped
482            parse_sort_expr("d", &input_schema),
483        ]);
484
485        // simply project all the columns in order
486        let proj_exprs = vec![
487            (col("a", &input_schema)?, "a".to_string()),
488            (col("b", &input_schema)?, "b".to_string()),
489            (col("c", &input_schema)?, "c".to_string()),
490            (col("d", &input_schema)?, "d".to_string()),
491        ];
492        let projection_mapping = ProjectionMapping::try_new(proj_exprs, &input_schema)?;
493        let out_properties = input_properties.project(&projection_mapping, input_schema);
494
495        assert_eq!(
496            out_properties.to_string(),
497            "order: [[a@0 ASC, c@2 ASC, b@1 ASC, d@3 ASC], [a@0 ASC, b@1 ASC, c@2 ASC, d@3 ASC]]"
498        );
499
500        Ok(())
501    }
502
503    #[test]
504    fn test_normalize_ordering_equivalence_classes() -> Result<()> {
505        let schema = Schema::new(vec![
506            Field::new("a", DataType::Int32, true),
507            Field::new("b", DataType::Int32, true),
508            Field::new("c", DataType::Int32, true),
509        ]);
510        let col_a_expr = col("a", &schema)?;
511        let col_b_expr = col("b", &schema)?;
512        let col_c_expr = col("c", &schema)?;
513        let mut eq_properties = EquivalenceProperties::new(Arc::new(schema.clone()));
514
515        eq_properties.add_equal_conditions(col_a_expr, Arc::clone(&col_c_expr))?;
516        eq_properties.add_orderings([
517            vec![PhysicalSortExpr::new_default(Arc::clone(&col_b_expr))],
518            vec![PhysicalSortExpr::new_default(Arc::clone(&col_c_expr))],
519        ]);
520
521        let mut expected_eqs = EquivalenceProperties::new(Arc::new(schema));
522        expected_eqs.add_orderings([
523            vec![PhysicalSortExpr::new_default(col_b_expr)],
524            vec![PhysicalSortExpr::new_default(col_c_expr)],
525        ]);
526
527        assert!(eq_properties.oeq_class().eq(expected_eqs.oeq_class()));
528        Ok(())
529    }
530
531    #[test]
532    fn test_get_indices_of_matching_sort_exprs_with_order_eq() -> Result<()> {
533        let sort_options = SortOptions::default();
534        let sort_options_not = SortOptions::default().not();
535
536        let schema = Schema::new(vec![
537            Field::new("a", DataType::Int32, true),
538            Field::new("b", DataType::Int32, true),
539        ]);
540        let col_a = col("a", &schema)?;
541        let col_b = col("b", &schema)?;
542        let required_columns = [Arc::clone(&col_b), Arc::clone(&col_a)];
543        let mut eq_properties = EquivalenceProperties::new(Arc::new(schema));
544        eq_properties.add_ordering([
545            PhysicalSortExpr::new(Arc::new(Column::new("b", 1)), sort_options_not),
546            PhysicalSortExpr::new(Arc::new(Column::new("a", 0)), sort_options),
547        ]);
548        let (result, idxs) = eq_properties.find_longest_permutation(&required_columns)?;
549        assert_eq!(idxs, vec![0, 1]);
550        assert_eq!(
551            result,
552            vec![
553                PhysicalSortExpr::new(col_b, sort_options_not),
554                PhysicalSortExpr::new(col_a, sort_options),
555            ]
556        );
557
558        let schema = Schema::new(vec![
559            Field::new("a", DataType::Int32, true),
560            Field::new("b", DataType::Int32, true),
561            Field::new("c", DataType::Int32, true),
562        ]);
563        let col_a = col("a", &schema)?;
564        let col_b = col("b", &schema)?;
565        let required_columns = [Arc::clone(&col_b), Arc::clone(&col_a)];
566        let mut eq_properties = EquivalenceProperties::new(Arc::new(schema));
567        eq_properties.add_orderings([
568            vec![PhysicalSortExpr::new(
569                Arc::new(Column::new("c", 2)),
570                sort_options,
571            )],
572            vec![
573                PhysicalSortExpr::new(Arc::new(Column::new("b", 1)), sort_options_not),
574                PhysicalSortExpr::new(Arc::new(Column::new("a", 0)), sort_options),
575            ],
576        ]);
577        let (result, idxs) = eq_properties.find_longest_permutation(&required_columns)?;
578        assert_eq!(idxs, vec![0, 1]);
579        assert_eq!(
580            result,
581            vec![
582                PhysicalSortExpr::new(col_b, sort_options_not),
583                PhysicalSortExpr::new(col_a, sort_options),
584            ]
585        );
586
587        let required_columns = [
588            Arc::new(Column::new("b", 1)) as _,
589            Arc::new(Column::new("a", 0)) as _,
590        ];
591        let schema = Schema::new(vec![
592            Field::new("a", DataType::Int32, true),
593            Field::new("b", DataType::Int32, true),
594            Field::new("c", DataType::Int32, true),
595        ]);
596        let mut eq_properties = EquivalenceProperties::new(Arc::new(schema));
597
598        // not satisfied orders
599        eq_properties.add_ordering([
600            PhysicalSortExpr::new(Arc::new(Column::new("b", 1)), sort_options_not),
601            PhysicalSortExpr::new(Arc::new(Column::new("c", 2)), sort_options),
602            PhysicalSortExpr::new(Arc::new(Column::new("a", 0)), sort_options),
603        ]);
604        let (_, idxs) = eq_properties.find_longest_permutation(&required_columns)?;
605        assert_eq!(idxs, vec![0]);
606
607        Ok(())
608    }
609
610    #[test]
611    fn test_update_properties() -> Result<()> {
612        let schema = Schema::new(vec![
613            Field::new("a", DataType::Int32, true),
614            Field::new("b", DataType::Int32, true),
615            Field::new("c", DataType::Int32, true),
616            Field::new("d", DataType::Int32, true),
617        ]);
618
619        let mut eq_properties = EquivalenceProperties::new(Arc::new(schema.clone()));
620        let col_a = col("a", &schema)?;
621        let col_b = col("b", &schema)?;
622        let col_c = col("c", &schema)?;
623        let col_d = col("d", &schema)?;
624        let option_asc = SortOptions {
625            descending: false,
626            nulls_first: false,
627        };
628        // b=a (e.g they are aliases)
629        eq_properties.add_equal_conditions(Arc::clone(&col_b), Arc::clone(&col_a))?;
630        // [b ASC], [d ASC]
631        eq_properties.add_orderings([
632            vec![PhysicalSortExpr::new(Arc::clone(&col_b), option_asc)],
633            vec![PhysicalSortExpr::new(Arc::clone(&col_d), option_asc)],
634        ]);
635
636        let test_cases = vec![
637            // d + b
638            (
639                Arc::new(BinaryExpr::new(col_d, Operator::Plus, Arc::clone(&col_b))) as _,
640                SortProperties::Ordered(option_asc),
641            ),
642            // b
643            (col_b, SortProperties::Ordered(option_asc)),
644            // a
645            (Arc::clone(&col_a), SortProperties::Ordered(option_asc)),
646            // a + c
647            (
648                Arc::new(BinaryExpr::new(col_a, Operator::Plus, col_c)),
649                SortProperties::Unordered,
650            ),
651        ];
652        for (expr, expected) in test_cases {
653            let leading_orderings = eq_properties
654                .oeq_class()
655                .iter()
656                .map(|ordering| ordering.first().clone())
657                .collect::<Vec<_>>();
658            let expr_props = eq_properties.get_expr_properties(Arc::clone(&expr));
659            let err_msg = format!(
660                "expr:{:?}, expected: {:?}, actual: {:?}, leading_orderings: {leading_orderings:?}",
661                expr, expected, expr_props.sort_properties
662            );
663            assert_eq!(expr_props.sort_properties, expected, "{err_msg}");
664        }
665
666        Ok(())
667    }
668
669    #[test]
670    fn test_find_longest_permutation() -> Result<()> {
671        // Schema satisfies following orderings:
672        // [a ASC], [d ASC, b ASC], [e DESC, f ASC, g ASC]
673        // and
674        // Column [a=c] (e.g they are aliases).
675        // At below we add [d ASC, h DESC] also, for test purposes
676        let (test_schema, mut eq_properties) = create_test_params()?;
677        let col_a = &col("a", &test_schema)?;
678        let col_b = &col("b", &test_schema)?;
679        let col_c = &col("c", &test_schema)?;
680        let col_d = &col("d", &test_schema)?;
681        let col_e = &col("e", &test_schema)?;
682        let col_f = &col("f", &test_schema)?;
683        let col_h = &col("h", &test_schema)?;
684        // a + d
685        let a_plus_d = Arc::new(BinaryExpr::new(
686            Arc::clone(col_a),
687            Operator::Plus,
688            Arc::clone(col_d),
689        )) as _;
690
691        let option_asc = SortOptions {
692            descending: false,
693            nulls_first: false,
694        };
695        let option_desc = SortOptions {
696            descending: true,
697            nulls_first: true,
698        };
699        // [d ASC, h DESC] also satisfies schema.
700        eq_properties.add_ordering([
701            PhysicalSortExpr::new(Arc::clone(col_d), option_asc),
702            PhysicalSortExpr::new(Arc::clone(col_h), option_desc),
703        ]);
704        let test_cases = vec![
705            // TEST CASE 1
706            (vec![col_a], vec![(col_a, option_asc)]),
707            // TEST CASE 2
708            (vec![col_c], vec![(col_c, option_asc)]),
709            // TEST CASE 3
710            (
711                vec![col_d, col_e, col_b],
712                vec![
713                    (col_d, option_asc),
714                    (col_e, option_desc),
715                    (col_b, option_asc),
716                ],
717            ),
718            // TEST CASE 4
719            (vec![col_b], vec![]),
720            // TEST CASE 5
721            (vec![col_d], vec![(col_d, option_asc)]),
722            // TEST CASE 5
723            (vec![&a_plus_d], vec![(&a_plus_d, option_asc)]),
724            // TEST CASE 6
725            (
726                vec![col_b, col_d],
727                vec![(col_d, option_asc), (col_b, option_asc)],
728            ),
729            // TEST CASE 6
730            (
731                vec![col_c, col_e],
732                vec![(col_c, option_asc), (col_e, option_desc)],
733            ),
734            // TEST CASE 7
735            (
736                vec![col_d, col_h, col_e, col_f, col_b],
737                vec![
738                    (col_d, option_asc),
739                    (col_e, option_desc),
740                    (col_h, option_desc),
741                    (col_f, option_asc),
742                    (col_b, option_asc),
743                ],
744            ),
745            // TEST CASE 8
746            (
747                vec![col_e, col_d, col_h, col_f, col_b],
748                vec![
749                    (col_e, option_desc),
750                    (col_d, option_asc),
751                    (col_h, option_desc),
752                    (col_f, option_asc),
753                    (col_b, option_asc),
754                ],
755            ),
756            // TEST CASE 9
757            (
758                vec![col_e, col_d, col_b, col_h, col_f],
759                vec![
760                    (col_e, option_desc),
761                    (col_d, option_asc),
762                    (col_b, option_asc),
763                    (col_h, option_desc),
764                    (col_f, option_asc),
765                ],
766            ),
767        ];
768        for (exprs, expected) in test_cases {
769            let exprs = exprs.into_iter().cloned().collect::<Vec<_>>();
770            let expected = convert_to_sort_exprs(&expected);
771            let (actual, _) = eq_properties.find_longest_permutation(&exprs)?;
772            assert_eq!(actual, expected);
773        }
774
775        Ok(())
776    }
777
778    #[test]
779    fn test_find_longest_permutation2() -> Result<()> {
780        // Schema satisfies following orderings:
781        // [a ASC], [d ASC, b ASC], [e DESC, f ASC, g ASC]
782        // and
783        // Column [a=c] (e.g they are aliases).
784        // At below we add [d ASC, h DESC] also, for test purposes
785        let (test_schema, mut eq_properties) = create_test_params()?;
786        let col_h = &col("h", &test_schema)?;
787
788        // Add column h as constant
789        eq_properties.add_constants(vec![ConstExpr::from(Arc::clone(col_h))])?;
790
791        let test_cases = vec![
792            // TEST CASE 1
793            // ordering of the constants are treated as default ordering.
794            // This is the convention currently used.
795            (vec![col_h], vec![(col_h, SortOptions::default())]),
796        ];
797        for (exprs, expected) in test_cases {
798            let exprs = exprs.into_iter().cloned().collect::<Vec<_>>();
799            let expected = convert_to_sort_exprs(&expected);
800            let (actual, _) = eq_properties.find_longest_permutation(&exprs)?;
801            assert_eq!(actual, expected);
802        }
803
804        Ok(())
805    }
806
807    #[test]
808    fn test_normalize_sort_reqs() -> Result<()> {
809        // Schema satisfies following properties
810        // a=c
811        // and following orderings are valid
812        // [a ASC], [d ASC, b ASC], [e DESC, f ASC, g ASC]
813        let (test_schema, eq_properties) = create_test_params()?;
814        let col_a = &col("a", &test_schema)?;
815        let col_b = &col("b", &test_schema)?;
816        let col_c = &col("c", &test_schema)?;
817        let col_d = &col("d", &test_schema)?;
818        let col_e = &col("e", &test_schema)?;
819        let col_f = &col("f", &test_schema)?;
820        let option_asc = SortOptions {
821            descending: false,
822            nulls_first: false,
823        };
824        let option_desc = SortOptions {
825            descending: true,
826            nulls_first: true,
827        };
828        // First element in the tuple stores vector of requirement, second element is the expected return value for ordering_satisfy function
829        let requirements = vec![
830            (
831                vec![(col_a, Some(option_asc))],
832                vec![(col_a, Some(option_asc))],
833            ),
834            (
835                vec![(col_a, Some(option_desc))],
836                vec![(col_a, Some(option_desc))],
837            ),
838            (vec![(col_a, None)], vec![(col_a, None)]),
839            // Test whether equivalence works as expected
840            (
841                vec![(col_c, Some(option_asc))],
842                vec![(col_a, Some(option_asc))],
843            ),
844            (vec![(col_c, None)], vec![(col_a, None)]),
845            // Test whether ordering equivalence works as expected
846            (
847                vec![(col_d, Some(option_asc)), (col_b, Some(option_asc))],
848                vec![(col_d, Some(option_asc)), (col_b, Some(option_asc))],
849            ),
850            (
851                vec![(col_d, None), (col_b, None)],
852                vec![(col_d, None), (col_b, None)],
853            ),
854            (
855                vec![(col_e, Some(option_desc)), (col_f, Some(option_asc))],
856                vec![(col_e, Some(option_desc)), (col_f, Some(option_asc))],
857            ),
858            // We should be able to normalize in compatible requirements also (not exactly equal)
859            (
860                vec![(col_e, Some(option_desc)), (col_f, None)],
861                vec![(col_e, Some(option_desc)), (col_f, None)],
862            ),
863            (
864                vec![(col_e, None), (col_f, None)],
865                vec![(col_e, None), (col_f, None)],
866            ),
867        ];
868
869        for (reqs, expected_normalized) in requirements.into_iter() {
870            let req = convert_to_sort_reqs(&reqs);
871            let expected_normalized = convert_to_sort_reqs(&expected_normalized);
872
873            assert_eq!(
874                eq_properties.normalize_sort_requirements(req).unwrap(),
875                expected_normalized
876            );
877        }
878
879        Ok(())
880    }
881
882    #[test]
883    fn test_schema_normalize_sort_requirement_with_equivalence() -> Result<()> {
884        let option1 = SortOptions {
885            descending: false,
886            nulls_first: false,
887        };
888        // Assume that column a and c are aliases.
889        let (test_schema, eq_properties) = create_test_params()?;
890        let col_a = &col("a", &test_schema)?;
891        let col_c = &col("c", &test_schema)?;
892        let col_d = &col("d", &test_schema)?;
893
894        // Test cases for equivalence normalization
895        // First entry in the tuple is PhysicalSortRequirement, second entry in the tuple is
896        // expected PhysicalSortRequirement after normalization.
897        let test_cases = vec![
898            (vec![(col_a, Some(option1))], vec![(col_a, Some(option1))]),
899            // In the normalized version column c should be replace with column a
900            (vec![(col_c, Some(option1))], vec![(col_a, Some(option1))]),
901            (vec![(col_c, None)], vec![(col_a, None)]),
902            (vec![(col_d, Some(option1))], vec![(col_d, Some(option1))]),
903        ];
904        for (reqs, expected) in test_cases.into_iter() {
905            let reqs = convert_to_sort_reqs(&reqs);
906            let expected = convert_to_sort_reqs(&expected);
907            let normalized = eq_properties
908                .normalize_sort_requirements(reqs.clone())
909                .unwrap();
910            assert!(
911                expected.eq(&normalized),
912                "error in test: reqs: {reqs:?}, expected: {expected:?}, normalized: {normalized:?}"
913            );
914        }
915
916        Ok(())
917    }
918
919    #[test]
920    fn test_eliminate_redundant_monotonic_sorts() -> Result<()> {
921        let schema = Arc::new(Schema::new(vec![
922            Field::new("a", DataType::Date32, true),
923            Field::new("b", DataType::Utf8, true),
924            Field::new("c", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
925        ]));
926        let mut base_properties = EquivalenceProperties::new(Arc::clone(&schema));
927        base_properties.reorder(
928            ["a", "b", "c"]
929                .into_iter()
930                .map(|c| PhysicalSortExpr::new_default(col(c, schema.as_ref()).unwrap())),
931        )?;
932
933        struct TestCase {
934            name: &'static str,
935            constants: Vec<Arc<dyn PhysicalExpr>>,
936            equal_conditions: Vec<[Arc<dyn PhysicalExpr>; 2]>,
937            sort_columns: &'static [&'static str],
938            should_satisfy_ordering: bool,
939        }
940
941        let col_a = col("a", schema.as_ref())?;
942        let col_b = col("b", schema.as_ref())?;
943        let col_c = col("c", schema.as_ref())?;
944        let cast_c = Arc::new(CastExpr::new(col_c, DataType::Date32, None)) as _;
945
946        let cases = vec![
947            TestCase {
948                name: "(a, b, c) -> (c)",
949                // b is constant, so it should be removed from the sort order
950                constants: vec![Arc::clone(&col_b)],
951                equal_conditions: vec![[Arc::clone(&cast_c), Arc::clone(&col_a)]],
952                sort_columns: &["c"],
953                should_satisfy_ordering: true,
954            },
955            // Same test with above test, where equality order is swapped.
956            // Algorithm shouldn't depend on this order.
957            TestCase {
958                name: "(a, b, c) -> (c)",
959                // b is constant, so it should be removed from the sort order
960                constants: vec![col_b],
961                equal_conditions: vec![[Arc::clone(&col_a), Arc::clone(&cast_c)]],
962                sort_columns: &["c"],
963                should_satisfy_ordering: true,
964            },
965            TestCase {
966                name: "not ordered because (b) is not constant",
967                // b is not constant anymore
968                constants: vec![],
969                // a and c are still compatible, but this is irrelevant since the original ordering is (a, b, c)
970                equal_conditions: vec![[Arc::clone(&cast_c), Arc::clone(&col_a)]],
971                sort_columns: &["c"],
972                should_satisfy_ordering: false,
973            },
974        ];
975
976        for case in cases {
977            // Construct the equivalence properties in different orders
978            // to exercise different code paths
979            // (The resulting properties _should_ be the same)
980            for properties in [
981                // Equal conditions before constants
982                {
983                    let mut properties = base_properties.clone();
984                    for [left, right] in case.equal_conditions.clone() {
985                        properties.add_equal_conditions(left, right)?
986                    }
987                    properties.add_constants(
988                        case.constants.iter().cloned().map(ConstExpr::from),
989                    )?;
990                    properties
991                },
992                // Constants before equal conditions
993                {
994                    let mut properties = base_properties.clone();
995                    properties.add_constants(
996                        case.constants.iter().cloned().map(ConstExpr::from),
997                    )?;
998                    for [left, right] in case.equal_conditions {
999                        properties.add_equal_conditions(left, right)?
1000                    }
1001                    properties
1002                },
1003            ] {
1004                let sort = case
1005                    .sort_columns
1006                    .iter()
1007                    .map(|&name| col(name, &schema).map(PhysicalSortExpr::new_default))
1008                    .collect::<Result<Vec<_>>>()?;
1009
1010                assert_eq!(
1011                    properties.ordering_satisfy(sort)?,
1012                    case.should_satisfy_ordering,
1013                    "failed test '{}'",
1014                    case.name
1015                );
1016            }
1017        }
1018
1019        Ok(())
1020    }
1021
1022    #[test]
1023    fn test_ordering_equivalence_with_lex_monotonic_concat() -> Result<()> {
1024        let schema = Arc::new(Schema::new(vec![
1025            Field::new("a", DataType::Utf8, false),
1026            Field::new("b", DataType::Utf8, false),
1027            Field::new("c", DataType::Utf8, false),
1028        ]));
1029
1030        let col_a = col("a", &schema)?;
1031        let col_b = col("b", &schema)?;
1032        let col_c = col("c", &schema)?;
1033
1034        let a_concat_b: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
1035            "concat",
1036            concat(),
1037            vec![Arc::clone(&col_a), Arc::clone(&col_b)],
1038            Field::new("f", DataType::Utf8, true).into(),
1039            Arc::new(ConfigOptions::default()),
1040        ));
1041
1042        // Assume existing ordering is [c ASC, a ASC, b ASC]
1043        let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
1044
1045        eq_properties.add_ordering([
1046            PhysicalSortExpr::new_default(Arc::clone(&col_c)).asc(),
1047            PhysicalSortExpr::new_default(Arc::clone(&col_a)).asc(),
1048            PhysicalSortExpr::new_default(Arc::clone(&col_b)).asc(),
1049        ]);
1050
1051        // Add equality condition c = concat(a, b)
1052        eq_properties.add_equal_conditions(Arc::clone(&col_c), a_concat_b)?;
1053
1054        let orderings = eq_properties.oeq_class();
1055
1056        let expected_ordering1 = [PhysicalSortExpr::new_default(col_c).asc()].into();
1057        let expected_ordering2 = [
1058            PhysicalSortExpr::new_default(col_a).asc(),
1059            PhysicalSortExpr::new_default(col_b).asc(),
1060        ]
1061        .into();
1062
1063        // The ordering should be [c ASC] and [a ASC, b ASC]
1064        assert_eq!(orderings.len(), 2);
1065        assert!(orderings.contains(&expected_ordering1));
1066        assert!(orderings.contains(&expected_ordering2));
1067
1068        Ok(())
1069    }
1070
1071    #[test]
1072    fn test_ordering_equivalence_with_non_lex_monotonic_multiply() -> Result<()> {
1073        let schema = Arc::new(Schema::new(vec![
1074            Field::new("a", DataType::Int32, false),
1075            Field::new("b", DataType::Int32, false),
1076            Field::new("c", DataType::Int32, false),
1077        ]));
1078
1079        let col_a = col("a", &schema)?;
1080        let col_b = col("b", &schema)?;
1081        let col_c = col("c", &schema)?;
1082
1083        let a_times_b = Arc::new(BinaryExpr::new(
1084            Arc::clone(&col_a),
1085            Operator::Multiply,
1086            Arc::clone(&col_b),
1087        )) as _;
1088
1089        // Assume existing ordering is [c ASC, a ASC, b ASC]
1090        let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
1091
1092        let initial_ordering: LexOrdering = [
1093            PhysicalSortExpr::new_default(Arc::clone(&col_c)).asc(),
1094            PhysicalSortExpr::new_default(col_a).asc(),
1095            PhysicalSortExpr::new_default(col_b).asc(),
1096        ]
1097        .into();
1098
1099        eq_properties.add_ordering(initial_ordering.clone());
1100
1101        // Add equality condition c = a * b
1102        eq_properties.add_equal_conditions(col_c, a_times_b)?;
1103
1104        let orderings = eq_properties.oeq_class();
1105
1106        // The ordering should remain unchanged since multiplication is not lex-monotonic
1107        assert_eq!(orderings.len(), 1);
1108        assert!(orderings.contains(&initial_ordering));
1109
1110        Ok(())
1111    }
1112
1113    #[test]
1114    fn test_ordering_equivalence_with_concat_equality() -> Result<()> {
1115        let schema = Arc::new(Schema::new(vec![
1116            Field::new("a", DataType::Utf8, false),
1117            Field::new("b", DataType::Utf8, false),
1118            Field::new("c", DataType::Utf8, false),
1119        ]));
1120
1121        let col_a = col("a", &schema)?;
1122        let col_b = col("b", &schema)?;
1123        let col_c = col("c", &schema)?;
1124
1125        let a_concat_b = Arc::new(ScalarFunctionExpr::new(
1126            "concat",
1127            concat(),
1128            vec![Arc::clone(&col_a), Arc::clone(&col_b)],
1129            Field::new("f", DataType::Utf8, true).into(),
1130            Arc::new(ConfigOptions::default()),
1131        )) as _;
1132
1133        // Assume existing ordering is [concat(a, b) ASC, a ASC, b ASC]
1134        let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
1135
1136        eq_properties.add_ordering([
1137            PhysicalSortExpr::new_default(Arc::clone(&a_concat_b)).asc(),
1138            PhysicalSortExpr::new_default(Arc::clone(&col_a)).asc(),
1139            PhysicalSortExpr::new_default(Arc::clone(&col_b)).asc(),
1140        ]);
1141
1142        // Add equality condition c = concat(a, b)
1143        eq_properties.add_equal_conditions(col_c, Arc::clone(&a_concat_b))?;
1144
1145        let orderings = eq_properties.oeq_class();
1146
1147        let expected_ordering1 = [PhysicalSortExpr::new_default(a_concat_b).asc()].into();
1148        let expected_ordering2 = [
1149            PhysicalSortExpr::new_default(col_a).asc(),
1150            PhysicalSortExpr::new_default(col_b).asc(),
1151        ]
1152        .into();
1153
1154        // The ordering should be [c ASC] and [a ASC, b ASC]
1155        assert_eq!(orderings.len(), 2);
1156        assert!(orderings.contains(&expected_ordering1));
1157        assert!(orderings.contains(&expected_ordering2));
1158
1159        Ok(())
1160    }
1161
1162    #[test]
1163    fn test_requirements_compatible() -> Result<()> {
1164        let schema = Arc::new(Schema::new(vec![
1165            Field::new("a", DataType::Int32, true),
1166            Field::new("b", DataType::Int32, true),
1167            Field::new("c", DataType::Int32, true),
1168        ]));
1169        let col_a = col("a", &schema)?;
1170        let col_b = col("b", &schema)?;
1171        let col_c = col("c", &schema)?;
1172
1173        let eq_properties = EquivalenceProperties::new(schema);
1174        let lex_a: LexRequirement =
1175            [PhysicalSortRequirement::new(Arc::clone(&col_a), None)].into();
1176        let lex_a_b: LexRequirement = [
1177            PhysicalSortRequirement::new(col_a, None),
1178            PhysicalSortRequirement::new(col_b, None),
1179        ]
1180        .into();
1181        let lex_c = [PhysicalSortRequirement::new(col_c, None)].into();
1182
1183        assert!(eq_properties.requirements_compatible(lex_a.clone(), lex_a.clone()));
1184        assert!(!eq_properties.requirements_compatible(lex_a.clone(), lex_a_b.clone()));
1185        assert!(eq_properties.requirements_compatible(lex_a_b, lex_a.clone()));
1186        assert!(!eq_properties.requirements_compatible(lex_c, lex_a));
1187
1188        Ok(())
1189    }
1190
1191    #[test]
1192    fn test_with_reorder_constant_filtering() -> Result<()> {
1193        let schema = create_test_schema()?;
1194        let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
1195
1196        // Setup constant columns
1197        let col_a = col("a", &schema)?;
1198        let col_b = col("b", &schema)?;
1199        eq_properties.add_constants([ConstExpr::from(Arc::clone(&col_a))])?;
1200
1201        let sort_exprs = vec![
1202            PhysicalSortExpr::new_default(Arc::clone(&col_a)),
1203            PhysicalSortExpr::new_default(Arc::clone(&col_b)),
1204        ];
1205
1206        let change = eq_properties.reorder(sort_exprs)?;
1207        assert!(change);
1208
1209        assert_eq!(eq_properties.oeq_class().len(), 1);
1210        let ordering = eq_properties.oeq_class().iter().next().unwrap();
1211        assert_eq!(ordering.len(), 2);
1212        assert!(ordering[0].expr.eq(&col_a));
1213        assert!(ordering[1].expr.eq(&col_b));
1214
1215        Ok(())
1216    }
1217
1218    #[test]
1219    fn test_with_reorder_preserve_suffix() -> Result<()> {
1220        let schema = create_test_schema()?;
1221        let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
1222
1223        let col_a = col("a", &schema)?;
1224        let col_b = col("b", &schema)?;
1225        let col_c = col("c", &schema)?;
1226
1227        let asc = SortOptions::default();
1228        let desc = SortOptions {
1229            descending: true,
1230            nulls_first: true,
1231        };
1232
1233        // Initial ordering: [a ASC, b DESC, c ASC]
1234        eq_properties.add_ordering([
1235            PhysicalSortExpr::new(Arc::clone(&col_a), asc),
1236            PhysicalSortExpr::new(Arc::clone(&col_b), desc),
1237            PhysicalSortExpr::new(Arc::clone(&col_c), asc),
1238        ]);
1239
1240        // New ordering: [a ASC]
1241        let new_order = vec![PhysicalSortExpr::new(Arc::clone(&col_a), asc)];
1242
1243        let change = eq_properties.reorder(new_order)?;
1244        assert!(!change);
1245
1246        // Should only contain [a ASC, b DESC, c ASC]
1247        assert_eq!(eq_properties.oeq_class().len(), 1);
1248        let ordering = eq_properties.oeq_class().iter().next().unwrap();
1249        assert_eq!(ordering.len(), 3);
1250        assert!(ordering[0].expr.eq(&col_a));
1251        assert!(ordering[0].options.eq(&asc));
1252        assert!(ordering[1].expr.eq(&col_b));
1253        assert!(ordering[1].options.eq(&desc));
1254        assert!(ordering[2].expr.eq(&col_c));
1255        assert!(ordering[2].options.eq(&asc));
1256
1257        Ok(())
1258    }
1259
1260    #[test]
1261    fn test_with_reorder_equivalent_expressions() -> Result<()> {
1262        let schema = create_test_schema()?;
1263        let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
1264
1265        let col_a = col("a", &schema)?;
1266        let col_b = col("b", &schema)?;
1267        let col_c = col("c", &schema)?;
1268
1269        // Make a and b equivalent
1270        eq_properties.add_equal_conditions(Arc::clone(&col_a), Arc::clone(&col_b))?;
1271
1272        // Initial ordering: [a ASC, c ASC]
1273        eq_properties.add_ordering([
1274            PhysicalSortExpr::new_default(Arc::clone(&col_a)),
1275            PhysicalSortExpr::new_default(Arc::clone(&col_c)),
1276        ]);
1277
1278        // New ordering: [b ASC]
1279        let new_order = vec![PhysicalSortExpr::new_default(Arc::clone(&col_b))];
1280
1281        let change = eq_properties.reorder(new_order)?;
1282
1283        assert!(!change);
1284        // Should only contain [a/b ASC, c ASC]
1285        assert_eq!(eq_properties.oeq_class().len(), 1);
1286
1287        // Verify orderings
1288        let asc = SortOptions::default();
1289        let ordering = eq_properties.oeq_class().iter().next().unwrap();
1290        assert_eq!(ordering.len(), 2);
1291        assert!(ordering[0].expr.eq(&col_a) || ordering[0].expr.eq(&col_b));
1292        assert!(ordering[0].options.eq(&asc));
1293        assert!(ordering[1].expr.eq(&col_c));
1294        assert!(ordering[1].options.eq(&asc));
1295
1296        Ok(())
1297    }
1298
1299    #[test]
1300    fn test_with_reorder_incompatible_prefix() -> Result<()> {
1301        let schema = create_test_schema()?;
1302        let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
1303
1304        let col_a = col("a", &schema)?;
1305        let col_b = col("b", &schema)?;
1306
1307        let asc = SortOptions::default();
1308        let desc = SortOptions {
1309            descending: true,
1310            nulls_first: true,
1311        };
1312
1313        // Initial ordering: [a ASC, b DESC]
1314        eq_properties.add_ordering([
1315            PhysicalSortExpr::new(Arc::clone(&col_a), asc),
1316            PhysicalSortExpr::new(Arc::clone(&col_b), desc),
1317        ]);
1318
1319        // New ordering: [a DESC]
1320        let new_order = vec![PhysicalSortExpr::new(Arc::clone(&col_a), desc)];
1321
1322        let change = eq_properties.reorder(new_order.clone())?;
1323
1324        assert!(change);
1325        // Should only contain the new ordering since options don't match
1326        assert_eq!(eq_properties.oeq_class().len(), 1);
1327        let ordering = eq_properties.oeq_class().iter().next().unwrap();
1328        assert_eq!(ordering.to_vec(), new_order);
1329
1330        Ok(())
1331    }
1332
1333    #[test]
1334    fn test_with_reorder_comprehensive() -> Result<()> {
1335        let schema = create_test_schema()?;
1336        let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
1337
1338        let col_a = col("a", &schema)?;
1339        let col_b = col("b", &schema)?;
1340        let col_c = col("c", &schema)?;
1341        let col_d = col("d", &schema)?;
1342        let col_e = col("e", &schema)?;
1343
1344        // Constants: c is constant
1345        eq_properties.add_constants([ConstExpr::from(Arc::clone(&col_c))])?;
1346
1347        // Equality: b = d
1348        eq_properties.add_equal_conditions(Arc::clone(&col_b), Arc::clone(&col_d))?;
1349
1350        // Orderings: [d ASC, a ASC], [e ASC]
1351        eq_properties.add_orderings([
1352            vec![
1353                PhysicalSortExpr::new_default(Arc::clone(&col_d)),
1354                PhysicalSortExpr::new_default(Arc::clone(&col_a)),
1355            ],
1356            vec![PhysicalSortExpr::new_default(Arc::clone(&col_e))],
1357        ]);
1358
1359        // New ordering: [b ASC, c ASC]
1360        let new_order = vec![
1361            PhysicalSortExpr::new_default(Arc::clone(&col_b)),
1362            PhysicalSortExpr::new_default(Arc::clone(&col_c)),
1363        ];
1364
1365        let old_orderings = eq_properties.oeq_class().clone();
1366        let change = eq_properties.reorder(new_order)?;
1367        // Original orderings should be preserved:
1368        assert!(!change);
1369        assert_eq!(eq_properties.oeq_class, old_orderings);
1370
1371        Ok(())
1372    }
1373
1374    #[test]
1375    fn test_ordering_satisfaction_with_key_constraints() -> Result<()> {
1376        let pk_schema = Arc::new(Schema::new(vec![
1377            Field::new("a", DataType::Int32, true),
1378            Field::new("b", DataType::Int32, true),
1379            Field::new("c", DataType::Int32, true),
1380            Field::new("d", DataType::Int32, true),
1381        ]));
1382
1383        let unique_schema = Arc::new(Schema::new(vec![
1384            Field::new("a", DataType::Int32, false),
1385            Field::new("b", DataType::Int32, false),
1386            Field::new("c", DataType::Int32, true),
1387            Field::new("d", DataType::Int32, true),
1388        ]));
1389
1390        // Test cases to run
1391        let test_cases = vec![
1392            // (name, schema, constraint, base_ordering, satisfied_orderings, unsatisfied_orderings)
1393            (
1394                "single column primary key",
1395                &pk_schema,
1396                vec![Constraint::PrimaryKey(vec![0])],
1397                vec!["a"], // base ordering
1398                vec![vec!["a", "b"], vec!["a", "c", "d"]],
1399                vec![vec!["b", "a"], vec!["c", "a"]],
1400            ),
1401            (
1402                "single column unique",
1403                &unique_schema,
1404                vec![Constraint::Unique(vec![0])],
1405                vec!["a"], // base ordering
1406                vec![vec!["a", "b"], vec!["a", "c", "d"]],
1407                vec![vec!["b", "a"], vec!["c", "a"]],
1408            ),
1409            (
1410                "multi-column primary key",
1411                &pk_schema,
1412                vec![Constraint::PrimaryKey(vec![0, 1])],
1413                vec!["a", "b"], // base ordering
1414                vec![vec!["a", "b", "c"], vec!["a", "b", "d"]],
1415                vec![vec!["b", "a"], vec!["a", "c", "b"]],
1416            ),
1417            (
1418                "multi-column unique",
1419                &unique_schema,
1420                vec![Constraint::Unique(vec![0, 1])],
1421                vec!["a", "b"], // base ordering
1422                vec![vec!["a", "b", "c"], vec!["a", "b", "d"]],
1423                vec![vec!["b", "a"], vec!["c", "a", "b"]],
1424            ),
1425            (
1426                "nullable unique",
1427                &unique_schema,
1428                vec![Constraint::Unique(vec![2, 3])],
1429                vec!["c", "d"], // base ordering
1430                vec![],
1431                vec![vec!["c", "d", "a"]],
1432            ),
1433            (
1434                "ordering with arbitrary column unique",
1435                &unique_schema,
1436                vec![Constraint::Unique(vec![0, 1])],
1437                vec!["a", "c", "b"], // base ordering
1438                vec![vec!["a", "c", "b", "d"]],
1439                vec![vec!["a", "b", "d"]],
1440            ),
1441            (
1442                "ordering with arbitrary column pk",
1443                &pk_schema,
1444                vec![Constraint::PrimaryKey(vec![0, 1])],
1445                vec!["a", "c", "b"], // base ordering
1446                vec![vec!["a", "c", "b", "d"]],
1447                vec![vec!["a", "b", "d"]],
1448            ),
1449            (
1450                "ordering with arbitrary column pk complex",
1451                &pk_schema,
1452                vec![Constraint::PrimaryKey(vec![3, 1])],
1453                vec!["b", "a", "d"], // base ordering
1454                vec![vec!["b", "a", "d", "c"]],
1455                vec![vec!["b", "c", "d", "a"], vec!["b", "a", "c", "d"]],
1456            ),
1457        ];
1458
1459        for (
1460            name,
1461            schema,
1462            constraints,
1463            base_order,
1464            satisfied_orders,
1465            unsatisfied_orders,
1466        ) in test_cases
1467        {
1468            let mut eq_properties = EquivalenceProperties::new(Arc::clone(schema));
1469
1470            // Convert string column names to orderings
1471            let satisfied_orderings: Vec<_> = satisfied_orders
1472                .iter()
1473                .map(|cols| {
1474                    cols.iter()
1475                        .map(|col_name| {
1476                            PhysicalSortExpr::new_default(col(col_name, schema).unwrap())
1477                        })
1478                        .collect::<Vec<_>>()
1479                })
1480                .collect();
1481
1482            let unsatisfied_orderings: Vec<_> = unsatisfied_orders
1483                .iter()
1484                .map(|cols| {
1485                    cols.iter()
1486                        .map(|col_name| {
1487                            PhysicalSortExpr::new_default(col(col_name, schema).unwrap())
1488                        })
1489                        .collect::<Vec<_>>()
1490                })
1491                .collect();
1492
1493            // Test that orderings are not satisfied before adding constraints
1494            for ordering in satisfied_orderings.clone() {
1495                let err_msg = format!(
1496                    "{name}: ordering {ordering:?} should not be satisfied before adding constraints",
1497                );
1498                assert!(!eq_properties.ordering_satisfy(ordering)?, "{err_msg}");
1499            }
1500
1501            // Add base ordering
1502            let base_ordering = base_order.iter().map(|col_name| PhysicalSortExpr {
1503                expr: col(col_name, schema).unwrap(),
1504                options: SortOptions::default(),
1505            });
1506            eq_properties.add_ordering(base_ordering);
1507
1508            // Add constraints
1509            eq_properties =
1510                eq_properties.with_constraints(Constraints::new_unverified(constraints));
1511
1512            // Test that expected orderings are now satisfied
1513            for ordering in satisfied_orderings {
1514                let err_msg = format!(
1515                    "{name}: ordering {ordering:?} should be satisfied after adding constraints",
1516                );
1517                assert!(eq_properties.ordering_satisfy(ordering)?, "{err_msg}");
1518            }
1519
1520            // Test that unsatisfied orderings remain unsatisfied
1521            for ordering in unsatisfied_orderings {
1522                let err_msg = format!(
1523                    "{name}: ordering {ordering:?} should not be satisfied after adding constraints",
1524                );
1525                assert!(!eq_properties.ordering_satisfy(ordering)?, "{err_msg}");
1526            }
1527        }
1528
1529        Ok(())
1530    }
1531}