1use std::fmt::{self, Display};
19use std::ops::{Deref, DerefMut};
20use std::sync::Arc;
21
22use super::expr_refers;
23use crate::{LexOrdering, PhysicalSortExpr};
24
25use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
26
27use indexmap::{IndexMap, IndexSet};
28use itertools::Itertools;
29
30#[derive(Debug, Default, Clone, PartialEq, Eq)]
33pub struct Dependencies {
34 sort_exprs: IndexSet<PhysicalSortExpr>,
35}
36
37impl Display for Dependencies {
38 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39 write!(f, "[")?;
40 let mut iter = self.sort_exprs.iter();
41 if let Some(dep) = iter.next() {
42 write!(f, "{dep}")?;
43 }
44 for dep in iter {
45 write!(f, ", {dep}")?;
46 }
47 write!(f, "]")
48 }
49}
50
51impl Dependencies {
52 pub fn new(sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>) -> Self {
54 Self {
55 sort_exprs: sort_exprs.into_iter().collect(),
56 }
57 }
58}
59
60impl Deref for Dependencies {
61 type Target = IndexSet<PhysicalSortExpr>;
62
63 fn deref(&self) -> &Self::Target {
64 &self.sort_exprs
65 }
66}
67
68impl DerefMut for Dependencies {
69 fn deref_mut(&mut self) -> &mut Self::Target {
70 &mut self.sort_exprs
71 }
72}
73
74impl IntoIterator for Dependencies {
75 type Item = PhysicalSortExpr;
76 type IntoIter = <IndexSet<PhysicalSortExpr> as IntoIterator>::IntoIter;
77
78 fn into_iter(self) -> Self::IntoIter {
79 self.sort_exprs.into_iter()
80 }
81}
82
83pub struct DependencyEnumerator<'a> {
85 seen: IndexMap<&'a PhysicalSortExpr, IndexSet<&'a PhysicalSortExpr>>,
87}
88
89impl<'a> DependencyEnumerator<'a> {
90 pub fn new() -> Self {
91 Self {
92 seen: IndexMap::new(),
93 }
94 }
95
96 fn insert(
101 &mut self,
102 target: &'a PhysicalSortExpr,
103 dep: &'a PhysicalSortExpr,
104 ) -> bool {
105 self.seen.entry(target).or_default().insert(dep)
106 }
107
108 pub fn construct_orderings(
125 &mut self,
126 referred_sort_expr: &'a PhysicalSortExpr,
127 dependency_map: &'a DependencyMap,
128 ) -> Vec<LexOrdering> {
129 let node = dependency_map
130 .get(referred_sort_expr)
131 .expect("`referred_sort_expr` should be inside `dependency_map`");
132 let target = node.target.as_ref().unwrap();
134 if node.dependencies.is_empty() {
137 return vec![[target.clone()].into()];
138 };
139
140 node.dependencies
141 .iter()
142 .flat_map(|dep| {
143 let mut orderings = if self.insert(target, dep) {
144 self.construct_orderings(dep, dependency_map)
145 } else {
146 vec![]
147 };
148
149 for ordering in orderings.iter_mut() {
150 ordering.push(target.clone());
151 }
152 orderings
153 })
154 .collect()
155 }
156}
157
158#[derive(Debug, Default)]
191pub struct DependencyMap {
192 map: IndexMap<PhysicalSortExpr, DependencyNode>,
193}
194
195impl DependencyMap {
196 pub fn insert(
199 &mut self,
200 sort_expr: PhysicalSortExpr,
201 target_sort_expr: Option<PhysicalSortExpr>,
202 dependency: Option<PhysicalSortExpr>,
203 ) {
204 let entry = self.map.entry(sort_expr);
205 let node = entry.or_insert_with(|| DependencyNode {
206 target: target_sort_expr,
207 dependencies: Dependencies::default(),
208 });
209 node.dependencies.extend(dependency);
210 }
211}
212
213impl Deref for DependencyMap {
214 type Target = IndexMap<PhysicalSortExpr, DependencyNode>;
215
216 fn deref(&self) -> &Self::Target {
217 &self.map
218 }
219}
220
221impl Display for DependencyMap {
222 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
223 writeln!(f, "DependencyMap: {{")?;
224 for (sort_expr, node) in self.map.iter() {
225 writeln!(f, " {sort_expr} --> {node}")?;
226 }
227 writeln!(f, "}}")
228 }
229}
230
231#[derive(Debug, Clone, PartialEq, Eq)]
245pub struct DependencyNode {
246 pub(crate) target: Option<PhysicalSortExpr>,
247 pub(crate) dependencies: Dependencies,
248}
249
250impl Display for DependencyNode {
251 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
252 if let Some(target) = &self.target {
253 write!(f, "(target: {target}, ")?;
254 } else {
255 write!(f, "(")?;
256 }
257 write!(f, "dependencies: [{}])", self.dependencies)
258 }
259}
260
261pub fn referred_dependencies(
277 dependency_map: &DependencyMap,
278 source: &Arc<dyn PhysicalExpr>,
279) -> Vec<Dependencies> {
280 let mut expr_to_sort_exprs = IndexMap::<_, Dependencies>::new();
282 for sort_expr in dependency_map
283 .keys()
284 .filter(|sort_expr| expr_refers(source, &sort_expr.expr))
285 {
286 let key = Arc::clone(&sort_expr.expr);
287 expr_to_sort_exprs
288 .entry(key)
289 .or_default()
290 .insert(sort_expr.clone());
291 }
292
293 expr_to_sort_exprs
297 .into_values()
298 .multi_cartesian_product()
299 .map(Dependencies::new)
300 .collect()
301}
302
303pub fn construct_prefix_orderings(
318 relevant_sort_expr: &PhysicalSortExpr,
319 dependency_map: &DependencyMap,
320) -> Vec<LexOrdering> {
321 let mut dep_enumerator = DependencyEnumerator::new();
322 dependency_map
323 .get(relevant_sort_expr)
324 .expect("no relevant sort expr found")
325 .dependencies
326 .iter()
327 .flat_map(|dep| dep_enumerator.construct_orderings(dep, dependency_map))
328 .collect()
329}
330
331pub fn generate_dependency_orderings(
354 dependencies: &Dependencies,
355 dependency_map: &DependencyMap,
356) -> Vec<LexOrdering> {
357 dependencies
361 .iter()
362 .filter_map(|dep| {
363 let prefixes = construct_prefix_orderings(dep, dependency_map);
364 (!prefixes.is_empty()).then_some(prefixes)
365 })
366 .multi_cartesian_product()
368 .flat_map(|prefix_orderings| {
369 let length = prefix_orderings.len();
370 prefix_orderings
371 .into_iter()
372 .permutations(length)
373 .filter_map(|prefixes| {
374 prefixes.into_iter().reduce(|mut acc, ordering| {
375 acc.extend(ordering);
376 acc
377 })
378 })
379 })
380 .collect()
381}
382
383#[cfg(test)]
384mod tests {
385 use std::ops::Not;
386 use std::sync::Arc;
387
388 use super::*;
389 use crate::equivalence::tests::{
390 convert_to_sort_reqs, create_test_params, create_test_schema, parse_sort_expr,
391 };
392 use crate::equivalence::{convert_to_sort_exprs, ProjectionMapping};
393 use crate::expressions::{col, BinaryExpr, CastExpr, Column};
394 use crate::projection::tests::output_schema;
395 use crate::{ConstExpr, EquivalenceProperties, ScalarFunctionExpr};
396
397 use arrow::compute::SortOptions;
398 use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
399 use datafusion_common::config::ConfigOptions;
400 use datafusion_common::{Constraint, Constraints, Result};
401 use datafusion_expr::sort_properties::SortProperties;
402 use datafusion_expr::Operator;
403 use datafusion_functions::string::concat;
404 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
405 use datafusion_physical_expr_common::sort_expr::{
406 LexRequirement, PhysicalSortRequirement,
407 };
408
409 #[test]
410 fn project_equivalence_properties_test() -> Result<()> {
411 let input_schema = Arc::new(Schema::new(vec![
412 Field::new("a", DataType::Int64, true),
413 Field::new("b", DataType::Int64, true),
414 Field::new("c", DataType::Int64, true),
415 ]));
416
417 let input_properties = EquivalenceProperties::new(Arc::clone(&input_schema));
418 let col_a = col("a", &input_schema)?;
419
420 let proj_exprs = vec![
422 (Arc::clone(&col_a), "a1".to_string()),
423 (Arc::clone(&col_a), "a2".to_string()),
424 (Arc::clone(&col_a), "a3".to_string()),
425 (Arc::clone(&col_a), "a4".to_string()),
426 ];
427 let projection_mapping = ProjectionMapping::try_new(proj_exprs, &input_schema)?;
428
429 let out_schema = output_schema(&projection_mapping, &input_schema)?;
430 let proj_exprs = vec![
432 (Arc::clone(&col_a), "a1".to_string()),
433 (Arc::clone(&col_a), "a2".to_string()),
434 (Arc::clone(&col_a), "a3".to_string()),
435 (Arc::clone(&col_a), "a4".to_string()),
436 ];
437 let projection_mapping = ProjectionMapping::try_new(proj_exprs, &input_schema)?;
438
439 let col_a1 = &col("a1", &out_schema)?;
441 let col_a2 = &col("a2", &out_schema)?;
442 let col_a3 = &col("a3", &out_schema)?;
443 let col_a4 = &col("a4", &out_schema)?;
444 let out_properties = input_properties.project(&projection_mapping, out_schema);
445
446 assert_eq!(out_properties.eq_group().len(), 1);
448 let eq_class = out_properties.eq_group().iter().next().unwrap();
449 assert_eq!(eq_class.len(), 4);
450 assert!(eq_class.contains(col_a1));
451 assert!(eq_class.contains(col_a2));
452 assert!(eq_class.contains(col_a3));
453 assert!(eq_class.contains(col_a4));
454
455 Ok(())
456 }
457
    /// An identity projection of every column must keep both input orderings.
    #[test]
    fn project_equivalence_properties_test_multi() -> Result<()> {
        let input_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Int64, true),
            Field::new("c", DataType::Int64, true),
            Field::new("d", DataType::Int64, true),
        ]));

        let mut input_properties = EquivalenceProperties::new(Arc::clone(&input_schema));
        // First ordering: a, b, c, d.
        input_properties.add_ordering([
            parse_sort_expr("a", &input_schema),
            parse_sort_expr("b", &input_schema),
            parse_sort_expr("c", &input_schema),
            parse_sort_expr("d", &input_schema),
        ]);

        // Second ordering: a, c, b, d.
        input_properties.add_ordering([
            parse_sort_expr("a", &input_schema),
            parse_sort_expr("c", &input_schema),
            parse_sort_expr("b", &input_schema),
            parse_sort_expr("d", &input_schema),
        ]);

        // Identity projection: a AS a, b AS b, c AS c, d AS d.
        let proj_exprs = vec![
            (col("a", &input_schema)?, "a".to_string()),
            (col("b", &input_schema)?, "b".to_string()),
            (col("c", &input_schema)?, "c".to_string()),
            (col("d", &input_schema)?, "d".to_string()),
        ];
        let projection_mapping = ProjectionMapping::try_new(proj_exprs, &input_schema)?;
        let out_properties = input_properties.project(&projection_mapping, input_schema);

        assert_eq!(
            out_properties.to_string(),
            "order: [[a@0 ASC, c@2 ASC, b@1 ASC, d@3 ASC], [a@0 ASC, b@1 ASC, c@2 ASC, d@3 ASC]]"
        );

        Ok(())
    }
502
    /// With `a = c` registered before the orderings `[b]` and `[c]` are added,
    /// the ordering equivalence class must equal the one built from `[b]` and
    /// `[c]` without any equality.
    #[test]
    fn test_normalize_ordering_equivalence_classes() -> Result<()> {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Int32, true),
            Field::new("c", DataType::Int32, true),
        ]);
        let col_a_expr = col("a", &schema)?;
        let col_b_expr = col("b", &schema)?;
        let col_c_expr = col("c", &schema)?;
        let mut eq_properties = EquivalenceProperties::new(Arc::new(schema.clone()));

        // a = c, then orderings [b] and [c].
        eq_properties.add_equal_conditions(col_a_expr, Arc::clone(&col_c_expr))?;
        eq_properties.add_orderings([
            vec![PhysicalSortExpr::new_default(Arc::clone(&col_b_expr))],
            vec![PhysicalSortExpr::new_default(Arc::clone(&col_c_expr))],
        ]);

        // Same orderings, no equality.
        let mut expected_eqs = EquivalenceProperties::new(Arc::new(schema));
        expected_eqs.add_orderings([
            vec![PhysicalSortExpr::new_default(col_b_expr)],
            vec![PhysicalSortExpr::new_default(col_c_expr)],
        ]);

        assert!(eq_properties.oeq_class().eq(expected_eqs.oeq_class()));
        Ok(())
    }
530
    /// Checks `find_longest_permutation` for the required columns `[b, a]` in
    /// three scenarios:
    /// 1. A single ordering `[b (reversed options), a]`.
    /// 2. The same ordering alongside an unrelated `[c]` ordering.
    /// 3. An ordering `[b, c, a]`, where only `b` can lead.
    #[test]
    fn test_get_indices_of_matching_sort_exprs_with_order_eq() -> Result<()> {
        let sort_options = SortOptions::default();
        let sort_options_not = SortOptions::default().not();

        // Scenario 1: ordering [b (not), a] covers both required columns.
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Int32, true),
        ]);
        let col_a = col("a", &schema)?;
        let col_b = col("b", &schema)?;
        let required_columns = [Arc::clone(&col_b), Arc::clone(&col_a)];
        let mut eq_properties = EquivalenceProperties::new(Arc::new(schema));
        eq_properties.add_ordering([
            PhysicalSortExpr::new(Arc::new(Column::new("b", 1)), sort_options_not),
            PhysicalSortExpr::new(Arc::new(Column::new("a", 0)), sort_options),
        ]);
        let (result, idxs) = eq_properties.find_longest_permutation(&required_columns)?;
        assert_eq!(idxs, vec![0, 1]);
        assert_eq!(
            result,
            vec![
                PhysicalSortExpr::new(col_b, sort_options_not),
                PhysicalSortExpr::new(col_a, sort_options),
            ]
        );

        // Scenario 2: an extra, unrelated [c] ordering must not interfere.
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Int32, true),
            Field::new("c", DataType::Int32, true),
        ]);
        let col_a = col("a", &schema)?;
        let col_b = col("b", &schema)?;
        let required_columns = [Arc::clone(&col_b), Arc::clone(&col_a)];
        let mut eq_properties = EquivalenceProperties::new(Arc::new(schema));
        eq_properties.add_orderings([
            vec![PhysicalSortExpr::new(
                Arc::new(Column::new("c", 2)),
                sort_options,
            )],
            vec![
                PhysicalSortExpr::new(Arc::new(Column::new("b", 1)), sort_options_not),
                PhysicalSortExpr::new(Arc::new(Column::new("a", 0)), sort_options),
            ],
        ]);
        let (result, idxs) = eq_properties.find_longest_permutation(&required_columns)?;
        assert_eq!(idxs, vec![0, 1]);
        assert_eq!(
            result,
            vec![
                PhysicalSortExpr::new(col_b, sort_options_not),
                PhysicalSortExpr::new(col_a, sort_options),
            ]
        );

        // Scenario 3: with [b, c, a], `c` is not required, so only `b`
        // (index 0) can be matched.
        let required_columns = [
            Arc::new(Column::new("b", 1)) as _,
            Arc::new(Column::new("a", 0)) as _,
        ];
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Int32, true),
            Field::new("c", DataType::Int32, true),
        ]);
        let mut eq_properties = EquivalenceProperties::new(Arc::new(schema));

        eq_properties.add_ordering([
            PhysicalSortExpr::new(Arc::new(Column::new("b", 1)), sort_options_not),
            PhysicalSortExpr::new(Arc::new(Column::new("c", 2)), sort_options),
            PhysicalSortExpr::new(Arc::new(Column::new("a", 0)), sort_options),
        ]);
        let (_, idxs) = eq_properties.find_longest_permutation(&required_columns)?;
        assert_eq!(idxs, vec![0]);

        Ok(())
    }
609
    /// Checks the sort properties reported by `get_expr_properties`.
    ///
    /// The setup is `b = a` with orderings `[b ASC]` and `[d ASC]`. The cases
    /// are `d + b`, `b` and `a`, plus `a + c`, which involves the unordered
    /// column `c`.
    #[test]
    fn test_update_properties() -> Result<()> {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Int32, true),
            Field::new("c", DataType::Int32, true),
            Field::new("d", DataType::Int32, true),
        ]);

        let mut eq_properties = EquivalenceProperties::new(Arc::new(schema.clone()));
        let col_a = col("a", &schema)?;
        let col_b = col("b", &schema)?;
        let col_c = col("c", &schema)?;
        let col_d = col("d", &schema)?;
        let option_asc = SortOptions {
            descending: false,
            nulls_first: false,
        };
        // b = a
        eq_properties.add_equal_conditions(Arc::clone(&col_b), Arc::clone(&col_a))?;
        // Orderings: [b ASC], [d ASC]
        eq_properties.add_orderings([
            vec![PhysicalSortExpr::new(Arc::clone(&col_b), option_asc)],
            vec![PhysicalSortExpr::new(Arc::clone(&col_d), option_asc)],
        ]);

        let test_cases = vec![
            // d + b
            (
                Arc::new(BinaryExpr::new(col_d, Operator::Plus, Arc::clone(&col_b))) as _,
                SortProperties::Ordered(option_asc),
            ),
            // b
            (col_b, SortProperties::Ordered(option_asc)),
            // a: ordered through the `b = a` equivalence
            (Arc::clone(&col_a), SortProperties::Ordered(option_asc)),
            // a + c: `c` has no ordering, so the sum is unordered
            (
                Arc::new(BinaryExpr::new(col_a, Operator::Plus, col_c)),
                SortProperties::Unordered,
            ),
        ];
        for (expr, expected) in test_cases {
            let leading_orderings = eq_properties
                .oeq_class()
                .iter()
                .map(|ordering| ordering.first().clone())
                .collect::<Vec<_>>();
            let expr_props = eq_properties.get_expr_properties(Arc::clone(&expr));
            let err_msg = format!(
                "expr:{:?}, expected: {:?}, actual: {:?}, leading_orderings: {leading_orderings:?}",
                expr, expected, expr_props.sort_properties
            );
            assert_eq!(expr_props.sort_properties, expected, "{err_msg}");
        }

        Ok(())
    }
668
    /// Checks `find_longest_permutation` against the shared test parameters
    /// from `create_test_params`, extended with the ordering
    /// `[d ASC, h DESC]`.
    ///
    /// NOTE(review): the equivalences and orderings supplied by
    /// `create_test_params` are defined elsewhere; the expected results below
    /// depend on them.
    #[test]
    fn test_find_longest_permutation() -> Result<()> {
        let (test_schema, mut eq_properties) = create_test_params()?;
        let col_a = &col("a", &test_schema)?;
        let col_b = &col("b", &test_schema)?;
        let col_c = &col("c", &test_schema)?;
        let col_d = &col("d", &test_schema)?;
        let col_e = &col("e", &test_schema)?;
        let col_f = &col("f", &test_schema)?;
        let col_h = &col("h", &test_schema)?;
        // a + d
        let a_plus_d = Arc::new(BinaryExpr::new(
            Arc::clone(col_a),
            Operator::Plus,
            Arc::clone(col_d),
        )) as _;

        let option_asc = SortOptions {
            descending: false,
            nulls_first: false,
        };
        let option_desc = SortOptions {
            descending: true,
            nulls_first: true,
        };
        // Extra ordering: [d ASC, h DESC]
        eq_properties.add_ordering([
            PhysicalSortExpr::new(Arc::clone(col_d), option_asc),
            PhysicalSortExpr::new(Arc::clone(col_h), option_desc),
        ]);
        // Each case: (required expressions, expected longest permutation).
        let test_cases = vec![
            (vec![col_a], vec![(col_a, option_asc)]),
            (vec![col_c], vec![(col_c, option_asc)]),
            (
                vec![col_d, col_e, col_b],
                vec![
                    (col_d, option_asc),
                    (col_e, option_desc),
                    (col_b, option_asc),
                ],
            ),
            // `b` alone cannot lead any ordering.
            (vec![col_b], vec![]),
            (vec![col_d], vec![(col_d, option_asc)]),
            (vec![&a_plus_d], vec![(&a_plus_d, option_asc)]),
            (
                vec![col_b, col_d],
                vec![(col_d, option_asc), (col_b, option_asc)],
            ),
            (
                vec![col_c, col_e],
                vec![(col_c, option_asc), (col_e, option_desc)],
            ),
            (
                vec![col_d, col_h, col_e, col_f, col_b],
                vec![
                    (col_d, option_asc),
                    (col_e, option_desc),
                    (col_h, option_desc),
                    (col_f, option_asc),
                    (col_b, option_asc),
                ],
            ),
            (
                vec![col_e, col_d, col_h, col_f, col_b],
                vec![
                    (col_e, option_desc),
                    (col_d, option_asc),
                    (col_h, option_desc),
                    (col_f, option_asc),
                    (col_b, option_asc),
                ],
            ),
            (
                vec![col_e, col_d, col_b, col_h, col_f],
                vec![
                    (col_e, option_desc),
                    (col_d, option_asc),
                    (col_b, option_asc),
                    (col_h, option_desc),
                    (col_f, option_asc),
                ],
            ),
        ];
        for (exprs, expected) in test_cases {
            let exprs = exprs.into_iter().cloned().collect::<Vec<_>>();
            let expected = convert_to_sort_exprs(&expected);
            let (actual, _) = eq_properties.find_longest_permutation(&exprs)?;
            assert_eq!(actual, expected);
        }

        Ok(())
    }
777
    /// A constant column (`h`) must be returned by `find_longest_permutation`
    /// with default sort options.
    #[test]
    fn test_find_longest_permutation2() -> Result<()> {
        let (test_schema, mut eq_properties) = create_test_params()?;
        let col_h = &col("h", &test_schema)?;

        // Mark `h` as constant.
        eq_properties.add_constants(vec![ConstExpr::from(Arc::clone(col_h))])?;

        let test_cases = vec![
            (vec![col_h], vec![(col_h, SortOptions::default())]),
        ];
        for (exprs, expected) in test_cases {
            let exprs = exprs.into_iter().cloned().collect::<Vec<_>>();
            let expected = convert_to_sort_exprs(&expected);
            let (actual, _) = eq_properties.find_longest_permutation(&exprs)?;
            assert_eq!(actual, expected);
        }

        Ok(())
    }
806
    /// Checks `normalize_sort_requirements` on requirements with and without
    /// explicit sort options.
    ///
    /// The expected values imply that `create_test_params` makes `c`
    /// equivalent to `a`, with `a` as the representative.
    #[test]
    fn test_normalize_sort_reqs() -> Result<()> {
        let (test_schema, eq_properties) = create_test_params()?;
        let col_a = &col("a", &test_schema)?;
        let col_b = &col("b", &test_schema)?;
        let col_c = &col("c", &test_schema)?;
        let col_d = &col("d", &test_schema)?;
        let col_e = &col("e", &test_schema)?;
        let col_f = &col("f", &test_schema)?;
        let option_asc = SortOptions {
            descending: false,
            nulls_first: false,
        };
        let option_desc = SortOptions {
            descending: true,
            nulls_first: true,
        };
        // Each case: (requirement, expected normalized requirement).
        let requirements = vec![
            (
                vec![(col_a, Some(option_asc))],
                vec![(col_a, Some(option_asc))],
            ),
            (
                vec![(col_a, Some(option_desc))],
                vec![(col_a, Some(option_desc))],
            ),
            (vec![(col_a, None)], vec![(col_a, None)]),
            // `c` is rewritten to its equivalent `a`.
            (
                vec![(col_c, Some(option_asc))],
                vec![(col_a, Some(option_asc))],
            ),
            (vec![(col_c, None)], vec![(col_a, None)]),
            (
                vec![(col_d, Some(option_asc)), (col_b, Some(option_asc))],
                vec![(col_d, Some(option_asc)), (col_b, Some(option_asc))],
            ),
            (
                vec![(col_d, None), (col_b, None)],
                vec![(col_d, None), (col_b, None)],
            ),
            (
                vec![(col_e, Some(option_desc)), (col_f, Some(option_asc))],
                vec![(col_e, Some(option_desc)), (col_f, Some(option_asc))],
            ),
            // Mixed explicit / unspecified options are kept as-is.
            (
                vec![(col_e, Some(option_desc)), (col_f, None)],
                vec![(col_e, Some(option_desc)), (col_f, None)],
            ),
            (
                vec![(col_e, None), (col_f, None)],
                vec![(col_e, None), (col_f, None)],
            ),
        ];

        for (reqs, expected_normalized) in requirements.into_iter() {
            let req = convert_to_sort_reqs(&reqs);
            let expected_normalized = convert_to_sort_reqs(&expected_normalized);

            assert_eq!(
                eq_properties.normalize_sort_requirements(req).unwrap(),
                expected_normalized
            );
        }

        Ok(())
    }
881
    /// Sort requirements on columns that have an equivalent representative
    /// (here `c`, normalized to `a`) must be rewritten. Other columns must be
    /// kept unchanged.
    #[test]
    fn test_schema_normalize_sort_requirement_with_equivalence() -> Result<()> {
        let option1 = SortOptions {
            descending: false,
            nulls_first: false,
        };
        let (test_schema, eq_properties) = create_test_params()?;
        let col_a = &col("a", &test_schema)?;
        let col_c = &col("c", &test_schema)?;
        let col_d = &col("d", &test_schema)?;

        // Each case: (requirement, expected normalized requirement).
        let test_cases = vec![
            (vec![(col_a, Some(option1))], vec![(col_a, Some(option1))]),
            (vec![(col_c, Some(option1))], vec![(col_a, Some(option1))]),
            (vec![(col_c, None)], vec![(col_a, None)]),
            (vec![(col_d, Some(option1))], vec![(col_d, Some(option1))]),
        ];
        for (reqs, expected) in test_cases.into_iter() {
            let reqs = convert_to_sort_reqs(&reqs);
            let expected = convert_to_sort_reqs(&expected);
            let normalized = eq_properties
                .normalize_sort_requirements(reqs.clone())
                .unwrap();
            assert!(
                expected.eq(&normalized),
                "error in test: reqs: {reqs:?}, expected: {expected:?}, normalized: {normalized:?}"
            );
        }

        Ok(())
    }
918
919 #[test]
920 fn test_eliminate_redundant_monotonic_sorts() -> Result<()> {
921 let schema = Arc::new(Schema::new(vec![
922 Field::new("a", DataType::Date32, true),
923 Field::new("b", DataType::Utf8, true),
924 Field::new("c", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
925 ]));
926 let mut base_properties = EquivalenceProperties::new(Arc::clone(&schema));
927 base_properties.reorder(
928 ["a", "b", "c"]
929 .into_iter()
930 .map(|c| PhysicalSortExpr::new_default(col(c, schema.as_ref()).unwrap())),
931 )?;
932
933 struct TestCase {
934 name: &'static str,
935 constants: Vec<Arc<dyn PhysicalExpr>>,
936 equal_conditions: Vec<[Arc<dyn PhysicalExpr>; 2]>,
937 sort_columns: &'static [&'static str],
938 should_satisfy_ordering: bool,
939 }
940
941 let col_a = col("a", schema.as_ref())?;
942 let col_b = col("b", schema.as_ref())?;
943 let col_c = col("c", schema.as_ref())?;
944 let cast_c = Arc::new(CastExpr::new(col_c, DataType::Date32, None)) as _;
945
946 let cases = vec![
947 TestCase {
948 name: "(a, b, c) -> (c)",
949 constants: vec![Arc::clone(&col_b)],
951 equal_conditions: vec![[Arc::clone(&cast_c), Arc::clone(&col_a)]],
952 sort_columns: &["c"],
953 should_satisfy_ordering: true,
954 },
955 TestCase {
958 name: "(a, b, c) -> (c)",
959 constants: vec![col_b],
961 equal_conditions: vec![[Arc::clone(&col_a), Arc::clone(&cast_c)]],
962 sort_columns: &["c"],
963 should_satisfy_ordering: true,
964 },
965 TestCase {
966 name: "not ordered because (b) is not constant",
967 constants: vec![],
969 equal_conditions: vec![[Arc::clone(&cast_c), Arc::clone(&col_a)]],
971 sort_columns: &["c"],
972 should_satisfy_ordering: false,
973 },
974 ];
975
976 for case in cases {
977 for properties in [
981 {
983 let mut properties = base_properties.clone();
984 for [left, right] in case.equal_conditions.clone() {
985 properties.add_equal_conditions(left, right)?
986 }
987 properties.add_constants(
988 case.constants.iter().cloned().map(ConstExpr::from),
989 )?;
990 properties
991 },
992 {
994 let mut properties = base_properties.clone();
995 properties.add_constants(
996 case.constants.iter().cloned().map(ConstExpr::from),
997 )?;
998 for [left, right] in case.equal_conditions {
999 properties.add_equal_conditions(left, right)?
1000 }
1001 properties
1002 },
1003 ] {
1004 let sort = case
1005 .sort_columns
1006 .iter()
1007 .map(|&name| col(name, &schema).map(PhysicalSortExpr::new_default))
1008 .collect::<Result<Vec<_>>>()?;
1009
1010 assert_eq!(
1011 properties.ordering_satisfy(sort)?,
1012 case.should_satisfy_ordering,
1013 "failed test '{}'",
1014 case.name
1015 );
1016 }
1017 }
1018
1019 Ok(())
1020 }
1021
    /// Given the ordering `[c, a, b]`, adding `c = concat(a, b)` must split it
    /// into exactly two orderings: `[c]` and `[a, b]`.
    #[test]
    fn test_ordering_equivalence_with_lex_monotonic_concat() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, false),
            Field::new("b", DataType::Utf8, false),
            Field::new("c", DataType::Utf8, false),
        ]));

        let col_a = col("a", &schema)?;
        let col_b = col("b", &schema)?;
        let col_c = col("c", &schema)?;

        // concat(a, b)
        let a_concat_b: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
            "concat",
            concat(),
            vec![Arc::clone(&col_a), Arc::clone(&col_b)],
            Field::new("f", DataType::Utf8, true).into(),
            Arc::new(ConfigOptions::default()),
        ));

        let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));

        // Ordering: [c ASC, a ASC, b ASC]
        eq_properties.add_ordering([
            PhysicalSortExpr::new_default(Arc::clone(&col_c)).asc(),
            PhysicalSortExpr::new_default(Arc::clone(&col_a)).asc(),
            PhysicalSortExpr::new_default(Arc::clone(&col_b)).asc(),
        ]);

        // c = concat(a, b)
        eq_properties.add_equal_conditions(Arc::clone(&col_c), a_concat_b)?;

        let orderings = eq_properties.oeq_class();

        let expected_ordering1 = [PhysicalSortExpr::new_default(col_c).asc()].into();
        let expected_ordering2 = [
            PhysicalSortExpr::new_default(col_a).asc(),
            PhysicalSortExpr::new_default(col_b).asc(),
        ]
        .into();

        assert_eq!(orderings.len(), 2);
        assert!(orderings.contains(&expected_ordering1));
        assert!(orderings.contains(&expected_ordering2));

        Ok(())
    }
1070
    /// Contrast with the `concat` case: adding `c = a * b` to the ordering
    /// `[c, a, b]` must leave the single original ordering unchanged.
    #[test]
    fn test_ordering_equivalence_with_non_lex_monotonic_multiply() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
            Field::new("c", DataType::Int32, false),
        ]));

        let col_a = col("a", &schema)?;
        let col_b = col("b", &schema)?;
        let col_c = col("c", &schema)?;

        // a * b
        let a_times_b = Arc::new(BinaryExpr::new(
            Arc::clone(&col_a),
            Operator::Multiply,
            Arc::clone(&col_b),
        )) as _;

        let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));

        // Ordering: [c ASC, a ASC, b ASC]
        let initial_ordering: LexOrdering = [
            PhysicalSortExpr::new_default(Arc::clone(&col_c)).asc(),
            PhysicalSortExpr::new_default(col_a).asc(),
            PhysicalSortExpr::new_default(col_b).asc(),
        ]
        .into();

        eq_properties.add_ordering(initial_ordering.clone());

        // c = a * b
        eq_properties.add_equal_conditions(col_c, a_times_b)?;

        let orderings = eq_properties.oeq_class();

        assert_eq!(orderings.len(), 1);
        assert!(orderings.contains(&initial_ordering));

        Ok(())
    }
1112
    /// Given the ordering `[concat(a, b), a, b]`, adding `c = concat(a, b)`
    /// must yield exactly two orderings: `[concat(a, b)]` and `[a, b]`.
    #[test]
    fn test_ordering_equivalence_with_concat_equality() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, false),
            Field::new("b", DataType::Utf8, false),
            Field::new("c", DataType::Utf8, false),
        ]));

        let col_a = col("a", &schema)?;
        let col_b = col("b", &schema)?;
        let col_c = col("c", &schema)?;

        // concat(a, b)
        let a_concat_b = Arc::new(ScalarFunctionExpr::new(
            "concat",
            concat(),
            vec![Arc::clone(&col_a), Arc::clone(&col_b)],
            Field::new("f", DataType::Utf8, true).into(),
            Arc::new(ConfigOptions::default()),
        )) as _;

        let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));

        // Ordering: [concat(a, b) ASC, a ASC, b ASC]
        eq_properties.add_ordering([
            PhysicalSortExpr::new_default(Arc::clone(&a_concat_b)).asc(),
            PhysicalSortExpr::new_default(Arc::clone(&col_a)).asc(),
            PhysicalSortExpr::new_default(Arc::clone(&col_b)).asc(),
        ]);

        // c = concat(a, b)
        eq_properties.add_equal_conditions(col_c, Arc::clone(&a_concat_b))?;

        let orderings = eq_properties.oeq_class();

        let expected_ordering1 = [PhysicalSortExpr::new_default(a_concat_b).asc()].into();
        let expected_ordering2 = [
            PhysicalSortExpr::new_default(col_a).asc(),
            PhysicalSortExpr::new_default(col_b).asc(),
        ]
        .into();

        assert_eq!(orderings.len(), 2);
        assert!(orderings.contains(&expected_ordering1));
        assert!(orderings.contains(&expected_ordering2));

        Ok(())
    }
1161
    /// Checks `requirements_compatible` with no equivalences:
    /// - `[a]` is compatible with itself.
    /// - `[a]` is not compatible with the longer `[a, b]`.
    /// - `[a, b]` is compatible with its prefix `[a]`.
    /// - `[c]` is not compatible with `[a]`.
    #[test]
    fn test_requirements_compatible() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Int32, true),
            Field::new("c", DataType::Int32, true),
        ]));
        let col_a = col("a", &schema)?;
        let col_b = col("b", &schema)?;
        let col_c = col("c", &schema)?;

        let eq_properties = EquivalenceProperties::new(schema);
        let lex_a: LexRequirement =
            [PhysicalSortRequirement::new(Arc::clone(&col_a), None)].into();
        let lex_a_b: LexRequirement = [
            PhysicalSortRequirement::new(col_a, None),
            PhysicalSortRequirement::new(col_b, None),
        ]
        .into();
        let lex_c = [PhysicalSortRequirement::new(col_c, None)].into();

        assert!(eq_properties.requirements_compatible(lex_a.clone(), lex_a.clone()));
        assert!(!eq_properties.requirements_compatible(lex_a.clone(), lex_a_b.clone()));
        assert!(eq_properties.requirements_compatible(lex_a_b, lex_a.clone()));
        assert!(!eq_properties.requirements_compatible(lex_c, lex_a));

        Ok(())
    }
1190
    /// Reordering to `[a, b]` while `a` is constant must report a change. The
    /// result must be a single ordering with expressions `a` then `b`.
    #[test]
    fn test_with_reorder_constant_filtering() -> Result<()> {
        let schema = create_test_schema()?;
        let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));

        // Mark `a` as constant.
        let col_a = col("a", &schema)?;
        let col_b = col("b", &schema)?;
        eq_properties.add_constants([ConstExpr::from(Arc::clone(&col_a))])?;

        let sort_exprs = vec![
            PhysicalSortExpr::new_default(Arc::clone(&col_a)),
            PhysicalSortExpr::new_default(Arc::clone(&col_b)),
        ];

        let change = eq_properties.reorder(sort_exprs)?;
        assert!(change);

        assert_eq!(eq_properties.oeq_class().len(), 1);
        let ordering = eq_properties.oeq_class().iter().next().unwrap();
        assert_eq!(ordering.len(), 2);
        assert!(ordering[0].expr.eq(&col_a));
        assert!(ordering[1].expr.eq(&col_b));

        Ok(())
    }
1217
    /// Reordering to `[a ASC]`, a prefix of the existing `[a ASC, b DESC, c ASC]`,
    /// must report no change and keep the full three-element ordering.
    #[test]
    fn test_with_reorder_preserve_suffix() -> Result<()> {
        let schema = create_test_schema()?;
        let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));

        let col_a = col("a", &schema)?;
        let col_b = col("b", &schema)?;
        let col_c = col("c", &schema)?;

        let asc = SortOptions::default();
        let desc = SortOptions {
            descending: true,
            nulls_first: true,
        };

        // Existing ordering: [a ASC, b DESC, c ASC]
        eq_properties.add_ordering([
            PhysicalSortExpr::new(Arc::clone(&col_a), asc),
            PhysicalSortExpr::new(Arc::clone(&col_b), desc),
            PhysicalSortExpr::new(Arc::clone(&col_c), asc),
        ]);

        // Requested ordering is a prefix of the existing one.
        let new_order = vec![PhysicalSortExpr::new(Arc::clone(&col_a), asc)];

        let change = eq_properties.reorder(new_order)?;
        assert!(!change);

        // The suffix `b DESC, c ASC` must be preserved.
        assert_eq!(eq_properties.oeq_class().len(), 1);
        let ordering = eq_properties.oeq_class().iter().next().unwrap();
        assert_eq!(ordering.len(), 3);
        assert!(ordering[0].expr.eq(&col_a));
        assert!(ordering[0].options.eq(&asc));
        assert!(ordering[1].expr.eq(&col_b));
        assert!(ordering[1].options.eq(&desc));
        assert!(ordering[2].expr.eq(&col_c));
        assert!(ordering[2].options.eq(&asc));

        Ok(())
    }
1259
    /// With `a = b` and the existing ordering `[a, c]`, reordering to `[b]`
    /// must report no change. The ordering keeps two elements, led by `a` or
    /// `b`.
    #[test]
    fn test_with_reorder_equivalent_expressions() -> Result<()> {
        let schema = create_test_schema()?;
        let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));

        let col_a = col("a", &schema)?;
        let col_b = col("b", &schema)?;
        let col_c = col("c", &schema)?;

        // a = b
        eq_properties.add_equal_conditions(Arc::clone(&col_a), Arc::clone(&col_b))?;

        // Existing ordering: [a, c]
        eq_properties.add_ordering([
            PhysicalSortExpr::new_default(Arc::clone(&col_a)),
            PhysicalSortExpr::new_default(Arc::clone(&col_c)),
        ]);

        // Request [b], which is equivalent to the existing prefix [a].
        let new_order = vec![PhysicalSortExpr::new_default(Arc::clone(&col_b))];

        let change = eq_properties.reorder(new_order)?;

        assert!(!change);
        assert_eq!(eq_properties.oeq_class().len(), 1);

        let asc = SortOptions::default();
        let ordering = eq_properties.oeq_class().iter().next().unwrap();
        assert_eq!(ordering.len(), 2);
        // Either member of the equivalence class may lead.
        assert!(ordering[0].expr.eq(&col_a) || ordering[0].expr.eq(&col_b));
        assert!(ordering[0].options.eq(&asc));
        assert!(ordering[1].expr.eq(&col_c));
        assert!(ordering[1].options.eq(&asc));

        Ok(())
    }
1298
/// A requirement that sorts the leading column in the opposite direction from
/// the existing ordering cannot be reconciled with it: `reorder` must report a
/// change and leave exactly the requested ordering behind.
#[test]
fn test_with_reorder_incompatible_prefix() -> Result<()> {
    let schema = create_test_schema()?;
    let (col_a, col_b) = (col("a", &schema)?, col("b", &schema)?);

    let ascending = SortOptions::default();
    let descending = SortOptions {
        descending: true,
        nulls_first: true,
    };

    let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
    eq_properties.add_ordering([
        PhysicalSortExpr::new(Arc::clone(&col_a), ascending),
        PhysicalSortExpr::new(Arc::clone(&col_b), descending),
    ]);

    // Same leading column as the existing ordering, opposite direction.
    let requested = vec![PhysicalSortExpr::new(Arc::clone(&col_a), descending)];
    assert!(eq_properties.reorder(requested.clone())?);

    let orderings = eq_properties.oeq_class();
    assert_eq!(orderings.len(), 1);
    let only = orderings.iter().next().unwrap();
    assert_eq!(only.to_vec(), requested);

    Ok(())
}
1332
/// Setup:
/// - `c` is constant.
/// - `b = d` is an equality.
/// - The existing orderings are `[d, a]` and `[e]`.
///
/// Requesting `[b, c]` must make `reorder` report no change and leave the
/// ordering equivalence class exactly as it was.
#[test]
fn test_with_reorder_comprehensive() -> Result<()> {
    let schema = create_test_schema()?;
    let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));

    let col_a = col("a", &schema)?;
    let col_b = col("b", &schema)?;
    let col_c = col("c", &schema)?;
    let col_d = col("d", &schema)?;
    let col_e = col("e", &schema)?;

    // `c` is a constant expression.
    eq_properties.add_constants([ConstExpr::from(Arc::clone(&col_c))])?;

    // `b` and `d` are equivalent.
    eq_properties.add_equal_conditions(Arc::clone(&col_b), Arc::clone(&col_d))?;

    eq_properties.add_orderings([
        vec![
            PhysicalSortExpr::new_default(Arc::clone(&col_d)),
            PhysicalSortExpr::new_default(Arc::clone(&col_a)),
        ],
        vec![PhysicalSortExpr::new_default(Arc::clone(&col_e))],
    ]);

    let new_order = vec![
        PhysicalSortExpr::new_default(Arc::clone(&col_b)),
        PhysicalSortExpr::new_default(Arc::clone(&col_c)),
    ];

    let old_orderings = eq_properties.oeq_class().clone();
    let change = eq_properties.reorder(new_order)?;
    assert!(!change);

    // Compare through the public accessor, as the sibling tests do, rather than
    // reaching into the `oeq_class` field directly.
    assert_eq!(eq_properties.oeq_class(), &old_orderings);

    Ok(())
}
1373
/// Checks how primary-key / unique constraints affect `ordering_satisfy`.
///
/// Each case is `(name, schema, constraints, base_order, satisfied_orders,
/// unsatisfied_orders)`:
/// - Every ordering in `satisfied_orders` must be rejected on fresh properties,
///   before the base ordering and constraints are added, and accepted afterwards.
/// - Every ordering in `unsatisfied_orders` must still be rejected afterwards.
#[test]
fn test_ordering_satisfaction_with_key_constraints() -> Result<()> {
    /// Builds an ordering of default-option (ascending) sort expressions over
    /// the named columns of `schema`, propagating any column-lookup error.
    fn sort_exprs(cols: &[&str], schema: &Schema) -> Result<Vec<PhysicalSortExpr>> {
        cols.iter()
            .map(|name| col(name, schema).map(PhysicalSortExpr::new_default))
            .collect()
    }

    let pk_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Int32, true),
        Field::new("c", DataType::Int32, true),
        Field::new("d", DataType::Int32, true),
    ]));

    let unique_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
        Field::new("c", DataType::Int32, true),
        Field::new("d", DataType::Int32, true),
    ]));

    let test_cases = vec![
        (
            "single column primary key",
            &pk_schema,
            vec![Constraint::PrimaryKey(vec![0])],
            vec!["a"],
            vec![vec!["a", "b"], vec!["a", "c", "d"]],
            vec![vec!["b", "a"], vec!["c", "a"]],
        ),
        (
            "single column unique",
            &unique_schema,
            vec![Constraint::Unique(vec![0])],
            vec!["a"],
            vec![vec!["a", "b"], vec!["a", "c", "d"]],
            vec![vec!["b", "a"], vec!["c", "a"]],
        ),
        (
            "multi-column primary key",
            &pk_schema,
            vec![Constraint::PrimaryKey(vec![0, 1])],
            vec!["a", "b"],
            vec![vec!["a", "b", "c"], vec!["a", "b", "d"]],
            vec![vec!["b", "a"], vec!["a", "c", "b"]],
        ),
        (
            "multi-column unique",
            &unique_schema,
            vec![Constraint::Unique(vec![0, 1])],
            vec!["a", "b"],
            vec![vec!["a", "b", "c"], vec!["a", "b", "d"]],
            vec![vec!["b", "a"], vec!["c", "a", "b"]],
        ),
        (
            "nullable unique",
            &unique_schema,
            vec![Constraint::Unique(vec![2, 3])],
            vec!["c", "d"],
            vec![],
            vec![vec!["c", "d", "a"]],
        ),
        (
            "ordering with arbitrary column unique",
            &unique_schema,
            vec![Constraint::Unique(vec![0, 1])],
            vec!["a", "c", "b"],
            vec![vec!["a", "c", "b", "d"]],
            vec![vec!["a", "b", "d"]],
        ),
        (
            "ordering with arbitrary column pk",
            &pk_schema,
            vec![Constraint::PrimaryKey(vec![0, 1])],
            vec!["a", "c", "b"],
            vec![vec!["a", "c", "b", "d"]],
            vec![vec!["a", "b", "d"]],
        ),
        (
            "ordering with arbitrary column pk complex",
            &pk_schema,
            vec![Constraint::PrimaryKey(vec![3, 1])],
            vec!["b", "a", "d"],
            vec![vec!["b", "a", "d", "c"]],
            vec![vec!["b", "c", "d", "a"], vec!["b", "a", "c", "d"]],
        ),
    ];

    for (
        name,
        schema,
        constraints,
        base_order,
        satisfied_orders,
        unsatisfied_orders,
    ) in test_cases
    {
        let mut eq_properties = EquivalenceProperties::new(Arc::clone(schema));

        let satisfied_orderings = satisfied_orders
            .iter()
            .map(|cols| sort_exprs(cols, schema))
            .collect::<Result<Vec<_>>>()?;
        let unsatisfied_orderings = unsatisfied_orders
            .iter()
            .map(|cols| sort_exprs(cols, schema))
            .collect::<Result<Vec<_>>>()?;

        // Without the base ordering and constraints nothing is known, so the
        // orderings that constraints make valid must not be satisfied yet.
        for ordering in &satisfied_orderings {
            assert!(
                !eq_properties.ordering_satisfy(ordering.clone())?,
                "{name}: ordering {ordering:?} should not be satisfied before adding constraints",
            );
        }

        eq_properties.add_ordering(sort_exprs(&base_order, schema)?);
        eq_properties =
            eq_properties.with_constraints(Constraints::new_unverified(constraints));

        for ordering in &satisfied_orderings {
            assert!(
                eq_properties.ordering_satisfy(ordering.clone())?,
                "{name}: ordering {ordering:?} should be satisfied after adding constraints",
            );
        }

        for ordering in &unsatisfied_orderings {
            assert!(
                !eq_properties.ordering_satisfy(ordering.clone())?,
                "{name}: ordering {ordering:?} should not be satisfied after adding constraints",
            );
        }
    }

    Ok(())
}
1531}