1use std::ops::Deref;
19use std::sync::Arc;
20
21use crate::expressions::Column;
22use crate::utils::collect_columns;
23use crate::{PhysicalExpr, PhysicalExprExt};
24
25use arrow::datatypes::{Field, Schema, SchemaRef};
26use datafusion_common::stats::{ColumnStatistics, Precision};
27use datafusion_common::tree_node::{Transformed, TransformedResult};
28use datafusion_common::{internal_datafusion_err, internal_err, plan_err, Result};
29
30use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
31use indexmap::IndexMap;
32use itertools::Itertools;
33
34#[derive(Debug, Clone)]
43pub struct ProjectionExpr {
44 pub expr: Arc<dyn PhysicalExpr>,
46 pub alias: String,
48}
49
50impl std::fmt::Display for ProjectionExpr {
51 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52 if self.expr.to_string() == self.alias {
53 write!(f, "{}", self.alias)
54 } else {
55 write!(f, "{} AS {}", self.expr, self.alias)
56 }
57 }
58}
59
60impl ProjectionExpr {
61 pub fn new(expr: Arc<dyn PhysicalExpr>, alias: String) -> Self {
63 Self { expr, alias }
64 }
65
66 pub fn new_from_expression(
68 expr: Arc<dyn PhysicalExpr>,
69 schema: &Schema,
70 ) -> Result<Self> {
71 let field = expr.return_field(schema)?;
72 Ok(Self {
73 expr,
74 alias: field.name().to_string(),
75 })
76 }
77}
78
79impl From<(Arc<dyn PhysicalExpr>, String)> for ProjectionExpr {
80 fn from(value: (Arc<dyn PhysicalExpr>, String)) -> Self {
81 Self::new(value.0, value.1)
82 }
83}
84
85impl From<&(Arc<dyn PhysicalExpr>, String)> for ProjectionExpr {
86 fn from(value: &(Arc<dyn PhysicalExpr>, String)) -> Self {
87 Self::new(Arc::clone(&value.0), value.1.clone())
88 }
89}
90
91impl From<ProjectionExpr> for (Arc<dyn PhysicalExpr>, String) {
92 fn from(value: ProjectionExpr) -> Self {
93 (value.expr, value.alias)
94 }
95}
96
97#[derive(Debug, Clone)]
103pub struct ProjectionExprs {
104 exprs: Vec<ProjectionExpr>,
105}
106
107impl std::fmt::Display for ProjectionExprs {
108 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109 let exprs: Vec<String> = self.exprs.iter().map(|e| e.to_string()).collect();
110 write!(f, "Projection[{}]", exprs.join(", "))
111 }
112}
113
114impl From<Vec<ProjectionExpr>> for ProjectionExprs {
115 fn from(value: Vec<ProjectionExpr>) -> Self {
116 Self { exprs: value }
117 }
118}
119
120impl From<&[ProjectionExpr]> for ProjectionExprs {
121 fn from(value: &[ProjectionExpr]) -> Self {
122 Self {
123 exprs: value.to_vec(),
124 }
125 }
126}
127
128impl FromIterator<ProjectionExpr> for ProjectionExprs {
129 fn from_iter<T: IntoIterator<Item = ProjectionExpr>>(exprs: T) -> Self {
130 Self {
131 exprs: exprs.into_iter().collect::<Vec<_>>(),
132 }
133 }
134}
135
136impl AsRef<[ProjectionExpr]> for ProjectionExprs {
137 fn as_ref(&self) -> &[ProjectionExpr] {
138 &self.exprs
139 }
140}
141
142impl ProjectionExprs {
143 pub fn new<I>(exprs: I) -> Self
144 where
145 I: IntoIterator<Item = ProjectionExpr>,
146 {
147 Self {
148 exprs: exprs.into_iter().collect::<Vec<_>>(),
149 }
150 }
151
152 pub fn from_indices(indices: &[usize], schema: &SchemaRef) -> Self {
196 let projection_exprs = indices.iter().map(|&i| {
197 let field = schema.field(i);
198 ProjectionExpr {
199 expr: Arc::new(Column::new(field.name(), i)),
200 alias: field.name().clone(),
201 }
202 });
203
204 Self::from_iter(projection_exprs)
205 }
206
207 pub fn iter(&self) -> impl Iterator<Item = &ProjectionExpr> {
209 self.exprs.iter()
210 }
211
212 pub fn projection_mapping(
214 &self,
215 input_schema: &SchemaRef,
216 ) -> Result<ProjectionMapping> {
217 ProjectionMapping::try_new(
218 self.exprs
219 .iter()
220 .map(|p| (Arc::clone(&p.expr), p.alias.clone())),
221 input_schema,
222 )
223 }
224
225 pub fn expr_iter(&self) -> impl Iterator<Item = Arc<dyn PhysicalExpr>> + '_ {
227 self.exprs.iter().map(|e| Arc::clone(&e.expr))
228 }
229
230 pub fn try_merge(&self, other: &ProjectionExprs) -> Result<ProjectionExprs> {
296 let mut new_exprs = Vec::with_capacity(other.exprs.len());
297 for proj_expr in &other.exprs {
298 let new_expr = update_expr(&proj_expr.expr, &self.exprs, true)?
299 .ok_or_else(|| {
300 internal_datafusion_err!(
301 "Failed to combine projections: expression {} could not be applied on top of existing projections {}",
302 proj_expr.expr,
303 self.exprs.iter().map(|e| format!("{e}")).join(", ")
304 )
305 })?;
306 new_exprs.push(ProjectionExpr {
307 expr: new_expr,
308 alias: proj_expr.alias.clone(),
309 });
310 }
311 Ok(ProjectionExprs::new(new_exprs))
312 }
313
314 pub fn column_indices(&self) -> Vec<usize> {
319 self.exprs
320 .iter()
321 .flat_map(|e| collect_columns(&e.expr).into_iter().map(|col| col.index()))
322 .sorted_unstable()
323 .dedup()
324 .collect_vec()
325 }
326
327 pub fn ordered_column_indices(&self) -> Vec<usize> {
355 self.exprs
356 .iter()
357 .map(|e| {
358 e.expr
359 .as_any()
360 .downcast_ref::<Column>()
361 .expect("Expected column reference in projection")
362 .index()
363 })
364 .collect()
365 }
366
367 pub fn project_schema(&self, input_schema: &Schema) -> Result<Schema> {
372 let fields: Result<Vec<Field>> = self
373 .exprs
374 .iter()
375 .map(|proj_expr| {
376 let metadata = proj_expr
377 .expr
378 .return_field(input_schema)?
379 .metadata()
380 .clone();
381
382 let field = Field::new(
383 &proj_expr.alias,
384 proj_expr.expr.data_type(input_schema)?,
385 proj_expr.expr.nullable(input_schema)?,
386 )
387 .with_metadata(metadata);
388
389 Ok(field)
390 })
391 .collect();
392
393 Ok(Schema::new_with_metadata(
394 fields?,
395 input_schema.metadata().clone(),
396 ))
397 }
398
399 pub fn project_statistics(
403 &self,
404 mut stats: datafusion_common::Statistics,
405 input_schema: &Schema,
406 ) -> Result<datafusion_common::Statistics> {
407 let mut primitive_row_size = 0;
408 let mut primitive_row_size_possible = true;
409 let mut column_statistics = vec![];
410
411 for proj_expr in &self.exprs {
412 let expr = &proj_expr.expr;
413 let col_stats = if let Some(col) = expr.as_any().downcast_ref::<Column>() {
414 stats.column_statistics[col.index()].clone()
415 } else {
416 ColumnStatistics::new_unknown()
419 };
420 column_statistics.push(col_stats);
421 let data_type = expr.data_type(input_schema)?;
422 if let Some(value) = data_type.primitive_width() {
423 primitive_row_size += value;
424 continue;
425 }
426 primitive_row_size_possible = false;
427 }
428
429 if primitive_row_size_possible {
430 stats.total_byte_size =
431 Precision::Exact(primitive_row_size).multiply(&stats.num_rows);
432 }
433 stats.column_statistics = column_statistics;
434 Ok(stats)
435 }
436}
437
438impl<'a> IntoIterator for &'a ProjectionExprs {
439 type Item = &'a ProjectionExpr;
440 type IntoIter = std::slice::Iter<'a, ProjectionExpr>;
441
442 fn into_iter(self) -> Self::IntoIter {
443 self.exprs.iter()
444 }
445}
446
447impl IntoIterator for ProjectionExprs {
448 type Item = ProjectionExpr;
449 type IntoIter = std::vec::IntoIter<ProjectionExpr>;
450
451 fn into_iter(self) -> Self::IntoIter {
452 self.exprs.into_iter()
453 }
454}
455
456pub fn update_expr(
484 expr: &Arc<dyn PhysicalExpr>,
485 projected_exprs: &[ProjectionExpr],
486 sync_with_child: bool,
487) -> Result<Option<Arc<dyn PhysicalExpr>>> {
488 #[derive(Debug, PartialEq)]
489 enum RewriteState {
490 Unchanged,
492 RewrittenValid,
494 RewrittenInvalid,
497 }
498
499 let mut state = RewriteState::Unchanged;
500
501 let new_expr = Arc::clone(expr)
502 .transform_up_with_lambdas_params(|expr, lambdas_params| {
503 if state == RewriteState::RewrittenInvalid {
504 return Ok(Transformed::no(expr));
505 }
506
507 let column = match expr.as_any().downcast_ref::<Column>() {
508 Some(column) if !lambdas_params.contains(column.name()) => column,
509 _ => {
510 return Ok(Transformed::no(expr));
511 }
512 };
513 if sync_with_child {
514 state = RewriteState::RewrittenValid;
515 let projected_expr = projected_exprs.get(column.index()).ok_or_else(|| {
517 internal_datafusion_err!(
518 "Column index {} out of bounds for projected expressions of length {}",
519 column.index(),
520 projected_exprs.len()
521 )
522 })?;
523 Ok(Transformed::yes(Arc::clone(&projected_expr.expr)))
524 } else {
525 state = RewriteState::RewrittenInvalid;
527 projected_exprs
529 .iter()
530 .enumerate()
531 .find_map(|(index, proj_expr)| {
532 proj_expr.expr.as_any().downcast_ref::<Column>().and_then(
533 |projected_column| {
534 (column.name().eq(projected_column.name())
535 && column.index() == projected_column.index())
536 .then(|| {
537 state = RewriteState::RewrittenValid;
538 Arc::new(Column::new(&proj_expr.alias, index)) as _
539 })
540 },
541 )
542 })
543 .map_or_else(
544 || Ok(Transformed::no(expr)),
545 |c| Ok(Transformed::yes(c)),
546 )
547 }
548 })
549 .data()?;
550
551 Ok((state == RewriteState::RewrittenValid).then_some(new_expr))
552}
553
554#[derive(Clone, Debug, Default)]
557pub struct ProjectionTargets {
558 exprs_indices: Vec<(Arc<dyn PhysicalExpr>, usize)>,
562}
563
564impl ProjectionTargets {
565 pub fn first(&self) -> &(Arc<dyn PhysicalExpr>, usize) {
567 self.exprs_indices.first().unwrap()
569 }
570
571 pub fn push(&mut self, target: (Arc<dyn PhysicalExpr>, usize)) {
573 self.exprs_indices.push(target);
574 }
575}
576
577impl Deref for ProjectionTargets {
578 type Target = [(Arc<dyn PhysicalExpr>, usize)];
579
580 fn deref(&self) -> &Self::Target {
581 &self.exprs_indices
582 }
583}
584
585impl From<Vec<(Arc<dyn PhysicalExpr>, usize)>> for ProjectionTargets {
586 fn from(exprs_indices: Vec<(Arc<dyn PhysicalExpr>, usize)>) -> Self {
587 Self { exprs_indices }
588 }
589}
590
591#[derive(Clone, Debug)]
594pub struct ProjectionMapping {
595 map: IndexMap<Arc<dyn PhysicalExpr>, ProjectionTargets>,
598}
599
600impl ProjectionMapping {
601 pub fn try_new(
615 expr: impl IntoIterator<Item = (Arc<dyn PhysicalExpr>, String)>,
616 input_schema: &SchemaRef,
617 ) -> Result<Self> {
618 let mut map = IndexMap::<_, ProjectionTargets>::new();
620 for (expr_idx, (expr, name)) in expr.into_iter().enumerate() {
621 let target_expr = Arc::new(Column::new(&name, expr_idx)) as _;
622 let source_expr = expr.transform_down_with_schema(input_schema, |e, schema| match e.as_any().downcast_ref::<Column>() {
623 Some(col) => {
624 let idx = col.index();
629 let matching_field = schema.field(idx);
630 let matching_name = matching_field.name();
631 if col.name() != matching_name {
632 return internal_err!(
633 "Input field name {} does not match with the projection expression {}",
634 matching_name,
635 col.name()
636 );
637 }
638 let matching_column = Column::new(matching_name, idx);
639 Ok(Transformed::yes(Arc::new(matching_column)))
640 }
641 None => Ok(Transformed::no(e)),
642 })
643 .data()?;
644 map.entry(source_expr)
645 .or_default()
646 .push((target_expr, expr_idx));
647 }
648 Ok(Self { map })
649 }
650
651 pub fn from_indices(indices: &[usize], schema: &SchemaRef) -> Result<Self> {
656 let projection_exprs = indices.iter().map(|index| {
657 let field = schema.field(*index);
658 let column = Arc::new(Column::new(field.name(), *index));
659 (column as _, field.name().clone())
660 });
661 ProjectionMapping::try_new(projection_exprs, schema)
662 }
663}
664
665impl Deref for ProjectionMapping {
666 type Target = IndexMap<Arc<dyn PhysicalExpr>, ProjectionTargets>;
667
668 fn deref(&self) -> &Self::Target {
669 &self.map
670 }
671}
672
673impl FromIterator<(Arc<dyn PhysicalExpr>, ProjectionTargets)> for ProjectionMapping {
674 fn from_iter<T: IntoIterator<Item = (Arc<dyn PhysicalExpr>, ProjectionTargets)>>(
675 iter: T,
676 ) -> Self {
677 Self {
678 map: IndexMap::from_iter(iter),
679 }
680 }
681}
682
683pub fn project_orderings(
696 orderings: &[LexOrdering],
697 schema: &SchemaRef,
698) -> Vec<LexOrdering> {
699 let mut projected_orderings = vec![];
700
701 for ordering in orderings {
702 projected_orderings.extend(project_ordering(ordering, schema));
703 }
704
705 projected_orderings
706}
707
708pub fn project_ordering(
738 ordering: &LexOrdering,
739 schema: &SchemaRef,
740) -> Option<LexOrdering> {
741 let mut projected_exprs = vec![];
742 for PhysicalSortExpr { expr, options } in ordering.iter() {
743 let transformed =
744 Arc::clone(expr).transform_up_with_lambdas_params(|expr, lambdas_params| {
745 let col = match expr.as_any().downcast_ref::<Column>() {
746 Some(col) if !lambdas_params.contains(col.name()) => col,
747 _ => {
748 return Ok(Transformed::no(expr));
749 }
750 };
751
752 let name = col.name();
753 if let Some((idx, _)) = schema.column_with_name(name) {
754 Ok(Transformed::yes(Arc::new(Column::new(name, idx))))
756 } else {
757 plan_err!("")
760 }
761 });
762
763 match transformed {
764 Ok(transformed) => {
765 projected_exprs.push(PhysicalSortExpr::new(transformed.data, *options));
766 }
767 Err(_) => {
768 break;
771 }
772 }
773 }
774
775 LexOrdering::new(projected_exprs)
776}
777
778#[cfg(test)]
779pub(crate) mod tests {
780 use std::collections::HashMap;
781
782 use super::*;
783 use crate::equivalence::{convert_to_orderings, EquivalenceProperties};
784 use crate::expressions::{col, BinaryExpr, Literal};
785 use crate::utils::tests::TestScalarUDF;
786 use crate::{PhysicalExprRef, ScalarFunctionExpr};
787
788 use arrow::compute::SortOptions;
789 use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
790 use datafusion_common::config::ConfigOptions;
791 use datafusion_common::{ScalarValue, Statistics};
792 use datafusion_expr::{Operator, ScalarUDF};
793 use insta::assert_snapshot;
794
795 pub(crate) fn output_schema(
796 mapping: &ProjectionMapping,
797 input_schema: &Arc<Schema>,
798 ) -> Result<SchemaRef> {
799 let mut fields = vec![];
801 for (source, targets) in mapping.iter() {
802 let data_type = source.data_type(input_schema)?;
803 let nullable = source.nullable(input_schema)?;
804 for (target, _) in targets.iter() {
805 let Some(column) = target.as_any().downcast_ref::<Column>() else {
806 return plan_err!("Expects to have column");
807 };
808 fields.push(Field::new(column.name(), data_type.clone(), nullable));
809 }
810 }
811
812 let output_schema = Arc::new(Schema::new_with_metadata(
813 fields,
814 input_schema.metadata().clone(),
815 ));
816
817 Ok(output_schema)
818 }
819
820 #[test]
821 fn project_orderings() -> Result<()> {
822 let schema = Arc::new(Schema::new(vec![
823 Field::new("a", DataType::Int32, true),
824 Field::new("b", DataType::Int32, true),
825 Field::new("c", DataType::Int32, true),
826 Field::new("d", DataType::Int32, true),
827 Field::new("e", DataType::Int32, true),
828 Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
829 ]));
830 let col_a = &col("a", &schema)?;
831 let col_b = &col("b", &schema)?;
832 let col_c = &col("c", &schema)?;
833 let col_d = &col("d", &schema)?;
834 let col_e = &col("e", &schema)?;
835 let col_ts = &col("ts", &schema)?;
836 let a_plus_b = Arc::new(BinaryExpr::new(
837 Arc::clone(col_a),
838 Operator::Plus,
839 Arc::clone(col_b),
840 )) as Arc<dyn PhysicalExpr>;
841 let b_plus_d = Arc::new(BinaryExpr::new(
842 Arc::clone(col_b),
843 Operator::Plus,
844 Arc::clone(col_d),
845 )) as Arc<dyn PhysicalExpr>;
846 let b_plus_e = Arc::new(BinaryExpr::new(
847 Arc::clone(col_b),
848 Operator::Plus,
849 Arc::clone(col_e),
850 )) as Arc<dyn PhysicalExpr>;
851 let c_plus_d = Arc::new(BinaryExpr::new(
852 Arc::clone(col_c),
853 Operator::Plus,
854 Arc::clone(col_d),
855 )) as Arc<dyn PhysicalExpr>;
856
857 let option_asc = SortOptions {
858 descending: false,
859 nulls_first: false,
860 };
861 let option_desc = SortOptions {
862 descending: true,
863 nulls_first: true,
864 };
865
866 let test_cases = vec![
867 (
869 vec![
871 vec![(col_b, option_asc)],
873 ],
874 vec![(col_b, "b_new".to_string()), (col_a, "a_new".to_string())],
876 vec![
878 vec![("b_new", option_asc)],
880 ],
881 ),
882 (
884 vec![
886 ],
888 vec![(col_c, "c_new".to_string()), (col_b, "b_new".to_string())],
890 vec![
892 ],
894 ),
895 (
897 vec![
899 vec![(col_ts, option_asc)],
901 ],
902 vec![
904 (col_b, "b_new".to_string()),
905 (col_a, "a_new".to_string()),
906 (col_ts, "ts_new".to_string()),
907 ],
908 vec![
910 vec![("ts_new", option_asc)],
912 ],
913 ),
914 (
916 vec![
918 vec![(col_a, option_asc), (col_ts, option_asc)],
920 vec![(col_b, option_asc), (col_ts, option_asc)],
922 ],
923 vec![
925 (col_b, "b_new".to_string()),
926 (col_a, "a_new".to_string()),
927 (col_ts, "ts_new".to_string()),
928 ],
929 vec![
931 vec![("a_new", option_asc), ("ts_new", option_asc)],
933 vec![("b_new", option_asc), ("ts_new", option_asc)],
935 ],
936 ),
937 (
939 vec![
941 vec![(&a_plus_b, option_asc)],
943 ],
944 vec![
946 (col_b, "b_new".to_string()),
947 (col_a, "a_new".to_string()),
948 (&a_plus_b, "a+b".to_string()),
949 ],
950 vec![
952 vec![("a+b", option_asc)],
954 ],
955 ),
956 (
958 vec![
960 vec![(&a_plus_b, option_asc), (col_c, option_asc)],
962 ],
963 vec![
965 (col_b, "b_new".to_string()),
966 (col_a, "a_new".to_string()),
967 (col_c, "c_new".to_string()),
968 (&a_plus_b, "a+b".to_string()),
969 ],
970 vec![
972 vec![("a+b", option_asc), ("c_new", option_asc)],
974 ],
975 ),
976 (
978 vec![
979 vec![(col_a, option_asc), (col_b, option_asc)],
981 vec![(col_a, option_asc), (col_d, option_asc)],
983 ],
984 vec![
986 (col_b, "b_new".to_string()),
987 (col_a, "a_new".to_string()),
988 (col_d, "d_new".to_string()),
989 (&b_plus_d, "b+d".to_string()),
990 ],
991 vec![
993 vec![("a_new", option_asc), ("b_new", option_asc)],
995 vec![("a_new", option_asc), ("d_new", option_asc)],
997 vec![("a_new", option_asc), ("b+d", option_asc)],
999 ],
1000 ),
1001 (
1003 vec![
1005 vec![(&b_plus_d, option_asc)],
1007 ],
1008 vec![
1010 (col_b, "b_new".to_string()),
1011 (col_a, "a_new".to_string()),
1012 (col_d, "d_new".to_string()),
1013 (&b_plus_d, "b+d".to_string()),
1014 ],
1015 vec![
1017 vec![("b+d", option_asc)],
1019 ],
1020 ),
1021 (
1023 vec![
1025 vec![
1027 (col_a, option_asc),
1028 (col_d, option_asc),
1029 (col_b, option_asc),
1030 ],
1031 vec![(col_c, option_asc)],
1033 ],
1034 vec![
1036 (col_b, "b_new".to_string()),
1037 (col_a, "a_new".to_string()),
1038 (col_d, "d_new".to_string()),
1039 (col_c, "c_new".to_string()),
1040 ],
1041 vec![
1043 vec![
1045 ("a_new", option_asc),
1046 ("d_new", option_asc),
1047 ("b_new", option_asc),
1048 ],
1049 vec![("c_new", option_asc)],
1051 ],
1052 ),
1053 (
1055 vec![
1056 vec![
1058 (col_a, option_asc),
1059 (col_b, option_asc),
1060 (col_c, option_asc),
1061 ],
1062 vec![(col_a, option_asc), (col_d, option_asc)],
1064 ],
1065 vec![
1067 (col_b, "b_new".to_string()),
1068 (col_a, "a_new".to_string()),
1069 (col_c, "c_new".to_string()),
1070 (&c_plus_d, "c+d".to_string()),
1071 ],
1072 vec![
1074 vec![
1076 ("a_new", option_asc),
1077 ("b_new", option_asc),
1078 ("c_new", option_asc),
1079 ],
1080 vec![
1082 ("a_new", option_asc),
1083 ("b_new", option_asc),
1084 ("c+d", option_asc),
1085 ],
1086 ],
1087 ),
1088 (
1090 vec![
1092 vec![(col_a, option_asc), (col_b, option_asc)],
1094 vec![(col_a, option_asc), (col_d, option_asc)],
1096 ],
1097 vec![
1099 (col_b, "b_new".to_string()),
1100 (col_a, "a_new".to_string()),
1101 (&b_plus_d, "b+d".to_string()),
1102 ],
1103 vec![
1105 vec![("a_new", option_asc), ("b_new", option_asc)],
1107 vec![("a_new", option_asc), ("b+d", option_asc)],
1109 ],
1110 ),
1111 (
1113 vec![
1115 vec![
1117 (col_a, option_asc),
1118 (col_b, option_asc),
1119 (col_c, option_asc),
1120 ],
1121 ],
1122 vec![(col_c, "c_new".to_string()), (col_a, "a_new".to_string())],
1124 vec![
1126 vec![("a_new", option_asc)],
1128 ],
1129 ),
1130 (
1132 vec![
1134 vec![
1136 (col_a, option_asc),
1137 (col_b, option_asc),
1138 (col_c, option_asc),
1139 ],
1140 vec![
1142 (col_a, option_asc),
1143 (&a_plus_b, option_asc),
1144 (col_c, option_asc),
1145 ],
1146 ],
1147 vec![
1149 (col_c, "c_new".to_string()),
1150 (col_b, "b_new".to_string()),
1151 (col_a, "a_new".to_string()),
1152 (&a_plus_b, "a+b".to_string()),
1153 ],
1154 vec![
1156 vec![
1158 ("a_new", option_asc),
1159 ("b_new", option_asc),
1160 ("c_new", option_asc),
1161 ],
1162 vec![
1164 ("a_new", option_asc),
1165 ("a+b", option_asc),
1166 ("c_new", option_asc),
1167 ],
1168 ],
1169 ),
1170 (
1172 vec![
1174 vec![(col_a, option_asc), (col_b, option_asc)],
1176 vec![(col_c, option_asc), (col_b, option_asc)],
1178 vec![(col_d, option_asc), (col_e, option_asc)],
1180 ],
1181 vec![
1183 (col_c, "c_new".to_string()),
1184 (col_d, "d_new".to_string()),
1185 (col_a, "a_new".to_string()),
1186 (&b_plus_e, "b+e".to_string()),
1187 ],
1188 vec![
1190 vec![
1192 ("a_new", option_asc),
1193 ("d_new", option_asc),
1194 ("b+e", option_asc),
1195 ],
1196 vec![
1198 ("d_new", option_asc),
1199 ("a_new", option_asc),
1200 ("b+e", option_asc),
1201 ],
1202 vec![
1204 ("c_new", option_asc),
1205 ("d_new", option_asc),
1206 ("b+e", option_asc),
1207 ],
1208 vec![
1210 ("d_new", option_asc),
1211 ("c_new", option_asc),
1212 ("b+e", option_asc),
1213 ],
1214 ],
1215 ),
1216 (
1218 vec![
1220 vec![
1222 (col_a, option_asc),
1223 (col_c, option_asc),
1224 (col_b, option_asc),
1225 ],
1226 ],
1227 vec![
1229 (col_c, "c_new".to_string()),
1230 (col_a, "a_new".to_string()),
1231 (&a_plus_b, "a+b".to_string()),
1232 ],
1233 vec![
1235 vec![
1237 ("a_new", option_asc),
1238 ("c_new", option_asc),
1239 ("a+b", option_asc),
1240 ],
1241 ],
1242 ),
1243 (
1245 vec![
1247 vec![(col_a, option_asc), (col_b, option_asc)],
1249 vec![(col_c, option_asc), (col_b, option_desc)],
1251 vec![(col_e, option_asc)],
1253 ],
1254 vec![
1256 (col_c, "c_new".to_string()),
1257 (col_a, "a_new".to_string()),
1258 (col_b, "b_new".to_string()),
1259 (&b_plus_e, "b+e".to_string()),
1260 ],
1261 vec![
1263 vec![("a_new", option_asc), ("b_new", option_asc)],
1265 vec![("a_new", option_asc), ("b+e", option_asc)],
1267 vec![("c_new", option_asc), ("b_new", option_desc)],
1269 ],
1270 ),
1271 ];
1272
1273 for (idx, (orderings, proj_exprs, expected)) in test_cases.into_iter().enumerate()
1274 {
1275 let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
1276
1277 let orderings = convert_to_orderings(&orderings);
1278 eq_properties.add_orderings(orderings);
1279
1280 let proj_exprs = proj_exprs
1281 .into_iter()
1282 .map(|(expr, name)| (Arc::clone(expr), name));
1283 let projection_mapping = ProjectionMapping::try_new(proj_exprs, &schema)?;
1284 let output_schema = output_schema(&projection_mapping, &schema)?;
1285
1286 let expected = expected
1287 .into_iter()
1288 .map(|ordering| {
1289 ordering
1290 .into_iter()
1291 .map(|(name, options)| {
1292 (col(name, &output_schema).unwrap(), options)
1293 })
1294 .collect::<Vec<_>>()
1295 })
1296 .collect::<Vec<_>>();
1297 let expected = convert_to_orderings(&expected);
1298
1299 let projected_eq = eq_properties.project(&projection_mapping, output_schema);
1300 let orderings = projected_eq.oeq_class();
1301
1302 let err_msg = format!(
1303 "test_idx: {idx:?}, actual: {orderings:?}, expected: {expected:?}, projection_mapping: {projection_mapping:?}"
1304 );
1305
1306 assert_eq!(orderings.len(), expected.len(), "{err_msg}");
1307 for expected_ordering in &expected {
1308 assert!(orderings.contains(expected_ordering), "{}", err_msg)
1309 }
1310 }
1311
1312 Ok(())
1313 }
1314
1315 #[test]
1316 fn project_orderings2() -> Result<()> {
1317 let schema = Arc::new(Schema::new(vec![
1318 Field::new("a", DataType::Int32, true),
1319 Field::new("b", DataType::Int32, true),
1320 Field::new("c", DataType::Int32, true),
1321 Field::new("d", DataType::Int32, true),
1322 Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
1323 ]));
1324 let col_a = &col("a", &schema)?;
1325 let col_b = &col("b", &schema)?;
1326 let col_c = &col("c", &schema)?;
1327 let col_ts = &col("ts", &schema)?;
1328 let a_plus_b = Arc::new(BinaryExpr::new(
1329 Arc::clone(col_a),
1330 Operator::Plus,
1331 Arc::clone(col_b),
1332 )) as Arc<dyn PhysicalExpr>;
1333
1334 let test_fun = Arc::new(ScalarUDF::new_from_impl(TestScalarUDF::new()));
1335
1336 let round_c = Arc::new(ScalarFunctionExpr::try_new(
1337 test_fun,
1338 vec![Arc::clone(col_c)],
1339 &schema,
1340 Arc::new(ConfigOptions::default()),
1341 )?) as PhysicalExprRef;
1342
1343 let option_asc = SortOptions {
1344 descending: false,
1345 nulls_first: false,
1346 };
1347
1348 let proj_exprs = vec![
1349 (col_b, "b_new".to_string()),
1350 (col_a, "a_new".to_string()),
1351 (col_c, "c_new".to_string()),
1352 (&round_c, "round_c_res".to_string()),
1353 ];
1354 let proj_exprs = proj_exprs
1355 .into_iter()
1356 .map(|(expr, name)| (Arc::clone(expr), name));
1357 let projection_mapping = ProjectionMapping::try_new(proj_exprs, &schema)?;
1358 let output_schema = output_schema(&projection_mapping, &schema)?;
1359
1360 let col_a_new = &col("a_new", &output_schema)?;
1361 let col_b_new = &col("b_new", &output_schema)?;
1362 let col_c_new = &col("c_new", &output_schema)?;
1363 let col_round_c_res = &col("round_c_res", &output_schema)?;
1364 let a_new_plus_b_new = Arc::new(BinaryExpr::new(
1365 Arc::clone(col_a_new),
1366 Operator::Plus,
1367 Arc::clone(col_b_new),
1368 )) as Arc<dyn PhysicalExpr>;
1369
1370 let test_cases = [
1371 (
1373 vec![
1375 vec![(col_a, option_asc)],
1377 ],
1378 vec![
1380 vec![(col_a_new, option_asc)],
1382 ],
1383 ),
1384 (
1386 vec![
1388 vec![(&a_plus_b, option_asc)],
1390 ],
1391 vec![
1393 vec![(&a_new_plus_b_new, option_asc)],
1395 ],
1396 ),
1397 (
1399 vec![
1401 vec![(col_a, option_asc), (col_ts, option_asc)],
1403 ],
1404 vec![
1406 vec![(col_a_new, option_asc)],
1408 ],
1409 ),
1410 (
1412 vec![
1414 vec![
1416 (col_a, option_asc),
1417 (col_ts, option_asc),
1418 (col_b, option_asc),
1419 ],
1420 ],
1421 vec![
1423 vec![(col_a_new, option_asc)],
1425 ],
1426 ),
1427 (
1429 vec![
1431 vec![(col_a, option_asc), (col_c, option_asc)],
1433 ],
1434 vec![
1436 vec![(col_a_new, option_asc), (col_round_c_res, option_asc)],
1438 vec![(col_a_new, option_asc), (col_c_new, option_asc)],
1440 ],
1441 ),
1442 (
1444 vec![
1446 vec![(col_c, option_asc), (col_b, option_asc)],
1448 ],
1449 vec![
1451 vec![(col_round_c_res, option_asc)],
1453 vec![(col_c_new, option_asc), (col_b_new, option_asc)],
1455 ],
1456 ),
1457 (
1459 vec![
1461 vec![(&a_plus_b, option_asc), (col_c, option_asc)],
1463 ],
1464 vec![
1466 vec![
1468 (&a_new_plus_b_new, option_asc),
1469 (col_round_c_res, option_asc),
1470 ],
1471 vec![(&a_new_plus_b_new, option_asc), (col_c_new, option_asc)],
1473 ],
1474 ),
1475 ];
1476
1477 for (idx, (orderings, expected)) in test_cases.iter().enumerate() {
1478 let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
1479
1480 let orderings = convert_to_orderings(orderings);
1481 eq_properties.add_orderings(orderings);
1482
1483 let expected = convert_to_orderings(expected);
1484
1485 let projected_eq =
1486 eq_properties.project(&projection_mapping, Arc::clone(&output_schema));
1487 let orderings = projected_eq.oeq_class();
1488
1489 let err_msg = format!(
1490 "test idx: {idx:?}, actual: {orderings:?}, expected: {expected:?}, projection_mapping: {projection_mapping:?}"
1491 );
1492
1493 assert_eq!(orderings.len(), expected.len(), "{err_msg}");
1494 for expected_ordering in &expected {
1495 assert!(orderings.contains(expected_ordering), "{}", err_msg)
1496 }
1497 }
1498 Ok(())
1499 }
1500
1501 #[test]
1502 fn project_orderings3() -> Result<()> {
1503 let schema = Arc::new(Schema::new(vec![
1504 Field::new("a", DataType::Int32, true),
1505 Field::new("b", DataType::Int32, true),
1506 Field::new("c", DataType::Int32, true),
1507 Field::new("d", DataType::Int32, true),
1508 Field::new("e", DataType::Int32, true),
1509 Field::new("f", DataType::Int32, true),
1510 ]));
1511 let col_a = &col("a", &schema)?;
1512 let col_b = &col("b", &schema)?;
1513 let col_c = &col("c", &schema)?;
1514 let col_d = &col("d", &schema)?;
1515 let col_e = &col("e", &schema)?;
1516 let col_f = &col("f", &schema)?;
1517 let a_plus_b = Arc::new(BinaryExpr::new(
1518 Arc::clone(col_a),
1519 Operator::Plus,
1520 Arc::clone(col_b),
1521 )) as Arc<dyn PhysicalExpr>;
1522
1523 let option_asc = SortOptions {
1524 descending: false,
1525 nulls_first: false,
1526 };
1527
1528 let proj_exprs = vec![
1529 (col_c, "c_new".to_string()),
1530 (col_d, "d_new".to_string()),
1531 (&a_plus_b, "a+b".to_string()),
1532 ];
1533 let proj_exprs = proj_exprs
1534 .into_iter()
1535 .map(|(expr, name)| (Arc::clone(expr), name));
1536 let projection_mapping = ProjectionMapping::try_new(proj_exprs, &schema)?;
1537 let output_schema = output_schema(&projection_mapping, &schema)?;
1538
1539 let col_a_plus_b_new = &col("a+b", &output_schema)?;
1540 let col_c_new = &col("c_new", &output_schema)?;
1541 let col_d_new = &col("d_new", &output_schema)?;
1542
1543 let test_cases = vec![
1544 (
1546 vec![
1548 vec![(col_d, option_asc), (col_b, option_asc)],
1550 vec![(col_c, option_asc), (col_a, option_asc)],
1552 ],
1553 vec![],
1555 vec![
1557 vec![
1559 (col_d_new, option_asc),
1560 (col_c_new, option_asc),
1561 (col_a_plus_b_new, option_asc),
1562 ],
1563 vec![
1565 (col_c_new, option_asc),
1566 (col_d_new, option_asc),
1567 (col_a_plus_b_new, option_asc),
1568 ],
1569 ],
1570 ),
1571 (
1573 vec![
1575 vec![(col_d, option_asc), (col_b, option_asc)],
1577 vec![(col_c, option_asc), (col_e, option_asc)],
1579 ],
1580 vec![(col_e, col_a)],
1582 vec![
1584 vec![
1586 (col_d_new, option_asc),
1587 (col_c_new, option_asc),
1588 (col_a_plus_b_new, option_asc),
1589 ],
1590 vec![
1592 (col_c_new, option_asc),
1593 (col_d_new, option_asc),
1594 (col_a_plus_b_new, option_asc),
1595 ],
1596 ],
1597 ),
1598 (
1600 vec![
1602 vec![(col_d, option_asc), (col_b, option_asc)],
1604 vec![(col_c, option_asc), (col_e, option_asc)],
1606 ],
1607 vec![(col_a, col_f)],
1609 vec![
1611 vec![(col_d_new, option_asc)],
1613 vec![(col_c_new, option_asc)],
1615 ],
1616 ),
1617 ];
1618 for (orderings, equal_columns, expected) in test_cases {
1619 let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
1620 for (lhs, rhs) in equal_columns {
1621 eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?;
1622 }
1623
1624 let orderings = convert_to_orderings(&orderings);
1625 eq_properties.add_orderings(orderings);
1626
1627 let expected = convert_to_orderings(&expected);
1628
1629 let projected_eq =
1630 eq_properties.project(&projection_mapping, Arc::clone(&output_schema));
1631 let orderings = projected_eq.oeq_class();
1632
1633 let err_msg = format!(
1634 "actual: {orderings:?}, expected: {expected:?}, projection_mapping: {projection_mapping:?}"
1635 );
1636
1637 assert_eq!(orderings.len(), expected.len(), "{err_msg}");
1638 for expected_ordering in &expected {
1639 assert!(orderings.contains(expected_ordering), "{}", err_msg)
1640 }
1641 }
1642
1643 Ok(())
1644 }
1645
1646 fn get_stats() -> Statistics {
1647 Statistics {
1648 num_rows: Precision::Exact(5),
1649 total_byte_size: Precision::Exact(23),
1650 column_statistics: vec![
1651 ColumnStatistics {
1652 distinct_count: Precision::Exact(5),
1653 max_value: Precision::Exact(ScalarValue::Int64(Some(21))),
1654 min_value: Precision::Exact(ScalarValue::Int64(Some(-4))),
1655 sum_value: Precision::Exact(ScalarValue::Int64(Some(42))),
1656 null_count: Precision::Exact(0),
1657 },
1658 ColumnStatistics {
1659 distinct_count: Precision::Exact(1),
1660 max_value: Precision::Exact(ScalarValue::from("x")),
1661 min_value: Precision::Exact(ScalarValue::from("a")),
1662 sum_value: Precision::Absent,
1663 null_count: Precision::Exact(3),
1664 },
1665 ColumnStatistics {
1666 distinct_count: Precision::Absent,
1667 max_value: Precision::Exact(ScalarValue::Float32(Some(1.1))),
1668 min_value: Precision::Exact(ScalarValue::Float32(Some(0.1))),
1669 sum_value: Precision::Exact(ScalarValue::Float32(Some(5.5))),
1670 null_count: Precision::Absent,
1671 },
1672 ],
1673 }
1674 }
1675
1676 fn get_schema() -> Schema {
1677 let field_0 = Field::new("col0", DataType::Int64, false);
1678 let field_1 = Field::new("col1", DataType::Utf8, false);
1679 let field_2 = Field::new("col2", DataType::Float32, false);
1680 Schema::new(vec![field_0, field_1, field_2])
1681 }
1682
/// Projecting plain columns (`col1`, then `col0`) must carry the matching
/// per-column statistics over in projection order and keep the row count and
/// total byte size of the input.
#[test]
fn test_stats_projection_columns_only() {
    // Select col1 first and col0 second; col2 is dropped.
    let projection = ProjectionExprs::new(vec![
        ProjectionExpr::new(Arc::new(Column::new("col1", 1)), "col1".to_string()),
        ProjectionExpr::new(Arc::new(Column::new("col0", 0)), "col0".to_string()),
    ]);

    let projected = projection
        .project_statistics(get_stats(), &get_schema())
        .unwrap();

    // Statistics of the string column (input index 1).
    let col1_stats = ColumnStatistics {
        null_count: Precision::Exact(3),
        min_value: Precision::Exact(ScalarValue::from("a")),
        max_value: Precision::Exact(ScalarValue::from("x")),
        sum_value: Precision::Absent,
        distinct_count: Precision::Exact(1),
    };
    // Statistics of the Int64 column (input index 0).
    let col0_stats = ColumnStatistics {
        null_count: Precision::Exact(0),
        min_value: Precision::Exact(ScalarValue::Int64(Some(-4))),
        max_value: Precision::Exact(ScalarValue::Int64(Some(21))),
        sum_value: Precision::Exact(ScalarValue::Int64(Some(42))),
        distinct_count: Precision::Exact(5),
    };
    let expected = Statistics {
        num_rows: Precision::Exact(5),
        total_byte_size: Precision::Exact(23),
        column_statistics: vec![col1_stats, col0_stats],
    };

    assert_eq!(projected, expected);
}
1724
/// Projecting only the Float32 and Int64 columns must yield their statistics
/// in projection order together with a total byte size of 60.
#[test]
fn test_stats_projection_column_with_primitive_width_only() {
    // Select col2 (Float32) first and col0 (Int64) second.
    let projection = ProjectionExprs::new(vec![
        ProjectionExpr::new(Arc::new(Column::new("col2", 2)), "col2".to_string()),
        ProjectionExpr::new(Arc::new(Column::new("col0", 0)), "col0".to_string()),
    ]);

    let projected = projection
        .project_statistics(get_stats(), &get_schema())
        .unwrap();

    // Statistics of the Float32 column (input index 2).
    let col2_stats = ColumnStatistics {
        null_count: Precision::Absent,
        min_value: Precision::Exact(ScalarValue::Float32(Some(0.1))),
        max_value: Precision::Exact(ScalarValue::Float32(Some(1.1))),
        sum_value: Precision::Exact(ScalarValue::Float32(Some(5.5))),
        distinct_count: Precision::Absent,
    };
    // Statistics of the Int64 column (input index 0).
    let col0_stats = ColumnStatistics {
        null_count: Precision::Exact(0),
        min_value: Precision::Exact(ScalarValue::Int64(Some(-4))),
        max_value: Precision::Exact(ScalarValue::Int64(Some(21))),
        sum_value: Precision::Exact(ScalarValue::Int64(Some(42))),
        distinct_count: Precision::Exact(5),
    };
    let expected = Statistics {
        num_rows: Precision::Exact(5),
        // NOTE(review): 60 is consistent with 5 rows x (4-byte Float32 +
        // 8-byte Int64); presumably derived from primitive widths - confirm
        // against `project_statistics`.
        total_byte_size: Precision::Exact(60),
        column_statistics: vec![col2_stats, col0_stats],
    };

    assert_eq!(projected, expected);
}
1766
/// `ProjectionExprs::new` must keep every expression it is given.
#[test]
fn test_projection_new() -> Result<()> {
    let exprs = vec![
        ProjectionExpr {
            expr: Arc::new(Column::new("a", 0)),
            alias: "a".to_string(),
        },
        ProjectionExpr {
            expr: Arc::new(Column::new("b", 1)),
            alias: "b".to_string(),
        },
    ];
    // `exprs` is not used afterwards, so it is moved rather than cloned.
    let projection = ProjectionExprs::new(exprs);
    assert_eq!(projection.as_ref().len(), 2);
    Ok(())
}
1785
/// Converting a `Vec<ProjectionExpr>` with `into()` must keep every
/// expression.
#[test]
fn test_projection_from_vec() -> Result<()> {
    let exprs = vec![ProjectionExpr {
        expr: Arc::new(Column::new("x", 0)),
        alias: "x".to_string(),
    }];
    // `exprs` is not used afterwards, so it is moved rather than cloned.
    let projection: ProjectionExprs = exprs.into();
    assert_eq!(projection.as_ref().len(), 1);
    Ok(())
}
1796
/// `as_ref()` on a `ProjectionExprs` must expose the expressions as a
/// `&[ProjectionExpr]` slice of the same length.
#[test]
fn test_projection_as_ref() -> Result<()> {
    let projection = ProjectionExprs::new(vec![
        ProjectionExpr::new(Arc::new(Column::new("col1", 0)), "col1".to_string()),
        ProjectionExpr::new(Arc::new(Column::new("col2", 1)), "col2".to_string()),
    ]);

    // The explicit type annotation pins the `AsRef` target under test.
    let slice: &[ProjectionExpr] = projection.as_ref();
    assert_eq!(slice.len(), 2);
    Ok(())
}
1814
/// `column_indices` over columns at indices 5, 2 and 0 is expected to return
/// `[0, 2, 5]`.
#[test]
fn test_column_indices_multiple_columns() -> Result<()> {
    // Columns are listed in descending index order on purpose.
    let projection = ProjectionExprs::new(vec![
        ProjectionExpr::new(Arc::new(Column::new("c", 5)), "c".to_string()),
        ProjectionExpr::new(Arc::new(Column::new("b", 2)), "b".to_string()),
        ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a".to_string()),
    ]);

    let indices = projection.column_indices();
    assert_eq!(indices, vec![0, 2, 5]);
    Ok(())
}
1836
/// When two expressions reference the same input index, `column_indices` is
/// expected to report that index only once.
#[test]
fn test_column_indices_duplicates() -> Result<()> {
    // "a" and "a2" both point at input index 1.
    let projection = ProjectionExprs::new(vec![
        ProjectionExpr::new(Arc::new(Column::new("a", 1)), "a".to_string()),
        ProjectionExpr::new(Arc::new(Column::new("b", 3)), "b".to_string()),
        ProjectionExpr::new(Arc::new(Column::new("a2", 1)), "a2".to_string()),
    ]);

    let indices = projection.column_indices();
    assert_eq!(indices, vec![1, 3]);
    Ok(())
}
1857
/// `column_indices` over columns given in the order 5, 1, 3 is expected to
/// return them in ascending order.
#[test]
fn test_column_indices_unsorted() -> Result<()> {
    let projection = ProjectionExprs::new(vec![
        ProjectionExpr::new(Arc::new(Column::new("c", 5)), "c".to_string()),
        ProjectionExpr::new(Arc::new(Column::new("a", 1)), "a".to_string()),
        ProjectionExpr::new(Arc::new(Column::new("b", 3)), "b".to_string()),
    ]);

    let indices = projection.column_indices();
    assert_eq!(indices, vec![1, 3, 5]);
    Ok(())
}
1878
/// Column references nested inside a binary expression are expected to be
/// reported by `column_indices` alongside plain column references.
#[test]
fn test_column_indices_complex_expr() -> Result<()> {
    // a@1 + b@4
    let a_plus_b = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("a", 1)),
        Operator::Plus,
        Arc::new(Column::new("b", 4)),
    ));

    let projection = ProjectionExprs::new(vec![
        ProjectionExpr::new(a_plus_b, "sum".to_string()),
        ProjectionExpr::new(Arc::new(Column::new("c", 2)), "c".to_string()),
    ]);

    let indices = projection.column_indices();
    assert_eq!(indices, vec![1, 2, 4]);
    Ok(())
}
1901
/// An empty projection is expected to reference no columns.
#[test]
fn test_column_indices_empty() -> Result<()> {
    let empty_projection = ProjectionExprs::new(vec![]);
    let no_indices: Vec<usize> = Vec::new();
    assert_eq!(empty_projection.column_indices(), no_indices);
    Ok(())
}
1908
/// Merging a column-only top projection into a column-only base projection
/// must resolve each top column to the base expression at that position.
#[test]
fn test_merge_simple_columns() -> Result<()> {
    // Base: input columns c@2, b@1, a@0 exposed as x, y, z.
    let base_projection = ProjectionExprs::new(vec![
        ProjectionExpr::new(Arc::new(Column::new("c", 2)), "x".to_string()),
        ProjectionExpr::new(Arc::new(Column::new("b", 1)), "y".to_string()),
        ProjectionExpr::new(Arc::new(Column::new("a", 0)), "z".to_string()),
    ]);

    // Top: picks base outputs y (position 1) and x (position 0).
    let top_projection = ProjectionExprs::new(vec![
        ProjectionExpr::new(Arc::new(Column::new("y", 1)), "col2".to_string()),
        ProjectionExpr::new(Arc::new(Column::new("x", 0)), "col1".to_string()),
    ]);

    let merged = base_projection.try_merge(&top_projection)?;
    assert_snapshot!(format!("{merged}"), @"Projection[b@1 AS col2, c@2 AS col1]");

    Ok(())
}
1945
/// Merging a top projection made of binary expressions must rewrite the
/// column references inside those expressions to the base expressions.
#[test]
fn test_merge_with_expressions() -> Result<()> {
    // Base: input columns c@2, b@1, a@0 exposed as x, y, z.
    let base_projection = ProjectionExprs::new(vec![
        ProjectionExpr::new(Arc::new(Column::new("c", 2)), "x".to_string()),
        ProjectionExpr::new(Arc::new(Column::new("b", 1)), "y".to_string()),
        ProjectionExpr::new(Arc::new(Column::new("a", 0)), "z".to_string()),
    ]);

    // y@1 + z@2, expressed against the base projection's output.
    let y_plus_z = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("y", 1)),
        Operator::Plus,
        Arc::new(Column::new("z", 2)),
    ));
    // x@0 + 1, expressed against the base projection's output.
    let x_plus_one = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("x", 0)),
        Operator::Plus,
        Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
    ));

    let top_projection = ProjectionExprs::new(vec![
        ProjectionExpr::new(y_plus_z, "c2".to_string()),
        ProjectionExpr::new(x_plus_one, "c1".to_string()),
    ]);

    let merged = base_projection.try_merge(&top_projection)?;
    assert_snapshot!(format!("{merged}"), @"Projection[b@1 + a@0 AS c2, c@2 + 1 AS c1]");

    Ok(())
}
1990
/// `try_merge` must fail with an out-of-bounds internal error when the top
/// projection references a column index the base projection does not produce.
#[test]
fn try_merge_error() {
    // The base projection has two outputs (positions 0 and 1).
    let base = ProjectionExprs::new(vec![
        ProjectionExpr::new(Arc::new(Column::new("a", 0)), "x".to_string()),
        ProjectionExpr::new(Arc::new(Column::new("b", 1)), "y".to_string()),
    ]);

    // Index 5 is past the end of the base projection's outputs.
    let top = ProjectionExprs::new(vec![ProjectionExpr::new(
        Arc::new(Column::new("z", 5)),
        "result".to_string(),
    )]);

    let expected_fragment =
        "Internal error: Column index 5 out of bounds for projected expressions of length 2";
    let err_msg = base.try_merge(&top).unwrap_err().to_string();
    assert!(
        err_msg.contains(expected_fragment),
        "Unexpected error message: {err_msg}",
    );
}
2018
/// Projecting plain columns under new aliases must produce a schema whose
/// fields carry the alias as name and the source column's data type.
#[test]
fn test_project_schema_simple_columns() -> Result<()> {
    let input_schema = get_schema();

    // col2 (Float32) -> "c", col0 (Int64) -> "a".
    let projection = ProjectionExprs::new(vec![
        ProjectionExpr::new(Arc::new(Column::new("col2", 2)), "c".to_string()),
        ProjectionExpr::new(Arc::new(Column::new("col0", 0)), "a".to_string()),
    ]);

    let output_schema = projection.project_schema(&input_schema)?;
    assert_eq!(output_schema.fields().len(), 2);

    let first = output_schema.field(0);
    assert_eq!(first.name(), "c");
    assert_eq!(first.data_type(), &DataType::Float32);

    let second = output_schema.field(1);
    assert_eq!(second.name(), "a");
    assert_eq!(second.data_type(), &DataType::Int64);

    Ok(())
}
2051
/// Projecting `col0 + 1` must produce a single Int64 field named after the
/// alias.
#[test]
fn test_project_schema_with_expressions() -> Result<()> {
    let input_schema = get_schema();

    // col0@0 + 1 (both operands Int64).
    let col0_plus_one = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("col0", 0)),
        Operator::Plus,
        Arc::new(Literal::new(ScalarValue::Int64(Some(1)))),
    ));
    let projection = ProjectionExprs::new(vec![ProjectionExpr::new(
        col0_plus_one,
        "incremented".to_string(),
    )]);

    let output_schema = projection.project_schema(&input_schema)?;
    assert_eq!(output_schema.fields().len(), 1);

    let field = output_schema.field(0);
    assert_eq!(field.name(), "incremented");
    assert_eq!(field.data_type(), &DataType::Int64);

    Ok(())
}
2078
/// Renaming a column through a projection must keep the field-level metadata
/// of the source column.
#[test]
fn test_project_schema_preserves_metadata() -> Result<()> {
    let mut metadata = HashMap::new();
    metadata.insert("key".to_string(), "value".to_string());

    // Only col0 carries metadata; `metadata` is cloned because it is compared
    // against the projected field below.
    let col0 = Field::new("col0", DataType::Int64, false).with_metadata(metadata.clone());
    let col1 = Field::new("col1", DataType::Utf8, false);
    let input_schema = Schema::new(vec![col0, col1]);

    let projection = ProjectionExprs::new(vec![ProjectionExpr::new(
        Arc::new(Column::new("col0", 0)),
        "renamed".to_string(),
    )]);

    let output_schema = projection.project_schema(&input_schema)?;
    assert_eq!(output_schema.fields().len(), 1);

    let field = output_schema.field(0);
    assert_eq!(field.name(), "renamed");
    assert_eq!(field.metadata(), &metadata);

    Ok(())
}
2108
/// An empty projection must produce a schema without fields.
#[test]
fn test_project_schema_empty() -> Result<()> {
    let empty_projection = ProjectionExprs::new(vec![]);
    let input_schema = get_schema();

    let output_schema = empty_projection.project_schema(&input_schema)?;
    let field_count = output_schema.fields().len();
    assert_eq!(field_count, 0);

    Ok(())
}
2120
/// Projecting plain columns under new aliases must keep the row count and
/// carry each source column's statistics to its projected position.
#[test]
fn test_project_statistics_columns_only() -> Result<()> {
    let input_schema = get_schema();

    // col1 -> "text", col0 -> "num".
    let projection = ProjectionExprs::new(vec![
        ProjectionExpr::new(Arc::new(Column::new("col1", 1)), "text".to_string()),
        ProjectionExpr::new(Arc::new(Column::new("col0", 0)), "num".to_string()),
    ]);

    let output_stats = projection.project_statistics(get_stats(), &input_schema)?;

    assert_eq!(output_stats.num_rows, Precision::Exact(5));
    assert_eq!(output_stats.column_statistics.len(), 2);

    // Position 0 must hold the statistics of input col1.
    let text_stats = &output_stats.column_statistics[0];
    assert_eq!(text_stats.distinct_count, Precision::Exact(1));
    assert_eq!(
        text_stats.max_value,
        Precision::Exact(ScalarValue::from("x"))
    );

    // Position 1 must hold the statistics of input col0.
    let num_stats = &output_stats.column_statistics[1];
    assert_eq!(num_stats.distinct_count, Precision::Exact(5));
    assert_eq!(
        num_stats.max_value,
        Precision::Exact(ScalarValue::Int64(Some(21)))
    );

    Ok(())
}
2168
/// A computed expression (`col0 + 1`) must get absent distinct-count and
/// max-value statistics, while a plain column next to it keeps its own.
#[test]
fn test_project_statistics_with_expressions() -> Result<()> {
    let input_schema = get_schema();

    // col0@0 + 1
    let col0_plus_one = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("col0", 0)),
        Operator::Plus,
        Arc::new(Literal::new(ScalarValue::Int64(Some(1)))),
    ));
    let projection = ProjectionExprs::new(vec![
        ProjectionExpr::new(col0_plus_one, "incremented".to_string()),
        ProjectionExpr::new(Arc::new(Column::new("col1", 1)), "text".to_string()),
    ]);

    let output_stats = projection.project_statistics(get_stats(), &input_schema)?;

    assert_eq!(output_stats.num_rows, Precision::Exact(5));
    assert_eq!(output_stats.column_statistics.len(), 2);

    // The computed column is expected to have no known statistics.
    let computed_stats = &output_stats.column_statistics[0];
    assert_eq!(computed_stats.distinct_count, Precision::Absent);
    assert_eq!(computed_stats.max_value, Precision::Absent);

    // The plain column keeps the distinct count of input col1.
    let text_stats = &output_stats.column_statistics[1];
    assert_eq!(text_stats.distinct_count, Precision::Exact(1));

    Ok(())
}
2216
/// Projecting only the Float32 and Int64 columns must keep the row count,
/// report a total byte size of 60 and produce two column statistics.
#[test]
fn test_project_statistics_primitive_width_only() -> Result<()> {
    let input_schema = get_schema();

    // col2 (Float32) -> "f", col0 (Int64) -> "i".
    let projection = ProjectionExprs::new(vec![
        ProjectionExpr::new(Arc::new(Column::new("col2", 2)), "f".to_string()),
        ProjectionExpr::new(Arc::new(Column::new("col0", 0)), "i".to_string()),
    ]);

    let output_stats = projection.project_statistics(get_stats(), &input_schema)?;

    assert_eq!(output_stats.num_rows, Precision::Exact(5));

    // NOTE(review): 60 is consistent with 5 rows x (4-byte Float32 + 8-byte
    // Int64); presumably derived from primitive widths - confirm against
    // `project_statistics`.
    assert_eq!(output_stats.total_byte_size, Precision::Exact(60));

    let column_count = output_stats.column_statistics.len();
    assert_eq!(column_count, 2);

    Ok(())
}
2248
/// An empty projection must keep the row count, produce no column statistics
/// and report an exact total byte size of zero.
#[test]
fn test_project_statistics_empty() -> Result<()> {
    let empty_projection = ProjectionExprs::new(vec![]);
    let input_schema = get_schema();

    let output_stats = empty_projection.project_statistics(get_stats(), &input_schema)?;

    assert_eq!(output_stats.num_rows, Precision::Exact(5));

    let column_count = output_stats.column_statistics.len();
    assert_eq!(column_count, 0);

    assert_eq!(output_stats.total_byte_size, Precision::Exact(0));

    Ok(())
}
2269}