1use std::collections::HashSet;
23use std::sync::Arc;
24
25use arrow::array::AsArray;
26use arrow::{
27 array::{new_null_array, ArrayRef, BooleanArray},
28 datatypes::{DataType, Field, Schema, SchemaRef},
29 record_batch::{RecordBatch, RecordBatchOptions},
30};
31pub use datafusion_common::pruning::PruningStatistics;
33use datafusion_physical_expr::simplifier::PhysicalExprSimplifier;
34use datafusion_physical_plan::metrics::Count;
35use log::{debug, trace};
36
37use datafusion_common::error::Result;
38use datafusion_common::tree_node::TransformedResult;
39use datafusion_common::{
40 internal_datafusion_err, internal_err, plan_datafusion_err, plan_err,
41 tree_node::Transformed, ScalarValue,
42};
43use datafusion_common::{Column, DFSchema};
44use datafusion_expr_common::operator::Operator;
45use datafusion_physical_expr::utils::{collect_columns, Guarantee, LiteralGuarantee};
46use datafusion_physical_expr::{
47 expressions as phys_expr, PhysicalExprExt, PhysicalExprRef,
48};
49use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr;
50use datafusion_physical_plan::{ColumnarValue, PhysicalExpr};
51
/// A predicate over container statistics (min/max values, null counts, row
/// counts) plus literal-membership guarantees, used to decide which
/// containers can be skipped ("pruned") for a filter expression.
///
/// Built by [`PruningPredicate::try_new`] and evaluated by
/// [`PruningPredicate::prune`].
#[derive(Debug, Clone)]
pub struct PruningPredicate {
    /// Schema the original predicate's columns refer to
    schema: SchemaRef,
    /// The predicate rewritten over the statistics columns described by
    /// `required_columns` (e.g. `x_min`, `x_max`, `x_null_count`), simplified
    predicate_expr: Arc<dyn PhysicalExpr>,
    /// Statistics columns that must be supplied to evaluate `predicate_expr`
    required_columns: RequiredColumns,
    /// The original predicate, after `snapshot_physical_expr`
    orig_expr: Arc<dyn PhysicalExpr>,
    /// Literal guarantees derived from `orig_expr` by
    /// `LiteralGuarantee::analyze`; checked via `PruningStatistics::contained`
    literal_guarantees: Vec<LiteralGuarantee>,
}
381
382pub fn build_pruning_predicate(
388 predicate: Arc<dyn PhysicalExpr>,
389 file_schema: &SchemaRef,
390 predicate_creation_errors: &Count,
391) -> Option<Arc<PruningPredicate>> {
392 match PruningPredicate::try_new(predicate, Arc::clone(file_schema)) {
393 Ok(pruning_predicate) => {
394 if !pruning_predicate.always_true() {
395 return Some(Arc::new(pruning_predicate));
396 }
397 }
398 Err(e) => {
399 debug!("Could not create pruning predicate for: {e}");
400 predicate_creation_errors.add(1);
401 }
402 }
403 None
404}
405
/// Hook invoked while building a statistics predicate for every
/// sub-expression that cannot be translated into a statistics-based form.
pub trait UnhandledPredicateHook {
    /// Returns the expression to substitute for the unhandled `expr` in the
    /// rewritten statistics predicate.
    fn handle(&self, expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr>;
}
414
/// An [`UnhandledPredicateHook`] that replaces every unhandled expression
/// with the same constant expression.
#[derive(Debug, Clone)]
struct ConstantUnhandledPredicateHook {
    /// Expression returned for every unhandled predicate
    default: Arc<dyn PhysicalExpr>,
}
421
422impl Default for ConstantUnhandledPredicateHook {
423 fn default() -> Self {
424 Self {
425 default: Arc::new(phys_expr::Literal::new(ScalarValue::from(true))),
426 }
427 }
428}
429
430impl UnhandledPredicateHook for ConstantUnhandledPredicateHook {
431 fn handle(&self, _expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
432 Arc::clone(&self.default)
433 }
434}
435
436impl PruningPredicate {
437 pub fn try_new(expr: Arc<dyn PhysicalExpr>, schema: SchemaRef) -> Result<Self> {
460 let expr = snapshot_physical_expr(expr)?;
463 let unhandled_hook = Arc::new(ConstantUnhandledPredicateHook::default()) as _;
464
465 let mut required_columns = RequiredColumns::new();
467 let predicate_expr = build_predicate_expression(
468 &expr,
469 &schema,
470 &mut required_columns,
471 &unhandled_hook,
472 );
473 let predicate_schema = required_columns.schema();
474 let predicate_expr =
476 PhysicalExprSimplifier::new(&predicate_schema).simplify(predicate_expr)?;
477
478 let literal_guarantees = LiteralGuarantee::analyze(&expr);
479
480 Ok(Self {
481 schema,
482 predicate_expr,
483 required_columns,
484 orig_expr: expr,
485 literal_guarantees,
486 })
487 }
488
489 pub fn prune<S: PruningStatistics + ?Sized>(
504 &self,
505 statistics: &S,
506 ) -> Result<Vec<bool>> {
507 let mut builder = BoolVecBuilder::new(statistics.num_containers());
508
509 for literal_guarantee in &self.literal_guarantees {
512 let LiteralGuarantee {
513 column,
514 guarantee,
515 literals,
516 } = literal_guarantee;
517 if let Some(results) = statistics.contained(column, literals) {
518 match guarantee {
519 Guarantee::In => builder.combine_array(&results),
524 Guarantee::NotIn => {
530 builder.combine_array(&arrow::compute::not(&results)?)
531 }
532 }
533 if builder.check_all_pruned() {
536 return Ok(builder.build());
537 }
538 }
539 }
540
541 let statistics_batch =
547 build_statistics_record_batch(statistics, &self.required_columns)?;
548
549 builder.combine_value(self.predicate_expr.evaluate(&statistics_batch)?);
551
552 Ok(builder.build())
553 }
554
555 pub fn schema(&self) -> &SchemaRef {
557 &self.schema
558 }
559
560 pub fn orig_expr(&self) -> &Arc<dyn PhysicalExpr> {
562 &self.orig_expr
563 }
564
565 pub fn predicate_expr(&self) -> &Arc<dyn PhysicalExpr> {
567 &self.predicate_expr
568 }
569
570 pub fn literal_guarantees(&self) -> &[LiteralGuarantee] {
576 &self.literal_guarantees
577 }
578
579 pub fn always_true(&self) -> bool {
586 is_always_true(&self.predicate_expr) && self.literal_guarantees.is_empty()
587 }
588
589 #[allow(dead_code)]
591 pub fn required_columns(&self) -> &RequiredColumns {
592 &self.required_columns
593 }
594
595 pub fn literal_columns(&self) -> Vec<String> {
603 let mut seen = HashSet::new();
604 self.literal_guarantees
605 .iter()
606 .map(|e| &e.column.name)
607 .filter(|name| seen.insert(*name))
609 .map(|s| s.to_string())
610 .collect()
611 }
612}
613
/// Accumulates per-container pruning decisions. It starts with every
/// container marked `true` (may match) and only ever flips entries to
/// `false` (pruned).
#[derive(Debug)]
struct BoolVecBuilder {
    /// One entry per container; `false` means the container can be pruned
    inner: Vec<bool>,
}
622
623impl BoolVecBuilder {
624 fn new(num_containers: usize) -> Self {
626 Self {
627 inner: vec![true; num_containers],
629 }
630 }
631
632 fn combine_array(&mut self, array: &BooleanArray) {
640 assert_eq!(array.len(), self.inner.len());
641 for (cur, new) in self.inner.iter_mut().zip(array.iter()) {
642 if let Some(false) = new {
646 *cur = false;
647 }
648 }
649 }
650
651 fn combine_value(&mut self, value: ColumnarValue) {
657 match value {
658 ColumnarValue::Array(array) => {
659 self.combine_array(array.as_boolean());
660 }
661 ColumnarValue::Scalar(ScalarValue::Boolean(Some(false))) => {
662 self.inner = vec![false; self.inner.len()];
664 }
665 _ => {
666 }
669 }
670 }
671
672 fn build(self) -> Vec<bool> {
674 self.inner
675 }
676
677 fn check_all_pruned(&self) -> bool {
679 self.inner.iter().all(|&x| !x)
680 }
681}
682
683fn is_always_true(expr: &Arc<dyn PhysicalExpr>) -> bool {
684 expr.as_any()
685 .downcast_ref::<phys_expr::Literal>()
686 .map(|l| matches!(l.value(), ScalarValue::Boolean(Some(true))))
687 .unwrap_or_default()
688}
689
690fn is_always_false(expr: &Arc<dyn PhysicalExpr>) -> bool {
691 expr.as_any()
692 .downcast_ref::<phys_expr::Literal>()
693 .map(|l| matches!(l.value(), ScalarValue::Boolean(Some(false))))
694 .unwrap_or_default()
695}
696
/// The statistics columns a pruning predicate needs in order to be evaluated.
///
/// Each entry pairs:
/// * the original column;
/// * the kind of statistic required for it;
/// * the field describing the statistics column.
///
/// An entry's position is the index of its statistics column in the
/// statistics schema and record batch.
#[derive(Debug, Default, Clone)]
pub struct RequiredColumns {
    /// `(original column, statistic kind, statistics field)` triples, in the
    /// order they were first requested
    columns: Vec<(phys_expr::Column, StatisticsType, Field)>,
}
715
716impl RequiredColumns {
717 fn new() -> Self {
718 Self::default()
719 }
720
721 #[allow(dead_code)]
730 pub fn single_column(&self) -> Option<&phys_expr::Column> {
732 if self.columns.windows(2).all(|w| {
733 let c1 = &w[0].0;
735 let c2 = &w[1].0;
736 c1 == c2
737 }) {
738 self.columns.first().map(|r| &r.0)
739 } else {
740 None
741 }
742 }
743
744 fn schema(&self) -> Schema {
751 let fields = self
752 .columns
753 .iter()
754 .map(|(_c, _t, f)| f.clone())
755 .collect::<Vec<_>>();
756 Schema::new(fields)
757 }
758
759 pub(crate) fn iter(
762 &self,
763 ) -> impl Iterator<Item = &(phys_expr::Column, StatisticsType, Field)> {
764 self.columns.iter()
765 }
766
767 fn find_stat_column(
768 &self,
769 column: &phys_expr::Column,
770 statistics_type: StatisticsType,
771 ) -> Option<usize> {
772 match statistics_type {
773 StatisticsType::RowCount => {
774 self.columns
776 .iter()
777 .enumerate()
778 .find(|(_i, (_c, t, _f))| t == &statistics_type)
779 .map(|(i, (_c, _t, _f))| i)
780 }
781 _ => self
782 .columns
783 .iter()
784 .enumerate()
785 .find(|(_i, (c, t, _f))| c == column && t == &statistics_type)
786 .map(|(i, (_c, _t, _f))| i),
787 }
788 }
789
790 fn stat_column_expr(
799 &mut self,
800 column: &phys_expr::Column,
801 column_expr: &Arc<dyn PhysicalExpr>,
802 field: &Field,
803 stat_type: StatisticsType,
804 ) -> Result<Arc<dyn PhysicalExpr>> {
805 let (idx, need_to_insert) = match self.find_stat_column(column, stat_type) {
806 Some(idx) => (idx, false),
807 None => (self.columns.len(), true),
808 };
809
810 let column_name = column.name();
811 let stat_column_name = match stat_type {
812 StatisticsType::Min => format!("{column_name}_min"),
813 StatisticsType::Max => format!("{column_name}_max"),
814 StatisticsType::NullCount => format!("{column_name}_null_count"),
815 StatisticsType::RowCount => "row_count".to_string(),
816 };
817
818 let stat_column = phys_expr::Column::new(&stat_column_name, idx);
819
820 if need_to_insert {
822 let nullable = true;
824 let stat_field =
825 Field::new(stat_column.name(), field.data_type().clone(), nullable);
826 self.columns.push((column.clone(), stat_type, stat_field));
827 }
828 rewrite_column_expr(Arc::clone(column_expr), column, &stat_column)
829 }
830
831 fn min_column_expr(
833 &mut self,
834 column: &phys_expr::Column,
835 column_expr: &Arc<dyn PhysicalExpr>,
836 field: &Field,
837 ) -> Result<Arc<dyn PhysicalExpr>> {
838 self.stat_column_expr(column, column_expr, field, StatisticsType::Min)
839 }
840
841 fn max_column_expr(
843 &mut self,
844 column: &phys_expr::Column,
845 column_expr: &Arc<dyn PhysicalExpr>,
846 field: &Field,
847 ) -> Result<Arc<dyn PhysicalExpr>> {
848 self.stat_column_expr(column, column_expr, field, StatisticsType::Max)
849 }
850
851 fn null_count_column_expr(
853 &mut self,
854 column: &phys_expr::Column,
855 column_expr: &Arc<dyn PhysicalExpr>,
856 field: &Field,
857 ) -> Result<Arc<dyn PhysicalExpr>> {
858 self.stat_column_expr(column, column_expr, field, StatisticsType::NullCount)
859 }
860
861 fn row_count_column_expr(
863 &mut self,
864 column: &phys_expr::Column,
865 column_expr: &Arc<dyn PhysicalExpr>,
866 field: &Field,
867 ) -> Result<Arc<dyn PhysicalExpr>> {
868 self.stat_column_expr(column, column_expr, field, StatisticsType::RowCount)
869 }
870}
871
872impl From<Vec<(phys_expr::Column, StatisticsType, Field)>> for RequiredColumns {
873 fn from(columns: Vec<(phys_expr::Column, StatisticsType, Field)>) -> Self {
874 Self { columns }
875 }
876}
877
878fn build_statistics_record_batch<S: PruningStatistics + ?Sized>(
904 statistics: &S,
905 required_columns: &RequiredColumns,
906) -> Result<RecordBatch> {
907 let mut arrays = Vec::<ArrayRef>::new();
908 for (column, statistics_type, stat_field) in required_columns.iter() {
910 let column = Column::from_name(column.name());
911 let data_type = stat_field.data_type();
912
913 let num_containers = statistics.num_containers();
914
915 let array = match statistics_type {
916 StatisticsType::Min => statistics.min_values(&column),
917 StatisticsType::Max => statistics.max_values(&column),
918 StatisticsType::NullCount => statistics.null_counts(&column),
919 StatisticsType::RowCount => statistics.row_counts(&column),
920 };
921 let array = array.unwrap_or_else(|| new_null_array(data_type, num_containers));
922
923 if num_containers != array.len() {
924 return internal_err!(
925 "mismatched statistics length. Expected {}, got {}",
926 num_containers,
927 array.len()
928 );
929 }
930
931 let array = arrow::compute::cast(&array, data_type)?;
934
935 arrays.push(array);
936 }
937
938 let schema = Arc::new(required_columns.schema());
939 let mut options = RecordBatchOptions::default();
941 options.row_count = Some(statistics.num_containers());
942
943 trace!("Creating statistics batch for {required_columns:#?} with {arrays:#?}");
944
945 RecordBatch::try_new_with_options(schema, arrays, &options).map_err(|err| {
946 plan_datafusion_err!("Can not create statistics record batch: {err}")
947 })
948}
949
/// Helper for rewriting a single `column_expr <op> scalar_expr` comparison
/// into a predicate over statistics columns.
struct PruningExpressionBuilder<'a> {
    /// The only column referenced by `column_expr`
    column: phys_expr::Column,
    /// Column side of the comparison, after `rewrite_expr_to_prunable`
    /// (may still be wrapped in casts)
    column_expr: Arc<dyn PhysicalExpr>,
    /// Comparison operator, oriented so the column side is on the left
    op: Operator,
    /// Side of the comparison that references no columns
    scalar_expr: Arc<dyn PhysicalExpr>,
    /// Schema field of `column`
    field: &'a Field,
    /// Collects the statistics columns referenced by the rewritten expression
    required_columns: &'a mut RequiredColumns,
}
958
959impl<'a> PruningExpressionBuilder<'a> {
960 fn try_new(
961 left: &'a Arc<dyn PhysicalExpr>,
962 right: &'a Arc<dyn PhysicalExpr>,
963 op: Operator,
964 schema: &'a SchemaRef,
965 required_columns: &'a mut RequiredColumns,
966 ) -> Result<Self> {
967 let left_columns = collect_columns(left);
969 let right_columns = collect_columns(right);
970 let (column_expr, scalar_expr, columns, correct_operator) =
971 match (left_columns.len(), right_columns.len()) {
972 (1, 0) => (left, right, left_columns, op),
973 (0, 1) => (right, left, right_columns, reverse_operator(op)?),
974 _ => {
975 return plan_err!(
977 "Multi-column expressions are not currently supported"
978 );
979 }
980 };
981
982 let df_schema = DFSchema::try_from(Arc::clone(schema))?;
983 let (column_expr, correct_operator, scalar_expr) = rewrite_expr_to_prunable(
984 column_expr,
985 correct_operator,
986 scalar_expr,
987 df_schema,
988 )?;
989 let column = columns.iter().next().unwrap().clone();
990 let field = match schema.column_with_name(column.name()) {
991 Some((_, f)) => f,
992 _ => {
993 return plan_err!("Field not found in schema");
994 }
995 };
996
997 Ok(Self {
998 column,
999 column_expr,
1000 op: correct_operator,
1001 scalar_expr,
1002 field,
1003 required_columns,
1004 })
1005 }
1006
1007 fn op(&self) -> Operator {
1008 self.op
1009 }
1010
1011 fn scalar_expr(&self) -> &Arc<dyn PhysicalExpr> {
1012 &self.scalar_expr
1013 }
1014
1015 fn min_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1016 self.required_columns
1017 .min_column_expr(&self.column, &self.column_expr, self.field)
1018 }
1019
1020 fn max_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1021 self.required_columns
1022 .max_column_expr(&self.column, &self.column_expr, self.field)
1023 }
1024
1025 fn null_count_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1032 let column_expr = Arc::new(self.column.clone()) as _;
1034
1035 let null_count_field = &Field::new(self.field.name(), DataType::UInt64, true);
1037
1038 self.required_columns.null_count_column_expr(
1039 &self.column,
1040 &column_expr,
1041 null_count_field,
1042 )
1043 }
1044
1045 fn row_count_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1052 let column_expr = Arc::new(self.column.clone()) as _;
1054
1055 let row_count_field = &Field::new(self.field.name(), DataType::UInt64, true);
1057
1058 self.required_columns.row_count_column_expr(
1059 &self.column,
1060 &column_expr,
1061 row_count_field,
1062 )
1063 }
1064}
1065
1066fn rewrite_expr_to_prunable(
1079 column_expr: &PhysicalExprRef,
1080 op: Operator,
1081 scalar_expr: &PhysicalExprRef,
1082 schema: DFSchema,
1083) -> Result<(PhysicalExprRef, Operator, PhysicalExprRef)> {
1084 if !is_compare_op(op) {
1085 return plan_err!("rewrite_expr_to_prunable only support compare expression");
1086 }
1087
1088 let column_expr_any = column_expr.as_any();
1089
1090 if column_expr_any
1091 .downcast_ref::<phys_expr::Column>()
1092 .is_some()
1093 {
1094 Ok((Arc::clone(column_expr), op, Arc::clone(scalar_expr)))
1096 } else if let Some(cast) = column_expr_any.downcast_ref::<phys_expr::CastExpr>() {
1097 let arrow_schema = schema.as_arrow();
1099 let from_type = cast.expr().data_type(arrow_schema)?;
1100 verify_support_type_for_prune(&from_type, cast.cast_type())?;
1101 let (left, op, right) =
1102 rewrite_expr_to_prunable(cast.expr(), op, scalar_expr, schema)?;
1103 let left = Arc::new(phys_expr::CastExpr::new(
1104 left,
1105 cast.cast_type().clone(),
1106 None,
1107 ));
1108 Ok((left, op, right))
1109 } else if let Some(try_cast) =
1110 column_expr_any.downcast_ref::<phys_expr::TryCastExpr>()
1111 {
1112 let arrow_schema = schema.as_arrow();
1114 let from_type = try_cast.expr().data_type(arrow_schema)?;
1115 verify_support_type_for_prune(&from_type, try_cast.cast_type())?;
1116 let (left, op, right) =
1117 rewrite_expr_to_prunable(try_cast.expr(), op, scalar_expr, schema)?;
1118 let left = Arc::new(phys_expr::TryCastExpr::new(
1119 left,
1120 try_cast.cast_type().clone(),
1121 ));
1122 Ok((left, op, right))
1123 } else if let Some(neg) = column_expr_any.downcast_ref::<phys_expr::NegativeExpr>() {
1124 let (left, op, right) =
1126 rewrite_expr_to_prunable(neg.arg(), op, scalar_expr, schema)?;
1127 let right = Arc::new(phys_expr::NegativeExpr::new(right));
1128 Ok((left, reverse_operator(op)?, right))
1129 } else if let Some(not) = column_expr_any.downcast_ref::<phys_expr::NotExpr>() {
1130 if op != Operator::Eq && op != Operator::NotEq {
1132 return plan_err!("Not with operator other than Eq / NotEq is not supported");
1133 }
1134 if not
1135 .arg()
1136 .as_any()
1137 .downcast_ref::<phys_expr::Column>()
1138 .is_some()
1139 {
1140 let left = Arc::clone(not.arg());
1141 let right = Arc::new(phys_expr::NotExpr::new(Arc::clone(scalar_expr)));
1142 Ok((left, reverse_operator(op)?, right))
1143 } else {
1144 plan_err!("Not with complex expression {column_expr:?} is not supported")
1145 }
1146 } else {
1147 plan_err!("column expression {column_expr:?} is not supported")
1148 }
1149}
1150
1151fn is_compare_op(op: Operator) -> bool {
1152 matches!(
1153 op,
1154 Operator::Eq
1155 | Operator::NotEq
1156 | Operator::Lt
1157 | Operator::LtEq
1158 | Operator::Gt
1159 | Operator::GtEq
1160 | Operator::LikeMatch
1161 | Operator::NotLikeMatch
1162 )
1163}
1164
1165fn is_string_type(data_type: &DataType) -> bool {
1166 matches!(
1167 data_type,
1168 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
1169 )
1170}
1171
1172fn verify_support_type_for_prune(from_type: &DataType, to_type: &DataType) -> Result<()> {
1177 let from_type = match from_type {
1179 DataType::Dictionary(_, t) => {
1180 return verify_support_type_for_prune(t.as_ref(), to_type)
1181 }
1182 _ => from_type,
1183 };
1184 let to_type = match to_type {
1185 DataType::Dictionary(_, t) => {
1186 return verify_support_type_for_prune(from_type, t.as_ref())
1187 }
1188 _ => to_type,
1189 };
1190 if is_string_type(from_type) == is_string_type(to_type) {
1194 Ok(())
1195 } else {
1196 plan_err!(
1197 "Try Cast/Cast with from type {from_type} to type {to_type} is not supported"
1198 )
1199 }
1200}
1201
1202fn rewrite_column_expr(
1204 e: Arc<dyn PhysicalExpr>,
1205 column_old: &phys_expr::Column,
1206 column_new: &phys_expr::Column,
1207) -> Result<Arc<dyn PhysicalExpr>> {
1208 e.transform_with_lambdas_params(|expr, lambdas_params| {
1209 if let Some(column) = expr.as_any().downcast_ref::<phys_expr::Column>() {
1210 if !lambdas_params.contains(column.name()) && column == column_old {
1211 return Ok(Transformed::yes(Arc::new(column_new.clone())));
1212 }
1213 }
1214
1215 Ok(Transformed::no(expr))
1216 })
1217 .data()
1218}
1219
1220fn reverse_operator(op: Operator) -> Result<Operator> {
1221 op.swap().ok_or_else(|| {
1222 internal_datafusion_err!(
1223 "Could not reverse operator {op} while building pruning predicate"
1224 )
1225 })
1226}
1227
1228fn build_single_column_expr(
1233 column: &phys_expr::Column,
1234 schema: &Schema,
1235 required_columns: &mut RequiredColumns,
1236 is_not: bool, ) -> Option<Arc<dyn PhysicalExpr>> {
1238 let field = schema.field_with_name(column.name()).ok()?;
1239
1240 if matches!(field.data_type(), &DataType::Boolean) {
1241 let col_ref = Arc::new(column.clone()) as _;
1242
1243 let min = required_columns
1244 .min_column_expr(column, &col_ref, field)
1245 .ok()?;
1246 let max = required_columns
1247 .max_column_expr(column, &col_ref, field)
1248 .ok()?;
1249
1250 if is_not {
1254 Some(Arc::new(phys_expr::NotExpr::new(Arc::new(
1257 phys_expr::BinaryExpr::new(min, Operator::And, max),
1258 ))))
1259 } else {
1260 Some(Arc::new(phys_expr::BinaryExpr::new(min, Operator::Or, max)))
1263 }
1264 } else {
1265 None
1266 }
1267}
1268
1269fn build_is_null_column_expr(
1278 expr: &Arc<dyn PhysicalExpr>,
1279 schema: &Schema,
1280 required_columns: &mut RequiredColumns,
1281 with_not: bool,
1282) -> Option<Arc<dyn PhysicalExpr>> {
1283 if let Some(col) = expr.as_any().downcast_ref::<phys_expr::Column>() {
1284 let field = schema.field_with_name(col.name()).ok()?;
1285
1286 let null_count_field = &Field::new(field.name(), DataType::UInt64, true);
1287 if with_not {
1288 if let Ok(row_count_expr) =
1289 required_columns.row_count_column_expr(col, expr, null_count_field)
1290 {
1291 required_columns
1292 .null_count_column_expr(col, expr, null_count_field)
1293 .map(|null_count_column_expr| {
1294 Arc::new(phys_expr::BinaryExpr::new(
1296 null_count_column_expr,
1297 Operator::NotEq,
1298 row_count_expr,
1299 )) as _
1300 })
1301 .ok()
1302 } else {
1303 None
1304 }
1305 } else {
1306 required_columns
1307 .null_count_column_expr(col, expr, null_count_field)
1308 .map(|null_count_column_expr| {
1309 Arc::new(phys_expr::BinaryExpr::new(
1311 null_count_column_expr,
1312 Operator::Gt,
1313 Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
1314 )) as _
1315 })
1316 .ok()
1317 }
1318 } else {
1319 None
1320 }
1321}
1322
/// Maximum number of values in an `IN` list that is expanded into a chain of
/// `=` / `!=` comparisons when building a statistics predicate. Longer lists
/// are passed to the unhandled-predicate hook instead.
const MAX_LIST_VALUE_SIZE_REWRITE: usize = 20;
1326
/// Rewrites a physical predicate into a predicate over statistics columns.
/// Sub-expressions it cannot translate are handed to a configurable
/// [`UnhandledPredicateHook`].
pub struct PredicateRewriter {
    /// Called for every sub-expression that cannot be rewritten
    unhandled_hook: Arc<dyn UnhandledPredicateHook>,
}
1332
1333impl Default for PredicateRewriter {
1334 fn default() -> Self {
1335 Self {
1336 unhandled_hook: Arc::new(ConstantUnhandledPredicateHook::default()),
1337 }
1338 }
1339}
1340
1341impl PredicateRewriter {
1342 pub fn new() -> Self {
1344 Self::default()
1345 }
1346
1347 pub fn with_unhandled_hook(
1349 self,
1350 unhandled_hook: Arc<dyn UnhandledPredicateHook>,
1351 ) -> Self {
1352 Self { unhandled_hook }
1353 }
1354
1355 pub fn rewrite_predicate_to_statistics_predicate(
1365 &self,
1366 expr: &Arc<dyn PhysicalExpr>,
1367 schema: &Schema,
1368 ) -> Arc<dyn PhysicalExpr> {
1369 let mut required_columns = RequiredColumns::new();
1370 build_predicate_expression(
1371 expr,
1372 &Arc::new(schema.clone()),
1373 &mut required_columns,
1374 &self.unhandled_hook,
1375 )
1376 }
1377}
1378
/// Translates the physical predicate `expr` into a predicate over container
/// statistics. Every statistics column it references is recorded in
/// `required_columns`.
///
/// Containers for which the result evaluates to `false` are treated as
/// prunable. Sub-expressions that cannot be translated are replaced by
/// whatever `unhandled_hook` returns for them.
///
/// Note: statistics columns are registered in `required_columns` in the
/// order they are first requested, which depends on the traversal order
/// below.
fn build_predicate_expression(
    expr: &Arc<dyn PhysicalExpr>,
    schema: &SchemaRef,
    required_columns: &mut RequiredColumns,
    unhandled_hook: &Arc<dyn UnhandledPredicateHook>,
) -> Arc<dyn PhysicalExpr> {
    // A literal `false` is kept as-is (it prunes every container).
    if is_always_false(expr) {
        return Arc::clone(expr);
    }
    let expr_any = expr.as_any();
    // `col IS NULL` -> `col_null_count > 0`
    if let Some(is_null) = expr_any.downcast_ref::<phys_expr::IsNullExpr>() {
        return build_is_null_column_expr(is_null.arg(), schema, required_columns, false)
            .unwrap_or_else(|| unhandled_hook.handle(expr));
    }
    // `col IS NOT NULL` -> `col_null_count != row_count`
    if let Some(is_not_null) = expr_any.downcast_ref::<phys_expr::IsNotNullExpr>() {
        return build_is_null_column_expr(
            is_not_null.arg(),
            schema,
            required_columns,
            true,
        )
        .unwrap_or_else(|| unhandled_hook.handle(expr));
    }
    // Bare boolean column: `col` -> `col_min OR col_max`
    if let Some(col) = expr_any.downcast_ref::<phys_expr::Column>() {
        return build_single_column_expr(col, schema, required_columns, false)
            .unwrap_or_else(|| unhandled_hook.handle(expr));
    }
    // `NOT col` for a boolean column; NOT of any other expression is not
    // translated.
    if let Some(not) = expr_any.downcast_ref::<phys_expr::NotExpr>() {
        if let Some(col) = not.arg().as_any().downcast_ref::<phys_expr::Column>() {
            return build_single_column_expr(col, schema, required_columns, true)
                .unwrap_or_else(|| unhandled_hook.handle(expr));
        } else {
            return unhandled_hook.handle(expr);
        }
    }
    // Short, non-empty IN lists are expanded and translated recursively:
    //   `col IN (a, b)`     -> `col = a OR col = b`
    //   `col NOT IN (a, b)` -> `col != a AND col != b`
    if let Some(in_list) = expr_any.downcast_ref::<phys_expr::InListExpr>() {
        if !in_list.list().is_empty()
            && in_list.list().len() <= MAX_LIST_VALUE_SIZE_REWRITE
        {
            let eq_op = if in_list.negated() {
                Operator::NotEq
            } else {
                Operator::Eq
            };
            let re_op = if in_list.negated() {
                Operator::And
            } else {
                Operator::Or
            };
            // `unwrap` cannot fail: the list was checked to be non-empty.
            let change_expr = in_list
                .list()
                .iter()
                .map(|e| {
                    Arc::new(phys_expr::BinaryExpr::new(
                        Arc::clone(in_list.expr()),
                        eq_op,
                        Arc::clone(e),
                    )) as _
                })
                .reduce(|a, b| Arc::new(phys_expr::BinaryExpr::new(a, re_op, b)) as _)
                .unwrap();
            return build_predicate_expression(
                &change_expr,
                schema,
                required_columns,
                unhandled_hook,
            );
        } else {
            return unhandled_hook.handle(expr);
        }
    }

    // The remaining supported shapes (binary expressions and LIKE) are
    // reduced to `(left, op, right)`.
    let (left, op, right) = {
        if let Some(bin_expr) = expr_any.downcast_ref::<phys_expr::BinaryExpr>() {
            (
                Arc::clone(bin_expr.left()),
                *bin_expr.op(),
                Arc::clone(bin_expr.right()),
            )
        } else if let Some(like_expr) = expr_any.downcast_ref::<phys_expr::LikeExpr>() {
            // Case-insensitive LIKE is not translated.
            if like_expr.case_insensitive() {
                return unhandled_hook.handle(expr);
            }
            let op = match (like_expr.negated(), like_expr.case_insensitive()) {
                (false, false) => Operator::LikeMatch,
                (true, false) => Operator::NotLikeMatch,
                (false, true) => Operator::ILikeMatch,
                (true, true) => Operator::NotILikeMatch,
            };
            (
                Arc::clone(like_expr.expr()),
                op,
                Arc::clone(like_expr.pattern()),
            )
        } else {
            return unhandled_hook.handle(expr);
        }
    };

    // AND / OR: translate both sides (left first), then fold constant
    // `true` / `false` operands.
    if op == Operator::And || op == Operator::Or {
        let left_expr =
            build_predicate_expression(&left, schema, required_columns, unhandled_hook);
        let right_expr =
            build_predicate_expression(&right, schema, required_columns, unhandled_hook);
        let expr = match (&left_expr, op, &right_expr) {
            (left, Operator::And, right)
                if is_always_false(left) || is_always_false(right) =>
            {
                Arc::new(phys_expr::Literal::new(ScalarValue::Boolean(Some(false))))
            }
            (left, Operator::And, _) if is_always_true(left) => right_expr,
            (_, Operator::And, right) if is_always_true(right) => left_expr,
            (left, Operator::Or, right)
                if is_always_true(left) || is_always_true(right) =>
            {
                Arc::new(phys_expr::Literal::new(ScalarValue::Boolean(Some(true))))
            }
            (left, Operator::Or, _) if is_always_false(left) => right_expr,
            (_, Operator::Or, right) if is_always_false(right) => left_expr,

            _ => Arc::new(phys_expr::BinaryExpr::new(left_expr, op, right_expr)),
        };
        return expr;
    }

    // Single comparison: split into column and scalar sides. Any failure
    // (here or while building the statistics expression) falls back to the
    // unhandled hook.
    let expr_builder =
        PruningExpressionBuilder::try_new(&left, &right, op, schema, required_columns);
    let mut expr_builder = match expr_builder {
        Ok(builder) => builder,
        Err(e) => {
            debug!("Error building pruning expression: {e}");
            return unhandled_hook.handle(expr);
        }
    };

    build_statistics_expr(&mut expr_builder)
        .unwrap_or_else(|_| unhandled_hook.handle(expr))
}
1533
/// Builds the statistics predicate for the single comparison held by
/// `expr_builder` (written `x <op> c` below). The result is then wrapped by
/// `wrap_null_count_check_expr`.
///
/// | comparison   | statistics predicate               |
/// |--------------|------------------------------------|
/// | `x != c`     | `x_min != c OR c != x_max`         |
/// | `x = c`      | `x_min <= c AND c <= x_max`        |
/// | `x > c`      | `x_max > c`                        |
/// | `x >= c`     | `x_max >= c`                       |
/// | `x < c`      | `x_min < c`                        |
/// | `x <= c`     | `x_min <= c`                       |
///
/// `LIKE` and `NOT LIKE` are delegated to `build_like_match` and
/// `build_not_like_match`.
///
/// The order of the `*_column_expr` calls determines the order in which
/// statistics columns are registered.
///
/// # Errors
/// Returns a plan error for unsupported operators or patterns, and
/// propagates failures to build statistics columns.
fn build_statistics_expr(
    expr_builder: &mut PruningExpressionBuilder,
) -> Result<Arc<dyn PhysicalExpr>> {
    let statistics_expr: Arc<dyn PhysicalExpr> = match expr_builder.op() {
        Operator::NotEq => {
            // `x != c` -> `x_min != c OR c != x_max`
            let min_column_expr = expr_builder.min_column_expr()?;
            let max_column_expr = expr_builder.max_column_expr()?;
            Arc::new(phys_expr::BinaryExpr::new(
                Arc::new(phys_expr::BinaryExpr::new(
                    min_column_expr,
                    Operator::NotEq,
                    Arc::clone(expr_builder.scalar_expr()),
                )),
                Operator::Or,
                Arc::new(phys_expr::BinaryExpr::new(
                    Arc::clone(expr_builder.scalar_expr()),
                    Operator::NotEq,
                    max_column_expr,
                )),
            ))
        }
        Operator::Eq => {
            // `x = c` -> `x_min <= c AND c <= x_max`
            let min_column_expr = expr_builder.min_column_expr()?;
            let max_column_expr = expr_builder.max_column_expr()?;
            Arc::new(phys_expr::BinaryExpr::new(
                Arc::new(phys_expr::BinaryExpr::new(
                    min_column_expr,
                    Operator::LtEq,
                    Arc::clone(expr_builder.scalar_expr()),
                )),
                Operator::And,
                Arc::new(phys_expr::BinaryExpr::new(
                    Arc::clone(expr_builder.scalar_expr()),
                    Operator::LtEq,
                    max_column_expr,
                )),
            ))
        }
        Operator::NotLikeMatch => build_not_like_match(expr_builder)?,
        // `build_like_match` returns `None` for any pattern it cannot
        // translate; the message names only the most common cause.
        Operator::LikeMatch => build_like_match(expr_builder).ok_or_else(|| {
            plan_datafusion_err!(
                "LIKE expression with wildcard at the beginning is not supported"
            )
        })?,
        Operator::Gt => {
            // `x > c` -> `x_max > c`
            Arc::new(phys_expr::BinaryExpr::new(
                expr_builder.max_column_expr()?,
                Operator::Gt,
                Arc::clone(expr_builder.scalar_expr()),
            ))
        }
        Operator::GtEq => {
            // `x >= c` -> `x_max >= c`
            Arc::new(phys_expr::BinaryExpr::new(
                expr_builder.max_column_expr()?,
                Operator::GtEq,
                Arc::clone(expr_builder.scalar_expr()),
            ))
        }
        Operator::Lt => {
            // `x < c` -> `x_min < c`
            Arc::new(phys_expr::BinaryExpr::new(
                expr_builder.min_column_expr()?,
                Operator::Lt,
                Arc::clone(expr_builder.scalar_expr()),
            ))
        }
        Operator::LtEq => {
            // `x <= c` -> `x_min <= c`
            Arc::new(phys_expr::BinaryExpr::new(
                expr_builder.min_column_expr()?,
                Operator::LtEq,
                Arc::clone(expr_builder.scalar_expr()),
            ))
        }
        _ => {
            return plan_err!(
                "expressions other than (neq, eq, gt, gteq, lt, lteq) are not supported"
            );
        }
    };
    let statistics_expr = wrap_null_count_check_expr(statistics_expr, expr_builder)?;
    Ok(statistics_expr)
}
1625
1626fn unpack_string(s: &ScalarValue) -> Option<&str> {
1628 s.try_as_str().flatten()
1629}
1630
1631fn extract_string_literal(expr: &Arc<dyn PhysicalExpr>) -> Option<&str> {
1632 if let Some(lit) = expr.as_any().downcast_ref::<phys_expr::Literal>() {
1633 let s = unpack_string(lit.value())?;
1634 return Some(s);
1635 }
1636 None
1637}
1638
1639fn build_like_match(
1643 expr_builder: &mut PruningExpressionBuilder,
1644) -> Option<Arc<dyn PhysicalExpr>> {
1645 let min_column_expr = expr_builder.min_column_expr().ok()?;
1654 let max_column_expr = expr_builder.max_column_expr().ok()?;
1655 let scalar_expr = expr_builder.scalar_expr();
1656 let s = extract_string_literal(scalar_expr)?;
1658 let first_wildcard_index = s.find(['%', '_']);
1660 if first_wildcard_index == Some(0) {
1661 return None;
1663 }
1664 let (lower_bound, upper_bound) = if let Some(wildcard_index) = first_wildcard_index {
1665 let prefix = &s[..wildcard_index];
1666 let lower_bound_lit = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1667 prefix.to_string(),
1668 ))));
1669 let upper_bound_lit = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1670 increment_utf8(prefix)?,
1671 ))));
1672 (lower_bound_lit, upper_bound_lit)
1673 } else {
1674 let bound = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1676 s.to_string(),
1677 ))));
1678 (Arc::clone(&bound), bound)
1679 };
1680 let lower_bound_expr = Arc::new(phys_expr::BinaryExpr::new(
1681 lower_bound,
1682 Operator::LtEq,
1683 Arc::clone(&max_column_expr),
1684 ));
1685 let upper_bound_expr = Arc::new(phys_expr::BinaryExpr::new(
1686 Arc::clone(&min_column_expr),
1687 Operator::LtEq,
1688 upper_bound,
1689 ));
1690 let combined = Arc::new(phys_expr::BinaryExpr::new(
1691 upper_bound_expr,
1692 Operator::And,
1693 lower_bound_expr,
1694 ));
1695 Some(combined)
1696}
1697
1698fn build_not_like_match(
1704 expr_builder: &mut PruningExpressionBuilder<'_>,
1705) -> Result<Arc<dyn PhysicalExpr>> {
1706 let min_column_expr = expr_builder.min_column_expr()?;
1709 let max_column_expr = expr_builder.max_column_expr()?;
1710
1711 let scalar_expr = expr_builder.scalar_expr();
1712
1713 let pattern = extract_string_literal(scalar_expr).ok_or_else(|| {
1714 plan_datafusion_err!("cannot extract literal from NOT LIKE expression")
1715 })?;
1716
1717 let (const_prefix, remaining) = split_constant_prefix(pattern);
1718 if const_prefix.is_empty() || remaining != "%" {
1719 return Err(plan_datafusion_err!(
1731 "NOT LIKE expressions only support constant_prefix+wildcard`%`"
1732 ));
1733 }
1734
1735 let min_col_not_like_epxr = Arc::new(phys_expr::LikeExpr::new(
1736 true,
1737 false,
1738 Arc::clone(&min_column_expr),
1739 Arc::clone(scalar_expr),
1740 ));
1741
1742 let max_col_not_like_expr = Arc::new(phys_expr::LikeExpr::new(
1743 true,
1744 false,
1745 Arc::clone(&max_column_expr),
1746 Arc::clone(scalar_expr),
1747 ));
1748
1749 Ok(Arc::new(phys_expr::BinaryExpr::new(
1750 min_col_not_like_epxr,
1751 Operator::Or,
1752 max_col_not_like_expr,
1753 )))
1754}
1755
/// Splits a LIKE `pattern` at its first wildcard (`%` or `_`) that is not
/// immediately preceded by a backslash.
///
/// Returns `(prefix, rest)`, where `rest` starts with that wildcard. When no
/// such wildcard exists, returns `(pattern, "")`. Escape sequences in
/// `prefix` are returned as-is (not unescaped).
fn split_constant_prefix(pattern: &str) -> (&str, &str) {
    let mut previous: Option<char> = None;
    for (byte_idx, ch) in pattern.char_indices() {
        let is_wildcard = ch == '%' || ch == '_';
        if is_wildcard && previous != Some('\\') {
            return pattern.split_at(byte_idx);
        }
        previous = Some(ch);
    }
    (pattern, "")
}
1771
/// Produces a string that sorts after every string starting with `data`.
///
/// Scanning from the last character backwards, the first character whose
/// successor code point (`c + 1`) is a valid `char` and not a Unicode
/// noncharacter is replaced by that successor. Everything after it is
/// dropped. Characters whose successor would be a surrogate, a noncharacter,
/// or past U+10FFFF are skipped, and the previous character is tried instead.
///
/// Returns `None` when no character can be incremented, including for the
/// empty string.
fn increment_utf8(data: &str) -> Option<String> {
    /// `false` for the noncharacters U+FDD0..=U+FDEF, U+FFFE and U+FFFF.
    fn is_valid_unicode(c: char) -> bool {
        !matches!(u32::from(c), 0xFDD0..=0xFDEF | 0xFFFE | 0xFFFF)
    }

    data.char_indices().rev().find_map(|(byte_offset, current)| {
        // `char::from_u32` rejects surrogates and values above U+10FFFF.
        let successor = char::from_u32(u32::from(current) + 1)
            .filter(|&candidate| is_valid_unicode(candidate))?;
        let mut incremented = String::with_capacity(byte_offset + successor.len_utf8());
        incremented.push_str(&data[..byte_offset]);
        incremented.push(successor);
        Some(incremented)
    })
}
1817
1818fn wrap_null_count_check_expr(
1839 statistics_expr: Arc<dyn PhysicalExpr>,
1840 expr_builder: &mut PruningExpressionBuilder,
1841) -> Result<Arc<dyn PhysicalExpr>> {
1842 let not_when_null_count_eq_row_count = Arc::new(phys_expr::BinaryExpr::new(
1844 expr_builder.null_count_column_expr()?,
1845 Operator::NotEq,
1846 expr_builder.row_count_column_expr()?,
1847 ));
1848
1849 Ok(Arc::new(phys_expr::BinaryExpr::new(
1851 not_when_null_count_eq_row_count,
1852 Operator::And,
1853 statistics_expr,
1854 )))
1855}
1856
/// Identifies which statistic a required statistics column holds.
///
/// Each required column pairs a data column with one of these variants and
/// the output `Field` that carries the values (e.g. `c1_min`,
/// `c1_null_count`, `row_count`).
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum StatisticsType {
    /// Minimum value of the column.
    Min,
    /// Maximum value of the column.
    Max,
    /// Number of null values in the column.
    NullCount,
    /// Total number of rows.
    RowCount,
}
1864
1865#[cfg(test)]
1866mod tests {
1867 use std::collections::HashMap;
1868 use std::ops::{Not, Rem};
1869
1870 use super::*;
1871 use datafusion_common::test_util::batches_to_string;
1872 use datafusion_expr::{and, col, lit, or};
1873 use insta::assert_snapshot;
1874
1875 use arrow::array::Decimal128Array;
1876 use arrow::{
1877 array::{BinaryArray, Int32Array, Int64Array, StringArray, UInt64Array},
1878 datatypes::TimeUnit,
1879 };
1880 use datafusion_expr::expr::InList;
1881 use datafusion_expr::{cast, is_null, try_cast, Expr};
1882 use datafusion_functions_nested::expr_fn::{array_has, make_array};
1883 use datafusion_physical_expr::expressions as phys_expr;
1884 use datafusion_physical_expr::planner::logical2physical;
1885
    /// Statistics for one column across one or more containers, used to back
    /// the test `PruningStatistics` implementation.
    ///
    /// Every populated array holds one entry per container.
    #[derive(Debug, Default)]
    struct ContainerStats {
        /// Minimum value for each container.
        min: Option<ArrayRef>,
        /// Maximum value for each container.
        max: Option<ArrayRef>,
        /// Null count for each container.
        null_counts: Option<ArrayRef>,
        /// Row count for each container.
        row_counts: Option<ArrayRef>,
        /// `(value set, per-container answer)` pairs; the answer is returned
        /// when `contained` is asked about exactly that value set.
        contained: Vec<(HashSet<ScalarValue>, BooleanArray)>,
    }
1905
1906 impl ContainerStats {
1907 fn new() -> Self {
1908 Default::default()
1909 }
1910 fn new_decimal128(
1911 min: impl IntoIterator<Item = Option<i128>>,
1912 max: impl IntoIterator<Item = Option<i128>>,
1913 precision: u8,
1914 scale: i8,
1915 ) -> Self {
1916 Self::new()
1917 .with_min(Arc::new(
1918 min.into_iter()
1919 .collect::<Decimal128Array>()
1920 .with_precision_and_scale(precision, scale)
1921 .unwrap(),
1922 ))
1923 .with_max(Arc::new(
1924 max.into_iter()
1925 .collect::<Decimal128Array>()
1926 .with_precision_and_scale(precision, scale)
1927 .unwrap(),
1928 ))
1929 }
1930
1931 fn new_i64(
1932 min: impl IntoIterator<Item = Option<i64>>,
1933 max: impl IntoIterator<Item = Option<i64>>,
1934 ) -> Self {
1935 Self::new()
1936 .with_min(Arc::new(min.into_iter().collect::<Int64Array>()))
1937 .with_max(Arc::new(max.into_iter().collect::<Int64Array>()))
1938 }
1939
1940 fn new_i32(
1941 min: impl IntoIterator<Item = Option<i32>>,
1942 max: impl IntoIterator<Item = Option<i32>>,
1943 ) -> Self {
1944 Self::new()
1945 .with_min(Arc::new(min.into_iter().collect::<Int32Array>()))
1946 .with_max(Arc::new(max.into_iter().collect::<Int32Array>()))
1947 }
1948
1949 fn new_utf8<'a>(
1950 min: impl IntoIterator<Item = Option<&'a str>>,
1951 max: impl IntoIterator<Item = Option<&'a str>>,
1952 ) -> Self {
1953 Self::new()
1954 .with_min(Arc::new(min.into_iter().collect::<StringArray>()))
1955 .with_max(Arc::new(max.into_iter().collect::<StringArray>()))
1956 }
1957
1958 fn new_bool(
1959 min: impl IntoIterator<Item = Option<bool>>,
1960 max: impl IntoIterator<Item = Option<bool>>,
1961 ) -> Self {
1962 Self::new()
1963 .with_min(Arc::new(min.into_iter().collect::<BooleanArray>()))
1964 .with_max(Arc::new(max.into_iter().collect::<BooleanArray>()))
1965 }
1966
1967 fn min(&self) -> Option<ArrayRef> {
1968 self.min.clone()
1969 }
1970
1971 fn max(&self) -> Option<ArrayRef> {
1972 self.max.clone()
1973 }
1974
1975 fn null_counts(&self) -> Option<ArrayRef> {
1976 self.null_counts.clone()
1977 }
1978
1979 fn row_counts(&self) -> Option<ArrayRef> {
1980 self.row_counts.clone()
1981 }
1982
1983 fn arrays(&self) -> Vec<ArrayRef> {
1985 let contained_arrays = self
1986 .contained
1987 .iter()
1988 .map(|(_values, contained)| Arc::new(contained.clone()) as ArrayRef);
1989
1990 [
1991 self.min.as_ref().cloned(),
1992 self.max.as_ref().cloned(),
1993 self.null_counts.as_ref().cloned(),
1994 self.row_counts.as_ref().cloned(),
1995 ]
1996 .into_iter()
1997 .flatten()
1998 .chain(contained_arrays)
1999 .collect()
2000 }
2001
2002 fn len(&self) -> usize {
2006 self.arrays().iter().map(|a| a.len()).next().unwrap_or(0)
2008 }
2009
2010 fn assert_invariants(&self) {
2012 let mut prev_len = None;
2013
2014 for len in self.arrays().iter().map(|a| a.len()) {
2015 match prev_len {
2017 None => {
2018 prev_len = Some(len);
2019 }
2020 Some(prev_len) => {
2021 assert_eq!(prev_len, len);
2022 }
2023 }
2024 }
2025 }
2026
2027 fn with_min(mut self, min: ArrayRef) -> Self {
2029 self.min = Some(min);
2030 self
2031 }
2032
2033 fn with_max(mut self, max: ArrayRef) -> Self {
2035 self.max = Some(max);
2036 self
2037 }
2038
2039 fn with_null_counts(
2042 mut self,
2043 counts: impl IntoIterator<Item = Option<u64>>,
2044 ) -> Self {
2045 let null_counts: ArrayRef =
2046 Arc::new(counts.into_iter().collect::<UInt64Array>());
2047
2048 self.assert_invariants();
2049 self.null_counts = Some(null_counts);
2050 self
2051 }
2052
2053 fn with_row_counts(
2056 mut self,
2057 counts: impl IntoIterator<Item = Option<u64>>,
2058 ) -> Self {
2059 let row_counts: ArrayRef =
2060 Arc::new(counts.into_iter().collect::<UInt64Array>());
2061
2062 self.assert_invariants();
2063 self.row_counts = Some(row_counts);
2064 self
2065 }
2066
2067 pub fn with_contained(
2069 mut self,
2070 values: impl IntoIterator<Item = ScalarValue>,
2071 contained: impl IntoIterator<Item = Option<bool>>,
2072 ) -> Self {
2073 let contained: BooleanArray = contained.into_iter().collect();
2074 let values: HashSet<_> = values.into_iter().collect();
2075
2076 self.contained.push((values, contained));
2077 self.assert_invariants();
2078 self
2079 }
2080
2081 fn contained(&self, find_values: &HashSet<ScalarValue>) -> Option<BooleanArray> {
2083 self.contained
2085 .iter()
2086 .find(|(values, _contained)| values == find_values)
2087 .map(|(_values, contained)| contained.clone())
2088 }
2089 }
2090
    /// In-memory `PruningStatistics` for tests, with statistics per column.
    #[derive(Debug, Default)]
    struct TestStatistics {
        /// Statistics for each column, looked up by `Column`.
        stats: HashMap<Column, ContainerStats>,
    }
2096
2097 impl TestStatistics {
2098 fn new() -> Self {
2099 Self::default()
2100 }
2101
2102 fn with(
2103 mut self,
2104 name: impl Into<String>,
2105 container_stats: ContainerStats,
2106 ) -> Self {
2107 let col = Column::from_name(name.into());
2108 self.stats.insert(col, container_stats);
2109 self
2110 }
2111
2112 fn with_null_counts(
2116 mut self,
2117 name: impl Into<String>,
2118 counts: impl IntoIterator<Item = Option<u64>>,
2119 ) -> Self {
2120 let col = Column::from_name(name.into());
2121
2122 let container_stats = self
2124 .stats
2125 .remove(&col)
2126 .unwrap_or_default()
2127 .with_null_counts(counts);
2128
2129 self.stats.insert(col, container_stats);
2131 self
2132 }
2133
2134 fn with_row_counts(
2138 mut self,
2139 name: impl Into<String>,
2140 counts: impl IntoIterator<Item = Option<u64>>,
2141 ) -> Self {
2142 let col = Column::from_name(name.into());
2143
2144 let container_stats = self
2146 .stats
2147 .remove(&col)
2148 .unwrap_or_default()
2149 .with_row_counts(counts);
2150
2151 self.stats.insert(col, container_stats);
2153 self
2154 }
2155
2156 fn with_contained(
2158 mut self,
2159 name: impl Into<String>,
2160 values: impl IntoIterator<Item = ScalarValue>,
2161 contained: impl IntoIterator<Item = Option<bool>>,
2162 ) -> Self {
2163 let col = Column::from_name(name.into());
2164
2165 let container_stats = self
2167 .stats
2168 .remove(&col)
2169 .unwrap_or_default()
2170 .with_contained(values, contained);
2171
2172 self.stats.insert(col, container_stats);
2174 self
2175 }
2176 }
2177
2178 impl PruningStatistics for TestStatistics {
2179 fn min_values(&self, column: &Column) -> Option<ArrayRef> {
2180 self.stats
2181 .get(column)
2182 .map(|container_stats| container_stats.min())
2183 .unwrap_or(None)
2184 }
2185
2186 fn max_values(&self, column: &Column) -> Option<ArrayRef> {
2187 self.stats
2188 .get(column)
2189 .map(|container_stats| container_stats.max())
2190 .unwrap_or(None)
2191 }
2192
2193 fn num_containers(&self) -> usize {
2194 self.stats
2195 .values()
2196 .next()
2197 .map(|container_stats| container_stats.len())
2198 .unwrap_or(0)
2199 }
2200
2201 fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
2202 self.stats
2203 .get(column)
2204 .map(|container_stats| container_stats.null_counts())
2205 .unwrap_or(None)
2206 }
2207
2208 fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
2209 self.stats
2210 .get(column)
2211 .map(|container_stats| container_stats.row_counts())
2212 .unwrap_or(None)
2213 }
2214
2215 fn contained(
2216 &self,
2217 column: &Column,
2218 values: &HashSet<ScalarValue>,
2219 ) -> Option<BooleanArray> {
2220 self.stats
2221 .get(column)
2222 .and_then(|container_stats| container_stats.contained(values))
2223 }
2224 }
2225
    /// `PruningStatistics` that returns the same min/max arrays for every
    /// column and a fixed container count. It reports no null counts, row
    /// counts or `contained` information.
    struct OneContainerStats {
        /// Returned for every column's min values.
        min_values: Option<ArrayRef>,
        /// Returned for every column's max values.
        max_values: Option<ArrayRef>,
        /// Reported container count; may deliberately disagree with the array
        /// lengths to exercise error paths.
        num_containers: usize,
    }
2232
2233 impl PruningStatistics for OneContainerStats {
2234 fn min_values(&self, _column: &Column) -> Option<ArrayRef> {
2235 self.min_values.clone()
2236 }
2237
2238 fn max_values(&self, _column: &Column) -> Option<ArrayRef> {
2239 self.max_values.clone()
2240 }
2241
2242 fn num_containers(&self) -> usize {
2243 self.num_containers
2244 }
2245
2246 fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
2247 None
2248 }
2249
2250 fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
2251 None
2252 }
2253
2254 fn contained(
2255 &self,
2256 _column: &Column,
2257 _values: &HashSet<ScalarValue>,
2258 ) -> Option<BooleanArray> {
2259 None
2260 }
2261 }
2262
2263 #[test]
2266 fn test_unique_row_count_field_and_column() {
2267 let schema: SchemaRef = Arc::new(Schema::new(vec![
2269 Field::new("c1", DataType::Int32, true),
2270 Field::new("c2", DataType::Int32, true),
2271 ]));
2272 let expr = col("c1").eq(lit(100)).and(col("c2").eq(lit(200)));
2273 let expr = logical2physical(&expr, &schema);
2274 let p = PruningPredicate::try_new(expr, Arc::clone(&schema)).unwrap();
2275 assert_eq!(
2277 "c1_null_count@2 != row_count@3 AND c1_min@0 <= 100 AND 100 <= c1_max@1 AND c2_null_count@6 != row_count@3 AND c2_min@4 <= 200 AND 200 <= c2_max@5",
2278 p.predicate_expr.to_string()
2279 );
2280
2281 let mut fields = HashSet::new();
2284 for (_col, _ty, field) in p.required_columns().iter() {
2285 let was_new = fields.insert(field);
2286 if !was_new {
2287 panic!(
2288 "Duplicate field in required schema: {field:?}. Previous fields:\n{fields:#?}"
2289 );
2290 }
2291 }
2292 }
2293
2294 #[test]
2295 fn prune_all_rows_null_counts() {
2296 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2299 let statistics = TestStatistics::new().with(
2300 "i",
2301 ContainerStats::new_i32(
2302 vec![Some(0)], vec![Some(0)], )
2305 .with_null_counts(vec![Some(1)])
2306 .with_row_counts(vec![Some(1)]),
2307 );
2308 let expected_ret = &[false];
2309 prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2310
2311 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2313 let container_stats = ContainerStats {
2314 min: Some(Arc::new(Int32Array::from(vec![None]))),
2315 max: Some(Arc::new(Int32Array::from(vec![None]))),
2316 null_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
2317 row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
2318 ..ContainerStats::default()
2319 };
2320 let statistics = TestStatistics::new().with("i", container_stats);
2321 let expected_ret = &[false];
2322 prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2323
2324 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2326 let container_stats = ContainerStats {
2327 min: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
2328 max: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
2329 null_counts: Some(Arc::new(UInt64Array::from(vec![None]))),
2330 row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
2331 ..ContainerStats::default()
2332 };
2333 let statistics = TestStatistics::new().with("i", container_stats);
2334 let expected_ret = &[true];
2335 prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2336 let expected_ret = &[false];
2337 prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
2338
2339 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2341 let container_stats = ContainerStats {
2342 min: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
2343 max: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
2344 null_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
2345 row_counts: Some(Arc::new(UInt64Array::from(vec![None]))),
2346 ..ContainerStats::default()
2347 };
2348 let statistics = TestStatistics::new().with("i", container_stats);
2349 let expected_ret = &[true];
2350 prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2351 let expected_ret = &[false];
2352 prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
2353 }
2354
2355 #[test]
2356 fn prune_missing_statistics() {
2357 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2360 let container_stats = ContainerStats {
2361 min: Some(Arc::new(Int32Array::from(vec![None, Some(0)]))),
2362 max: Some(Arc::new(Int32Array::from(vec![Some(0), None]))),
2363 null_counts: Some(Arc::new(UInt64Array::from(vec![Some(0), Some(0)]))),
2364 row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1), Some(1)]))),
2365 ..ContainerStats::default()
2366 };
2367 let statistics = TestStatistics::new().with("i", container_stats);
2368 let expected_ret = &[true, true];
2369 prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2370 let expected_ret = &[false, true];
2371 prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
2372 let expected_ret = &[true, false];
2373 prune_with_expr(col("i").lt(lit(0)), &schema, &statistics, expected_ret);
2374 }
2375
2376 #[test]
2377 fn prune_null_stats() {
2378 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2381
2382 let statistics = TestStatistics::new().with(
2383 "i",
2384 ContainerStats::new_i32(
2385 vec![Some(0)], vec![Some(0)], )
2388 .with_null_counts(vec![Some(1)])
2389 .with_row_counts(vec![Some(1)]),
2390 );
2391
2392 let expected_ret = &[false];
2393
2394 prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2396 }
2397
2398 #[test]
2399 fn test_build_statistics_record_batch() {
2400 let required_columns = RequiredColumns::from(vec![
2402 (
2404 phys_expr::Column::new("s1", 1),
2405 StatisticsType::Min,
2406 Field::new("s1_min", DataType::Int32, true),
2407 ),
2408 (
2410 phys_expr::Column::new("s2", 2),
2411 StatisticsType::Max,
2412 Field::new("s2_max", DataType::Int32, true),
2413 ),
2414 (
2416 phys_expr::Column::new("s3", 3),
2417 StatisticsType::Max,
2418 Field::new("s3_max", DataType::Utf8, true),
2419 ),
2420 (
2422 phys_expr::Column::new("s3", 3),
2423 StatisticsType::Min,
2424 Field::new("s3_min", DataType::Utf8, true),
2425 ),
2426 ]);
2427
2428 let statistics = TestStatistics::new()
2429 .with(
2430 "s1",
2431 ContainerStats::new_i32(
2432 vec![None, None, Some(9), None], vec![Some(10), None, None, None], ),
2435 )
2436 .with(
2437 "s2",
2438 ContainerStats::new_i32(
2439 vec![Some(2), None, None, None], vec![Some(20), None, None, None], ),
2442 )
2443 .with(
2444 "s3",
2445 ContainerStats::new_utf8(
2446 vec![Some("a"), None, None, None], vec![Some("q"), None, Some("r"), None], ),
2449 );
2450
2451 let batch =
2452 build_statistics_record_batch(&statistics, &required_columns).unwrap();
2453 assert_snapshot!(batches_to_string(&[batch]), @r"
2454 +--------+--------+--------+--------+
2455 | s1_min | s2_max | s3_max | s3_min |
2456 +--------+--------+--------+--------+
2457 | | 20 | q | a |
2458 | | | | |
2459 | 9 | | r | |
2460 | | | | |
2461 +--------+--------+--------+--------+
2462 ");
2463 }
2464
2465 #[test]
2466 fn test_build_statistics_casting() {
2467 let required_columns = RequiredColumns::from(vec![(
2472 phys_expr::Column::new("s3", 3),
2473 StatisticsType::Min,
2474 Field::new(
2475 "s1_min",
2476 DataType::Timestamp(TimeUnit::Nanosecond, None),
2477 true,
2478 ),
2479 )]);
2480
2481 let statistics = OneContainerStats {
2483 min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
2484 max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
2485 num_containers: 1,
2486 };
2487
2488 let batch =
2489 build_statistics_record_batch(&statistics, &required_columns).unwrap();
2490
2491 assert_snapshot!(batches_to_string(&[batch]), @r"
2492 +-------------------------------+
2493 | s1_min |
2494 +-------------------------------+
2495 | 1970-01-01T00:00:00.000000010 |
2496 +-------------------------------+
2497 ");
2498 }
2499
2500 #[test]
2501 fn test_build_statistics_no_required_stats() {
2502 let required_columns = RequiredColumns::new();
2503
2504 let statistics = OneContainerStats {
2505 min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
2506 max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
2507 num_containers: 1,
2508 };
2509
2510 let batch =
2511 build_statistics_record_batch(&statistics, &required_columns).unwrap();
2512 assert_eq!(batch.num_rows(), 1); }
2514
2515 #[test]
2516 fn test_build_statistics_inconsistent_types() {
2517 let required_columns = RequiredColumns::from(vec![(
2521 phys_expr::Column::new("s3", 3),
2522 StatisticsType::Min,
2523 Field::new("s1_min", DataType::Utf8, true),
2524 )]);
2525
2526 let statistics = OneContainerStats {
2528 min_values: Some(Arc::new(BinaryArray::from(vec![&[255u8] as &[u8]]))),
2529 max_values: None,
2530 num_containers: 1,
2531 };
2532
2533 let batch =
2534 build_statistics_record_batch(&statistics, &required_columns).unwrap();
2535 assert_snapshot!(batches_to_string(&[batch]), @r"
2536 +--------+
2537 | s1_min |
2538 +--------+
2539 | |
2540 +--------+
2541 ");
2542 }
2543
2544 #[test]
2545 fn test_build_statistics_inconsistent_length() {
2546 let required_columns = RequiredColumns::from(vec![(
2548 phys_expr::Column::new("s1", 3),
2549 StatisticsType::Min,
2550 Field::new("s1_min", DataType::Int64, true),
2551 )]);
2552
2553 let statistics = OneContainerStats {
2555 min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
2556 max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
2557 num_containers: 3,
2558 };
2559
2560 let result =
2561 build_statistics_record_batch(&statistics, &required_columns).unwrap_err();
2562 assert!(
2563 result
2564 .to_string()
2565 .contains("mismatched statistics length. Expected 3, got 1"),
2566 "{}",
2567 result
2568 );
2569 }
2570
2571 #[test]
2572 fn row_group_predicate_eq() -> Result<()> {
2573 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2574 let expected_expr =
2575 "c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1";
2576
2577 let expr = col("c1").eq(lit(1));
2579 let predicate_expr =
2580 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2581 assert_eq!(predicate_expr.to_string(), expected_expr);
2582
2583 let expr = lit(1).eq(col("c1"));
2585 let predicate_expr =
2586 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2587 assert_eq!(predicate_expr.to_string(), expected_expr);
2588
2589 Ok(())
2590 }
2591
2592 #[test]
2593 fn row_group_predicate_not_eq() -> Result<()> {
2594 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2595 let expected_expr =
2596 "c1_null_count@2 != row_count@3 AND (c1_min@0 != 1 OR 1 != c1_max@1)";
2597
2598 let expr = col("c1").not_eq(lit(1));
2600 let predicate_expr =
2601 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2602 assert_eq!(predicate_expr.to_string(), expected_expr);
2603
2604 let expr = lit(1).not_eq(col("c1"));
2606 let predicate_expr =
2607 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2608 assert_eq!(predicate_expr.to_string(), expected_expr);
2609
2610 Ok(())
2611 }
2612
2613 #[test]
2614 fn row_group_predicate_gt() -> Result<()> {
2615 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2616 let expected_expr = "c1_null_count@1 != row_count@2 AND c1_max@0 > 1";
2617
2618 let expr = col("c1").gt(lit(1));
2620 let predicate_expr =
2621 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2622 assert_eq!(predicate_expr.to_string(), expected_expr);
2623
2624 let expr = lit(1).lt(col("c1"));
2626 let predicate_expr =
2627 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2628 assert_eq!(predicate_expr.to_string(), expected_expr);
2629
2630 Ok(())
2631 }
2632
2633 #[test]
2634 fn row_group_predicate_gt_eq() -> Result<()> {
2635 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2636 let expected_expr = "c1_null_count@1 != row_count@2 AND c1_max@0 >= 1";
2637
2638 let expr = col("c1").gt_eq(lit(1));
2640 let predicate_expr =
2641 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2642 assert_eq!(predicate_expr.to_string(), expected_expr);
2643 let expr = lit(1).lt_eq(col("c1"));
2645 let predicate_expr =
2646 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2647 assert_eq!(predicate_expr.to_string(), expected_expr);
2648
2649 Ok(())
2650 }
2651
2652 #[test]
2653 fn row_group_predicate_lt() -> Result<()> {
2654 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2655 let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1";
2656
2657 let expr = col("c1").lt(lit(1));
2659 let predicate_expr =
2660 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2661 assert_eq!(predicate_expr.to_string(), expected_expr);
2662
2663 let expr = lit(1).gt(col("c1"));
2665 let predicate_expr =
2666 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2667 assert_eq!(predicate_expr.to_string(), expected_expr);
2668
2669 Ok(())
2670 }
2671
2672 #[test]
2673 fn row_group_predicate_lt_eq() -> Result<()> {
2674 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2675 let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 <= 1";
2676
2677 let expr = col("c1").lt_eq(lit(1));
2679 let predicate_expr =
2680 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2681 assert_eq!(predicate_expr.to_string(), expected_expr);
2682 let expr = lit(1).gt_eq(col("c1"));
2684 let predicate_expr =
2685 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2686 assert_eq!(predicate_expr.to_string(), expected_expr);
2687
2688 Ok(())
2689 }
2690
2691 #[test]
2692 fn row_group_predicate_and() -> Result<()> {
2693 let schema = Schema::new(vec![
2694 Field::new("c1", DataType::Int32, false),
2695 Field::new("c2", DataType::Int32, false),
2696 Field::new("c3", DataType::Int32, false),
2697 ]);
2698 let expr = col("c1").lt(lit(1)).and(col("c2").lt(col("c3")));
2700 let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1";
2701 let predicate_expr =
2702 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2703 assert_eq!(predicate_expr.to_string(), expected_expr);
2704
2705 Ok(())
2706 }
2707
2708 #[test]
2709 fn row_group_predicate_or() -> Result<()> {
2710 let schema = Schema::new(vec![
2711 Field::new("c1", DataType::Int32, false),
2712 Field::new("c2", DataType::Int32, false),
2713 ]);
2714 let expr = col("c1").lt(lit(1)).or(col("c2").rem(lit(2)).eq(lit(0)));
2716 let expected_expr = "true";
2717 let predicate_expr =
2718 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2719 assert_eq!(predicate_expr.to_string(), expected_expr);
2720
2721 Ok(())
2722 }
2723
2724 #[test]
2725 fn row_group_predicate_not() -> Result<()> {
2726 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2727 let expected_expr = "true";
2728
2729 let expr = col("c1").not();
2730 let predicate_expr =
2731 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2732 assert_eq!(predicate_expr.to_string(), expected_expr);
2733
2734 Ok(())
2735 }
2736
2737 #[test]
2738 fn row_group_predicate_not_bool() -> Result<()> {
2739 let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
2740 let expected_expr = "NOT c1_min@0 AND c1_max@1";
2741
2742 let expr = col("c1").not();
2743 let predicate_expr =
2744 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2745 assert_eq!(predicate_expr.to_string(), expected_expr);
2746
2747 Ok(())
2748 }
2749
2750 #[test]
2751 fn row_group_predicate_bool() -> Result<()> {
2752 let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
2753 let expected_expr = "c1_min@0 OR c1_max@1";
2754
2755 let expr = col("c1");
2756 let predicate_expr =
2757 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2758 assert_eq!(predicate_expr.to_string(), expected_expr);
2759
2760 Ok(())
2761 }
2762
2763 #[test]
2764 fn row_group_predicate_lt_bool() -> Result<()> {
2765 let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
2766 let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < true";
2767
2768 let expr = col("c1").lt(lit(true));
2771 let predicate_expr =
2772 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2773 assert_eq!(predicate_expr.to_string(), expected_expr);
2774
2775 Ok(())
2776 }
2777
2778 #[test]
2779 fn row_group_predicate_required_columns() -> Result<()> {
2780 let schema = Schema::new(vec![
2781 Field::new("c1", DataType::Int32, false),
2782 Field::new("c2", DataType::Int32, false),
2783 ]);
2784 let mut required_columns = RequiredColumns::new();
2785 let expr = col("c1")
2787 .lt(lit(1))
2788 .and(col("c2").eq(lit(2)).or(col("c2").eq(lit(3))));
2789 let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1 AND (c2_null_count@5 != row_count@2 AND c2_min@3 <= 2 AND 2 <= c2_max@4 OR c2_null_count@5 != row_count@2 AND c2_min@3 <= 3 AND 3 <= c2_max@4)";
2790 let predicate_expr =
2791 test_build_predicate_expression(&expr, &schema, &mut required_columns);
2792 assert_eq!(predicate_expr.to_string(), expected_expr);
2793 println!("required_columns: {required_columns:#?}"); let c1_min_field = Field::new("c1_min", DataType::Int32, false);
2796 assert_eq!(
2797 required_columns.columns[0],
2798 (
2799 phys_expr::Column::new("c1", 0),
2800 StatisticsType::Min,
2801 c1_min_field.with_nullable(true) )
2803 );
2804 let c1_null_count_field = Field::new("c1_null_count", DataType::UInt64, false);
2806 assert_eq!(
2807 required_columns.columns[1],
2808 (
2809 phys_expr::Column::new("c1", 0),
2810 StatisticsType::NullCount,
2811 c1_null_count_field.with_nullable(true) )
2813 );
2814 let row_count_field = Field::new("row_count", DataType::UInt64, false);
2816 assert_eq!(
2817 required_columns.columns[2],
2818 (
2819 phys_expr::Column::new("c1", 0),
2820 StatisticsType::RowCount,
2821 row_count_field.with_nullable(true) )
2823 );
2824 let c2_min_field = Field::new("c2_min", DataType::Int32, false);
2826 assert_eq!(
2827 required_columns.columns[3],
2828 (
2829 phys_expr::Column::new("c2", 1),
2830 StatisticsType::Min,
2831 c2_min_field.with_nullable(true) )
2833 );
2834 let c2_max_field = Field::new("c2_max", DataType::Int32, false);
2835 assert_eq!(
2836 required_columns.columns[4],
2837 (
2838 phys_expr::Column::new("c2", 1),
2839 StatisticsType::Max,
2840 c2_max_field.with_nullable(true) )
2842 );
2843 let c2_null_count_field = Field::new("c2_null_count", DataType::UInt64, false);
2845 assert_eq!(
2846 required_columns.columns[5],
2847 (
2848 phys_expr::Column::new("c2", 1),
2849 StatisticsType::NullCount,
2850 c2_null_count_field.with_nullable(true) )
2852 );
2853 let row_count_field = Field::new("row_count", DataType::UInt64, false);
2855 assert_eq!(
2856 required_columns.columns[2],
2857 (
2858 phys_expr::Column::new("c1", 0),
2859 StatisticsType::RowCount,
2860 row_count_field.with_nullable(true) )
2862 );
2863 assert_eq!(required_columns.columns.len(), 6);
2865
2866 Ok(())
2867 }
2868
2869 #[test]
2870 fn row_group_predicate_in_list() -> Result<()> {
2871 let schema = Schema::new(vec![
2872 Field::new("c1", DataType::Int32, false),
2873 Field::new("c2", DataType::Int32, false),
2874 ]);
2875 let expr = Expr::InList(InList::new(
2877 Box::new(col("c1")),
2878 vec![lit(1), lit(2), lit(3)],
2879 false,
2880 ));
2881 let expected_expr = "c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 2 AND 2 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 3 AND 3 <= c1_max@1";
2882 let predicate_expr =
2883 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2884 assert_eq!(predicate_expr.to_string(), expected_expr);
2885
2886 Ok(())
2887 }
2888
2889 #[test]
2890 fn row_group_predicate_in_list_empty() -> Result<()> {
2891 let schema = Schema::new(vec![
2892 Field::new("c1", DataType::Int32, false),
2893 Field::new("c2", DataType::Int32, false),
2894 ]);
2895 let expr = Expr::InList(InList::new(Box::new(col("c1")), vec![], false));
2897 let expected_expr = "true";
2898 let predicate_expr =
2899 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2900 assert_eq!(predicate_expr.to_string(), expected_expr);
2901
2902 Ok(())
2903 }
2904
2905 #[test]
2906 fn row_group_predicate_in_list_negated() -> Result<()> {
2907 let schema = Schema::new(vec![
2908 Field::new("c1", DataType::Int32, false),
2909 Field::new("c2", DataType::Int32, false),
2910 ]);
2911 let expr = Expr::InList(InList::new(
2913 Box::new(col("c1")),
2914 vec![lit(1), lit(2), lit(3)],
2915 true,
2916 ));
2917 let expected_expr = "c1_null_count@2 != row_count@3 AND (c1_min@0 != 1 OR 1 != c1_max@1) AND c1_null_count@2 != row_count@3 AND (c1_min@0 != 2 OR 2 != c1_max@1) AND c1_null_count@2 != row_count@3 AND (c1_min@0 != 3 OR 3 != c1_max@1)";
2918 let predicate_expr =
2919 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2920 assert_eq!(predicate_expr.to_string(), expected_expr);
2921
2922 Ok(())
2923 }
2924
#[test]
fn row_group_predicate_between() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("c1", DataType::Int32, false),
        Field::new("c2", DataType::Int32, false),
    ]);
    // Renders the statistics predicate built for `e` as text.
    let render = |e: &Expr| {
        test_build_predicate_expression(e, &schema, &mut RequiredColumns::new())
            .to_string()
    };

    // `c1 BETWEEN 1 AND 5` must rewrite exactly like `c1 >= 1 AND c1 <= 5`.
    let between = col("c1").between(lit(1), lit(5));
    let expanded = col("c1").gt_eq(lit(1)).and(col("c1").lt_eq(lit(5)));
    assert_eq!(render(&between), render(&expanded));

    Ok(())
}
2947
#[test]
fn row_group_predicate_between_with_in_list() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("c1", DataType::Int32, false),
        Field::new("c2", DataType::Int32, false),
    ]);

    // `c1 IN (1, 2) AND c2 BETWEEN 4 AND 5`: the IN list turns into an OR of range
    // checks, and the BETWEEN into two one-sided bounds on c2.
    let combined = col("c1")
        .in_list(vec![lit(1), lit(2)], false)
        .and(col("c2").between(lit(4), lit(5)));
    let expected_expr = "(c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 2 AND 2 <= c1_max@1) AND c2_null_count@5 != row_count@3 AND c2_max@4 >= 4 AND c2_null_count@5 != row_count@3 AND c2_min@6 <= 5";

    let mut required = RequiredColumns::new();
    let rewritten = test_build_predicate_expression(&combined, &schema, &mut required);
    assert_eq!(rewritten.to_string(), expected_expr);

    Ok(())
}
2970
#[test]
fn row_group_predicate_in_list_to_many_values() -> Result<()> {
    // ("to_many" in the name means "too many".) An IN list with 21 candidates is
    // expected to yield the no-op predicate `true`. Presumably this is over the
    // builder's IN-list expansion limit; confirm against the builder.
    let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
    let values: Vec<Expr> = (1..=21).map(lit).collect();
    let big_in_list = col("c1").in_list(values, false);

    let mut required = RequiredColumns::new();
    let rewritten = test_build_predicate_expression(&big_in_list, &schema, &mut required);
    assert_eq!(rewritten.to_string(), "true");

    Ok(())
}
2986
#[test]
fn row_group_predicate_cast_int_int() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
    // Renders the statistics predicate built for `e` as text.
    let render = |e: &Expr| {
        test_build_predicate_expression(e, &schema, &mut RequiredColumns::new())
            .to_string()
    };
    let one = || lit(ScalarValue::Int64(Some(1)));

    // CAST(c1 AS Int64) = 1, with the column on either side of the comparison.
    let cast_eq = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 1 AND 1 <= CAST(c1_max@1 AS Int64)";
    for expr in [
        cast(col("c1"), DataType::Int64).eq(one()),
        one().eq(cast(col("c1"), DataType::Int64)),
    ] {
        assert_eq!(render(&expr), cast_eq);
    }

    // TRY_CAST(c1 AS Int64) > 1 and its mirrored form 1 < TRY_CAST(c1 AS Int64).
    let try_cast_gt =
        "c1_null_count@1 != row_count@2 AND TRY_CAST(c1_max@0 AS Int64) > 1";
    for expr in [
        try_cast(col("c1"), DataType::Int64).gt(one()),
        one().lt(try_cast(col("c1"), DataType::Int64)),
    ] {
        assert_eq!(render(&expr), try_cast_gt);
    }

    Ok(())
}
3024
#[test]
fn row_group_predicate_cast_string_string() -> Result<()> {
    // A Utf8View column cast to Utf8 keeps a usable min/max rewrite.
    let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
    let render = |e: &Expr| {
        test_build_predicate_expression(e, &schema, &mut RequiredColumns::new())
            .to_string()
    };
    let text_one = || lit(ScalarValue::Utf8(Some("1".to_string())));
    let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Utf8) <= 1 AND 1 <= CAST(c1_max@1 AS Utf8)";

    for expr in [
        cast(col("c1"), DataType::Utf8).eq(text_one()),
        text_one().eq(cast(col("c1"), DataType::Utf8)),
    ] {
        assert_eq!(render(&expr), expected_expr);
    }

    Ok(())
}
3046
#[test]
fn row_group_predicate_cast_string_int() -> Result<()> {
    // Casting a string column to Int32 is expected to yield the no-op predicate
    // `true`, whichever side the column appears on.
    let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
    let render = |e: &Expr| {
        test_build_predicate_expression(e, &schema, &mut RequiredColumns::new())
            .to_string()
    };
    let int_one = || lit(ScalarValue::Int32(Some(1)));

    for expr in [
        cast(col("c1"), DataType::Int32).eq(int_one()),
        int_one().eq(cast(col("c1"), DataType::Int32)),
    ] {
        assert_eq!(render(&expr), "true");
    }

    Ok(())
}
3066
#[test]
fn row_group_predicate_cast_int_string() -> Result<()> {
    // Casting an Int32 column to Utf8 is expected to yield the no-op predicate `true`.
    let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
    let render = |e: &Expr| {
        test_build_predicate_expression(e, &schema, &mut RequiredColumns::new())
            .to_string()
    };
    let text_one = || lit(ScalarValue::Utf8(Some("1".to_string())));

    for expr in [
        cast(col("c1"), DataType::Utf8).eq(text_one()),
        text_one().eq(cast(col("c1"), DataType::Utf8)),
    ] {
        assert_eq!(render(&expr), "true");
    }

    Ok(())
}
3088
#[test]
fn row_group_predicate_date_date() -> Result<()> {
    // Date32 -> Date64 casts are kept on the min/max columns; the Date64 literal 123
    // displays as 1970-01-01.
    let schema = Schema::new(vec![Field::new("c1", DataType::Date32, false)]);
    let render = |e: &Expr| {
        test_build_predicate_expression(e, &schema, &mut RequiredColumns::new())
            .to_string()
    };
    let day = || lit(ScalarValue::Date64(Some(123)));
    let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Date64) <= 1970-01-01 AND 1970-01-01 <= CAST(c1_max@1 AS Date64)";

    for expr in [
        cast(col("c1"), DataType::Date64).eq(day()),
        day().eq(cast(col("c1"), DataType::Date64)),
    ] {
        assert_eq!(render(&expr), expected_expr);
    }

    Ok(())
}
3110
#[test]
fn row_group_predicate_dict_string_date() -> Result<()> {
    // Casting a Date32 column to Dictionary(UInt8, Utf8) is expected to yield `true`.
    let schema = Schema::new(vec![Field::new("c1", DataType::Date32, false)]);
    let render = |e: &Expr| {
        test_build_predicate_expression(e, &schema, &mut RequiredColumns::new())
            .to_string()
    };
    let dict_utf8 =
        || DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8));
    let day_text = || lit(ScalarValue::Utf8(Some("2024-01-01".to_string())));

    for expr in [
        cast(col("c1"), dict_utf8()).eq(day_text()),
        day_text().eq(cast(col("c1"), dict_utf8())),
    ] {
        assert_eq!(render(&expr), "true");
    }

    Ok(())
}
3138
#[test]
fn row_group_predicate_date_dict_string() -> Result<()> {
    // Casting a dictionary-encoded string column to Date32 is expected to yield `true`.
    let dict_utf8 =
        DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8));
    let schema = Schema::new(vec![Field::new("c1", dict_utf8, false)]);
    let render = |e: &Expr| {
        test_build_predicate_expression(e, &schema, &mut RequiredColumns::new())
            .to_string()
    };
    let day = || lit(ScalarValue::Date32(Some(123)));

    for expr in [
        cast(col("c1"), DataType::Date32).eq(day()),
        day().eq(cast(col("c1"), DataType::Date32)),
    ] {
        assert_eq!(render(&expr), "true");
    }

    Ok(())
}
3165
#[test]
fn row_group_predicate_dict_dict_same_value_type() -> Result<()> {
    let dict_utf8 =
        DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8));
    let schema = Schema::new(vec![Field::new("c1", dict_utf8, false)]);
    let render = |e: &Expr| {
        test_build_predicate_expression(e, &schema, &mut RequiredColumns::new())
            .to_string()
    };
    let test_text = || lit(ScalarValue::Utf8(Some("test".to_string())));

    // (predicate, expected rewrite): a direct comparison uses the statistics as-is;
    // re-keying the dictionary to UInt16 keeps the cast on min/max.
    let cases = [
        (
            col("c1").eq(test_text()),
            "c1_null_count@2 != row_count@3 AND c1_min@0 <= test AND test <= c1_max@1",
        ),
        (
            cast(
                col("c1"),
                DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
            )
            .eq(test_text()),
            "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Dictionary(UInt16, Utf8)) <= test AND test <= CAST(c1_max@1 AS Dictionary(UInt16, Utf8))",
        ),
    ];
    for (expr, expected_expr) in cases {
        assert_eq!(render(&expr), expected_expr);
    }

    Ok(())
}
3196
#[test]
fn row_group_predicate_dict_dict_different_value_type() -> Result<()> {
    // Dictionary(UInt8, Int32) column cast to Int64: the cast stays on min/max.
    let dict_i32 =
        DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Int32));
    let schema = Schema::new(vec![Field::new("c1", dict_i32, false)]);

    let widened_eq =
        cast(col("c1"), DataType::Int64).eq(lit(ScalarValue::Int64(Some(123))));
    let mut required = RequiredColumns::new();
    let rewritten = test_build_predicate_expression(&widened_eq, &schema, &mut required);
    assert_eq!(
        rewritten.to_string(),
        "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 123 AND 123 <= CAST(c1_max@1 AS Int64)"
    );

    Ok(())
}
3216
#[test]
fn row_group_predicate_nested_dict() -> Result<()> {
    // A dictionary whose values are themselves dictionary-encoded strings.
    let inner =
        DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8));
    let outer = DataType::Dictionary(Box::new(DataType::UInt8), Box::new(inner));
    let schema = Schema::new(vec![Field::new("c1", outer, false)]);

    let eq_test = col("c1").eq(lit(ScalarValue::Utf8(Some("test".to_string()))));
    let mut required = RequiredColumns::new();
    let rewritten = test_build_predicate_expression(&eq_test, &schema, &mut required);
    assert_eq!(
        rewritten.to_string(),
        "c1_null_count@2 != row_count@3 AND c1_min@0 <= test AND test <= c1_max@1"
    );

    Ok(())
}
3242
#[test]
fn row_group_predicate_dict_date_dict_date() -> Result<()> {
    // Dictionary(UInt8, Date32) re-cast to Dictionary(UInt16, Date64).
    let source_type =
        DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Date32));
    let target_type =
        DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Date64));
    let schema = Schema::new(vec![Field::new("c1", source_type, false)]);

    let recast_eq = cast(col("c1"), target_type).eq(lit(ScalarValue::Date64(Some(123))));
    let mut required = RequiredColumns::new();
    let rewritten = test_build_predicate_expression(&recast_eq, &schema, &mut required);
    assert_eq!(
        rewritten.to_string(),
        "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Dictionary(UInt16, Date64)) <= 1970-01-01 AND 1970-01-01 <= CAST(c1_max@1 AS Dictionary(UInt16, Date64))"
    );

    Ok(())
}
3265
#[test]
fn row_group_predicate_date_string() -> Result<()> {
    // Casting a Utf8 column to Date32 is expected to yield the no-op predicate `true`.
    let schema = Schema::new(vec![Field::new("c1", DataType::Utf8, false)]);
    let render = |e: &Expr| {
        test_build_predicate_expression(e, &schema, &mut RequiredColumns::new())
            .to_string()
    };
    let day = || lit(ScalarValue::Date32(Some(123)));

    for expr in [
        cast(col("c1"), DataType::Date32).eq(day()),
        day().eq(cast(col("c1"), DataType::Date32)),
    ] {
        assert_eq!(render(&expr), "true");
    }

    Ok(())
}
3287
#[test]
fn row_group_predicate_string_date() -> Result<()> {
    // Casting a Date32 column to Utf8 is expected to yield the no-op predicate `true`.
    let schema = Schema::new(vec![Field::new("c1", DataType::Date32, false)]);
    let render = |e: &Expr| {
        test_build_predicate_expression(e, &schema, &mut RequiredColumns::new())
            .to_string()
    };
    let day_text = || lit(ScalarValue::Utf8(Some("2024-01-01".to_string())));

    for expr in [
        cast(col("c1"), DataType::Utf8).eq(day_text()),
        day_text().eq(cast(col("c1"), DataType::Utf8)),
    ] {
        assert_eq!(render(&expr), "true");
    }

    Ok(())
}
3309
#[test]
fn row_group_predicate_cast_list() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
    let render = |e: &Expr| {
        test_build_predicate_expression(e, &schema, &mut RequiredColumns::new())
            .to_string()
    };
    // `CAST(c1 AS Int64) [NOT] IN (1, 2, 3)`, with Int64 literals.
    let in_list_of = |negated: bool| {
        let values = (1..=3)
            .map(|v| lit(ScalarValue::Int64(Some(v))))
            .collect::<Vec<_>>();
        Expr::InList(InList::new(
            Box::new(cast(col("c1"), DataType::Int64)),
            values,
            negated,
        ))
    };

    // IN expands to an OR of range checks; NOT IN to an AND of "may differ" checks.
    let cases = [
        (
            in_list_of(false),
            "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 1 AND 1 <= CAST(c1_max@1 AS Int64) OR c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 2 AND 2 <= CAST(c1_max@1 AS Int64) OR c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 3 AND 3 <= CAST(c1_max@1 AS Int64)",
        ),
        (
            in_list_of(true),
            "c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 1 OR 1 != CAST(c1_max@1 AS Int64)) AND c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 2 OR 2 != CAST(c1_max@1 AS Int64)) AND c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 3 OR 3 != CAST(c1_max@1 AS Int64))",
        ),
    ];
    for (expr, expected_expr) in cases {
        assert_eq!(render(&expr), expected_expr);
    }

    Ok(())
}
3344
#[test]
fn prune_decimal_data() {
    // A nullable "s1" decimal column with the given precision and scale.
    let decimal_schema = |precision: u8, scale: i8| {
        Arc::new(Schema::new(vec![Field::new(
            "s1",
            DataType::Decimal128(precision, scale),
            true,
        )]))
    };
    // Every case below evaluates "s1 > 5" (at the respective scale) and expects the
    // same keep (`true`) / prune (`false`) outcome per container.
    let expected = &[false, true, false, true];

    // Decimal(9, 2) described by INT32 statistics (presumably how some writers store
    // small-precision decimals physically); plain, CAST and TRY_CAST forms.
    let schema = decimal_schema(9, 2);
    let i32_stats = TestStatistics::new().with(
        "s1",
        ContainerStats::new_i32(
            vec![Some(0), Some(4), None, Some(3)],
            vec![Some(5), Some(6), Some(4), None],
        ),
    );
    for expr in [
        col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2))),
        cast(col("s1"), DataType::Decimal128(14, 3))
            .gt(lit(ScalarValue::Decimal128(Some(5000), 14, 3))),
        try_cast(col("s1"), DataType::Decimal128(14, 3))
            .gt(lit(ScalarValue::Decimal128(Some(5000), 14, 3))),
    ] {
        prune_with_expr(expr, &schema, &i32_stats, expected);
    }

    // Decimal(18, 2) described by INT64 statistics.
    let schema = decimal_schema(18, 2);
    let i64_stats = TestStatistics::new().with(
        "s1",
        ContainerStats::new_i64(
            vec![Some(0), Some(4), None, Some(3)],
            vec![Some(5), Some(6), Some(4), None],
        ),
    );
    prune_with_expr(
        col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 18, 2))),
        &schema,
        &i64_stats,
        expected,
    );

    // Decimal(23, 2) described by native Decimal128 statistics.
    let schema = decimal_schema(23, 2);
    let decimal_stats = TestStatistics::new().with(
        "s1",
        ContainerStats::new_decimal128(
            vec![Some(0), Some(400), None, Some(300)],
            vec![Some(500), Some(600), Some(400), None],
            23,
            2,
        ),
    );
    prune_with_expr(
        col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 23, 2))),
        &schema,
        &decimal_stats,
        expected,
    );
}
3445
#[test]
fn prune_api() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("s1", DataType::Utf8, true),
        Field::new("s2", DataType::Int32, true),
    ]));
    // Only "s2" has statistics: (min, max) = (0, 5), (4, 6), (null, null), (3, null).
    let statistics = TestStatistics::new().with(
        "s2",
        ContainerStats::new_i32(
            vec![Some(0), Some(4), None, Some(3)],
            vec![Some(5), Some(6), None, None],
        ),
    );

    // `s2 > 5`, written directly and through a widening cast, prunes only the first
    // container (its max is exactly 5).
    for predicate in [
        col("s2").gt(lit(5)),
        cast(col("s2"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(5)))),
    ] {
        prune_with_expr(predicate, &schema, &statistics, &[false, true, true, true]);
    }
}
3480
#[test]
fn prune_not_eq_data() {
    let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));

    // (min, max) per container; only the ["M", "M"] container can be pruned for
    // `s1 != 'M'`.
    let mins = vec![Some("A"), Some("A"), Some("N"), Some("M"), None, Some("A")];
    let maxes = vec![Some("Z"), Some("L"), Some("Z"), Some("M"), None, None];
    let statistics =
        TestStatistics::new().with("s1", ContainerStats::new_utf8(mins, maxes));

    let expected = [true, true, true, false, true, true];
    prune_with_expr(col("s1").not_eq(lit("M")), &schema, &statistics, &expected);
}
3505
3506 fn bool_setup() -> (SchemaRef, TestStatistics, Vec<bool>, Vec<bool>) {
3522 let schema =
3523 Arc::new(Schema::new(vec![Field::new("b1", DataType::Boolean, true)]));
3524
3525 let statistics = TestStatistics::new().with(
3526 "b1",
3527 ContainerStats::new_bool(
3528 vec![Some(false), Some(false), Some(true), None, Some(false)], vec![Some(false), Some(true), Some(true), None, None], ),
3531 );
3532 let expected_true = vec![false, true, true, true, true];
3533 let expected_false = vec![true, true, false, true, true];
3534
3535 (schema, statistics, expected_true, expected_false)
3536 }
3537
#[test]
fn prune_bool_const_expr() {
    let (schema, statistics, _, _) = bool_setup();

    // A constant predicate keeps every container (`true`) or prunes every one (`false`).
    for constant in [true, false] {
        prune_with_expr(lit(constant), &schema, &statistics, &[constant; 5]);
    }
}
3558
#[test]
fn prune_bool_column() {
    // Predicate: the bare boolean column `b1`.
    let (schema, statistics, keep_if_true, _) = bool_setup();
    let predicate = col("b1");
    prune_with_expr(predicate, &schema, &statistics, &keep_if_true);
}
3571
#[test]
fn prune_bool_not_column() {
    // Predicate: `NOT b1`.
    let (schema, statistics, _, keep_if_false) = bool_setup();
    let predicate = col("b1").not();
    prune_with_expr(predicate, &schema, &statistics, &keep_if_false);
}
3584
#[test]
fn prune_bool_column_eq_true() {
    // `b1 = true` must prune exactly like the bare column `b1`.
    let (schema, statistics, keep_if_true, _) = bool_setup();
    let predicate = col("b1").eq(lit(true));
    prune_with_expr(predicate, &schema, &statistics, &keep_if_true);
}
3597
#[test]
fn prune_bool_not_column_eq_true() {
    // `(NOT b1) = true` must prune exactly like `NOT b1`.
    let (schema, statistics, _, keep_if_false) = bool_setup();
    let predicate = col("b1").not().eq(lit(true));
    prune_with_expr(predicate, &schema, &statistics, &keep_if_false);
}
3610
3611 fn int32_setup() -> (SchemaRef, TestStatistics) {
3621 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
3622
3623 let statistics = TestStatistics::new().with(
3624 "i",
3625 ContainerStats::new_i32(
3626 vec![Some(-5), Some(1), Some(-11), None, Some(1)], vec![Some(5), Some(11), Some(-1), None, None], ),
3629 );
3630 (schema, statistics)
3631 }
3632
#[test]
fn prune_int32_col_gt_zero() {
    let (schema, statistics) = int32_setup();

    // `i > 0` and the equivalent `-i < 0`: only the [-11, -1] container is pruned.
    let keep = [true, true, false, true, true];
    for predicate in [
        col("i").gt(lit(0)),
        Expr::Negative(Box::new(col("i"))).lt(lit(0)),
    ] {
        prune_with_expr(predicate, &schema, &statistics, &keep);
    }
}
3656
#[test]
fn prune_int32_col_lte_zero() {
    let (schema, statistics) = int32_setup();

    // `i <= 0` and the equivalent `-i >= 0`: containers whose min is 1 are pruned.
    let keep = [true, false, true, true, false];
    for predicate in [
        col("i").lt_eq(lit(0)),
        Expr::Negative(Box::new(col("i"))).gt_eq(lit(0)),
    ] {
        prune_with_expr(predicate, &schema, &statistics, &keep);
    }
}
3685
#[test]
fn prune_int32_col_lte_zero_cast() {
    let (schema, statistics) = int32_setup();
    let negated_i = || Expr::Negative(Box::new(col("i")));

    // Comparisons after casting the integer column to Utf8 (plain and TRY_CAST,
    // optionally on `-i`) are expected to keep every container.
    for predicate in [
        cast(col("i"), DataType::Utf8).lt_eq(lit("0")),
        try_cast(col("i"), DataType::Utf8).lt_eq(lit("0")),
        cast(negated_i(), DataType::Utf8).gt_eq(lit("0")),
        try_cast(negated_i(), DataType::Utf8).gt_eq(lit("0")),
    ] {
        prune_with_expr(predicate, &schema, &statistics, &[true; 5]);
    }
}
3730
#[test]
fn prune_int32_col_eq_zero() {
    let (schema, statistics) = int32_setup();

    // `i = 0` only survives where 0 lies within [min, max], or where bounds are unknown.
    let keep = [true, false, false, true, false];
    let predicate = col("i").eq(lit(0));
    prune_with_expr(predicate, &schema, &statistics, &keep);
}
3751
#[test]
fn prune_int32_col_eq_zero_cast() {
    let (schema, statistics) = int32_setup();

    // Widening the column to Int64 (CAST or TRY_CAST) must prune like `i = 0`.
    let keep = [true, false, false, true, false];
    for predicate in [
        cast(col("i"), DataType::Int64).eq(lit(0i64)),
        try_cast(col("i"), DataType::Int64).eq(lit(0i64)),
    ] {
        prune_with_expr(predicate, &schema, &statistics, &keep);
    }
}
3778
#[test]
fn prune_int32_col_eq_zero_cast_as_str() {
    let (schema, statistics) = int32_setup();

    // Equality after casting the integer column to Utf8 is expected to keep everything.
    let predicate = cast(col("i"), DataType::Utf8).eq(lit("0"));
    prune_with_expr(predicate, &schema, &statistics, &[true; 5]);
}
3801
#[test]
fn prune_int32_col_lt_neg_one() {
    // NOTE(review): despite the name, the predicates here are `i > -1` and `-i < 1`.
    let (schema, statistics) = int32_setup();

    // Only the [-11, -1] container is pruned.
    let keep = [true, true, false, true, true];
    for predicate in [
        col("i").gt(lit(-1)),
        Expr::Negative(Box::new(col("i"))).lt(lit(1)),
    ] {
        prune_with_expr(predicate, &schema, &statistics, &keep);
    }
}
3830
#[test]
fn prune_int32_is_null() {
    let (schema, statistics) = int32_setup();

    // Without null counts, `i IS NULL` cannot rule out any container.
    prune_with_expr(col("i").is_null(), &schema, &statistics, &[true; 5]);

    // A known null count of 0 proves a container holds no nulls, so it is pruned.
    let null_counts = vec![Some(0), Some(1), None, None, Some(0)];
    let statistics = statistics.with_null_counts("i", null_counts);
    prune_with_expr(
        col("i").is_null(),
        &schema,
        &statistics,
        &[false, true, true, true, false],
    );
}
3869
#[test]
fn prune_int32_column_is_known_all_null() {
    let (schema, statistics) = int32_setup();
    let keep_min_max_only = [true, false, true, true, false];

    // Baseline: `i < 0` with min/max statistics only.
    prune_with_expr(col("i").lt(lit(0)), &schema, &statistics, &keep_min_max_only);

    // Row counts alone do not change the outcome.
    let row_counts = vec![Some(10), Some(9), None, Some(4), Some(10)];
    let statistics = statistics.with_row_counts("i", row_counts);
    prune_with_expr(col("i").lt(lit(0)), &schema, &statistics, &keep_min_max_only);

    // With null counts, the fourth container (4 nulls out of 4 rows) is known to be
    // all-null and is pruned as well.
    let null_counts = vec![Some(0), Some(1), None, Some(4), Some(0)];
    let statistics = statistics.with_null_counts("i", null_counts);
    prune_with_expr(
        col("i").lt(lit(0)),
        &schema,
        &statistics,
        &[true, false, true, false, false],
    );
}
3941
#[test]
fn prune_cast_column_scalar() {
    let (schema, statistics) = int32_setup();
    let zero = || lit(ScalarValue::Int64(Some(0)));

    // Every spelling of "i > 0", whether the cast sits on the literal or on the
    // column, must prune only the [-11, -1] container.
    let keep = [true, true, false, true, true];
    for predicate in [
        col("i").gt(cast(zero(), DataType::Int32)),
        cast(col("i"), DataType::Int64).gt(zero()),
        try_cast(col("i"), DataType::Int64).gt(zero()),
        Expr::Negative(Box::new(cast(col("i"), DataType::Int64))).lt(zero()),
    ] {
        prune_with_expr(predicate, &schema, &statistics, &keep);
    }
}
3981
#[test]
fn test_increment_utf8() {
    // Basic ASCII: the last character is bumped to the next code point.
    assert_eq!(increment_utf8("abc").unwrap(), "abd");
    assert_eq!(increment_utf8("abz").unwrap(), "ab{");

    // Crossing UTF-8 encoding-length boundaries (1 -> 2, 2 -> 3 and 3 -> 4 bytes)
    // plus ordinary multi-byte characters.
    assert_eq!(increment_utf8("~").unwrap(), "\u{7f}");
    assert_eq!(increment_utf8("\u{7f}").unwrap(), "\u{80}");
    assert_eq!(increment_utf8("ß").unwrap(), "à");
    assert_eq!(increment_utf8("℣").unwrap(), "ℤ");
    assert_eq!(increment_utf8("\u{7FF}").unwrap(), "\u{800}");
    assert_eq!(increment_utf8("\u{FFFF}").unwrap(), "\u{10000}");

    // An empty string has nothing to increment.
    assert!(increment_utf8("").is_none());

    // U+10FFFF is the largest code point and cannot be incremented on its own. With
    // a preceding character, the trailing one is dropped and the previous one bumped.
    assert!(increment_utf8("\u{10FFFF}").is_none());
    assert_eq!(increment_utf8("a\u{10FFFF}").unwrap(), "b");

    // U+D7FF is followed by the surrogate range (U+D800..=U+DFFF), which is not a
    // valid `char`, so it behaves like the maximal code point.
    assert_eq!(increment_utf8("a\u{D7FF}").unwrap(), "b");
    assert!(increment_utf8("\u{D7FF}").is_none());

    // U+FDCF is followed by the noncharacter block U+FDD0..=U+FDEF. These assertions
    // pin that U+FDD0 is not produced as the next value.
    assert_eq!(increment_utf8("a\u{FDCF}").unwrap(), "b");
    assert!(increment_utf8("\u{FDCF}").is_none());
}
4021
4022 fn utf8_setup() -> (SchemaRef, TestStatistics) {
4035 let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));
4036
4037 let statistics = TestStatistics::new().with(
4038 "s1",
4039 ContainerStats::new_utf8(
4040 vec![
4041 Some("A"),
4042 Some("A"),
4043 Some("N"),
4044 Some("M"),
4045 None,
4046 Some("A"),
4047 Some(""),
4048 Some(""),
4049 Some("AB"),
4050 Some("A\u{10ffff}\u{10ffff}"),
4051 ], vec![
4053 Some("Z"),
4054 Some("L"),
4055 Some("Z"),
4056 Some("M"),
4057 None,
4058 None,
4059 Some("A"),
4060 Some(""),
4061 Some("A\u{10ffff}\u{10ffff}\u{10ffff}"),
4062 Some("A\u{10ffff}\u{10ffff}"),
4063 ], ),
4065 );
4066 (schema, statistics)
4067 }
4068
#[test]
fn prune_utf8_eq() {
    let (schema, statistics) = utf8_setup();

    // (predicate, keep flag per `utf8_setup` container).
    let cases = [
        (
            col("s1").eq(lit("A")),
            [true, true, false, false, true, true, true, false, false, false],
        ),
        (
            col("s1").eq(lit("")),
            [false, false, false, false, true, false, true, true, false, false],
        ),
    ];
    for (predicate, expected) in cases {
        prune_with_expr(predicate, &schema, &statistics, &expected);
    }
}
4125
#[test]
fn prune_utf8_not_eq() {
    let (schema, statistics) = utf8_setup();

    // (predicate, keep flag per `utf8_setup` container). `!= ''` prunes only the
    // container whose min and max are both the empty string.
    let cases = [
        (col("s1").not_eq(lit("A")), [true; 10]),
        (
            col("s1").not_eq(lit("")),
            [true, true, true, true, true, true, true, false, true, true],
        ),
    ];
    for (predicate, expected) in cases {
        prune_with_expr(predicate, &schema, &statistics, &expected);
    }
}
4182
#[test]
fn prune_utf8_like_one() {
    let (schema, statistics) = utf8_setup();

    // LIKE patterns using the single-character wildcard `_`, paired with the keep
    // flag per `utf8_setup` container. Patterns starting with `_` expect no pruning.
    let cases = [
        (
            col("s1").like(lit("A_")),
            [true, true, false, false, true, true, true, false, true, true],
        ),
        (col("s1").like(lit("_A_")), [true; 10]),
        (col("s1").like(lit("_")), [true; 10]),
        (
            col("s1").like(lit("")),
            [false, false, false, false, true, false, true, true, false, false],
        ),
    ];
    for (predicate, expected) in cases {
        prune_with_expr(predicate, &schema, &statistics, &expected);
    }
}
4291
#[test]
fn prune_utf8_like_many() {
    let (schema, statistics) = utf8_setup();

    // LIKE patterns using the multi-character wildcard `%`, paired with the keep flag
    // per `utf8_setup` container. Patterns starting with `%` expect no pruning.
    let cases = [
        (
            col("s1").like(lit("A%")),
            [true, true, false, false, true, true, true, false, true, true],
        ),
        (col("s1").like(lit("%A%")), [true; 10]),
        (col("s1").like(lit("%")), [true; 10]),
        (
            col("s1").like(lit("")),
            [false, false, false, false, true, false, true, true, false, false],
        ),
    ];
    for (predicate, expected) in cases {
        prune_with_expr(predicate, &schema, &statistics, &expected);
    }
}
4400
#[test]
fn prune_utf8_not_like_one() {
    let (schema, statistics) = utf8_setup();

    // NOT LIKE with a trailing `_` wildcard after U+10FFFF: no container is pruned.
    let predicate = col("s1").not_like(lit("A\u{10ffff}_"));
    prune_with_expr(predicate, &schema, &statistics, &[true; 10]);
}
4432
#[test]
fn prune_utf8_not_like_many() {
    let (schema, statistics) = utf8_setup();

    // NOT LIKE patterns anchored on U+10FFFF, paired with the keep flag per
    // `utf8_setup` container. Only the pure prefix pattern prunes (the last container).
    let cases = [
        (
            col("s1").not_like(lit("A\u{10ffff}%")),
            [true, true, true, true, true, true, true, true, true, false],
        ),
        (col("s1").not_like(lit("A\u{10ffff}%\u{10ffff}")), [true; 10]),
        (col("s1").not_like(lit("A\u{10ffff}%\u{10ffff}_")), [true; 10]),
    ];
    for (predicate, expected) in cases {
        prune_with_expr(predicate, &schema, &statistics, &expected);
    }

    // The backslash escapes `%`, so "A\%%" means "starts with `A%`". A container whose
    // bounds both start with "A%" can be pruned for NOT LIKE.
    let escaped_statistics = TestStatistics::new().with(
        "s1",
        ContainerStats::new_utf8(
            vec![Some("A%a"), Some("A")],
            vec![Some("A%c"), Some("A")],
        ),
    );
    prune_with_expr(
        col("s1").not_like(lit("A\\%%")),
        &schema,
        &escaped_statistics,
        &[false, true],
    );
}
4526
4527 #[test]
4528 fn test_rewrite_expr_to_prunable() {
4529 let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
4530 let df_schema = DFSchema::try_from(schema.clone()).unwrap();
4531
4532 let left_input = col("a");
4534 let left_input = logical2physical(&left_input, &schema);
4535 let right_input = lit(ScalarValue::Int32(Some(12)));
4536 let right_input = logical2physical(&right_input, &schema);
4537 let (result_left, _, result_right) = rewrite_expr_to_prunable(
4538 &left_input,
4539 Operator::Eq,
4540 &right_input,
4541 df_schema.clone(),
4542 )
4543 .unwrap();
4544 assert_eq!(result_left.to_string(), left_input.to_string());
4545 assert_eq!(result_right.to_string(), right_input.to_string());
4546
4547 let left_input = cast(col("a"), DataType::Decimal128(20, 3));
4549 let left_input = logical2physical(&left_input, &schema);
4550 let right_input = lit(ScalarValue::Decimal128(Some(12), 20, 3));
4551 let right_input = logical2physical(&right_input, &schema);
4552 let (result_left, _, result_right) = rewrite_expr_to_prunable(
4553 &left_input,
4554 Operator::Gt,
4555 &right_input,
4556 df_schema.clone(),
4557 )
4558 .unwrap();
4559 assert_eq!(result_left.to_string(), left_input.to_string());
4560 assert_eq!(result_right.to_string(), right_input.to_string());
4561
4562 let left_input = try_cast(col("a"), DataType::Int64);
4564 let left_input = logical2physical(&left_input, &schema);
4565 let right_input = lit(ScalarValue::Int64(Some(12)));
4566 let right_input = logical2physical(&right_input, &schema);
4567 let (result_left, _, result_right) =
4568 rewrite_expr_to_prunable(&left_input, Operator::Gt, &right_input, df_schema)
4569 .unwrap();
4570 assert_eq!(result_left.to_string(), left_input.to_string());
4571 assert_eq!(result_right.to_string(), right_input.to_string());
4572
4573 }
4575
4576 #[test]
4577 fn test_rewrite_expr_to_prunable_custom_unhandled_hook() {
4578 struct CustomUnhandledHook;
4579
4580 impl UnhandledPredicateHook for CustomUnhandledHook {
4581 fn handle(&self, _expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
4585 Arc::new(phys_expr::Literal::new(ScalarValue::Int32(Some(42))))
4586 }
4587 }
4588
4589 let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
4590 let schema_with_b = Schema::new(vec![
4591 Field::new("a", DataType::Int32, true),
4592 Field::new("b", DataType::Int32, true),
4593 ]);
4594
4595 let rewriter = PredicateRewriter::new()
4596 .with_unhandled_hook(Arc::new(CustomUnhandledHook {}));
4597
4598 let transform_expr = |expr| {
4599 let expr = logical2physical(&expr, &schema_with_b);
4600 rewriter.rewrite_predicate_to_statistics_predicate(&expr, &schema)
4601 };
4602
4603 let known_expression = col("a").eq(lit(12));
4605 let known_expression_transformed = PredicateRewriter::new()
4606 .rewrite_predicate_to_statistics_predicate(
4607 &logical2physical(&known_expression, &schema),
4608 &schema,
4609 );
4610
4611 let input = col("b").eq(lit(12));
4613 let expected = logical2physical(&lit(42), &schema);
4614 let transformed = transform_expr(input.clone());
4615 assert_eq!(transformed.to_string(), expected.to_string());
4616
4617 let input = known_expression.clone().and(input.clone());
4619 let expected = phys_expr::BinaryExpr::new(
4620 Arc::<dyn PhysicalExpr>::clone(&known_expression_transformed),
4621 Operator::And,
4622 logical2physical(&lit(42), &schema),
4623 );
4624 let transformed = transform_expr(input.clone());
4625 assert_eq!(transformed.to_string(), expected.to_string());
4626
4627 let input = array_has(make_array(vec![lit(1)]), col("a"));
4629 let expected = logical2physical(&lit(42), &schema);
4630 let transformed = transform_expr(input.clone());
4631 assert_eq!(transformed.to_string(), expected.to_string());
4632
4633 let input = known_expression.and(input);
4635 let expected = phys_expr::BinaryExpr::new(
4636 Arc::<dyn PhysicalExpr>::clone(&known_expression_transformed),
4637 Operator::And,
4638 logical2physical(&lit(42), &schema),
4639 );
4640 let transformed = transform_expr(input.clone());
4641 assert_eq!(transformed.to_string(), expected.to_string());
4642 }
4643
4644 #[test]
4645 fn test_rewrite_expr_to_prunable_error() {
4646 let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
4649 let df_schema = DFSchema::try_from(schema.clone()).unwrap();
4650 let left_input = cast(col("a"), DataType::Int64);
4651 let left_input = logical2physical(&left_input, &schema);
4652 let right_input = lit(ScalarValue::Int64(Some(12)));
4653 let right_input = logical2physical(&right_input, &schema);
4654 let result = rewrite_expr_to_prunable(
4655 &left_input,
4656 Operator::Gt,
4657 &right_input,
4658 df_schema.clone(),
4659 );
4660 assert!(result.is_err());
4661
4662 let left_input = is_null(col("a"));
4664 let left_input = logical2physical(&left_input, &schema);
4665 let right_input = lit(ScalarValue::Int64(Some(12)));
4666 let right_input = logical2physical(&right_input, &schema);
4667 let result =
4668 rewrite_expr_to_prunable(&left_input, Operator::Gt, &right_input, df_schema);
4669 assert!(result.is_err());
4670 }
4672
4673 #[test]
4674 fn prune_with_contained_one_column() {
4675 let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));
4676
4677 let statistics = TestStatistics::new()
4679 .with_contained(
4680 "s1",
4681 [ScalarValue::from("foo")],
4682 [
4683 Some(true),
4685 Some(false),
4687 None,
4689 Some(true),
4691 Some(false),
4693 None,
4695 Some(true),
4697 Some(false),
4699 None,
4701 ],
4702 )
4703 .with_contained(
4704 "s1",
4705 [ScalarValue::from("bar")],
4706 [
4707 Some(true),
4709 Some(true),
4710 Some(true),
4711 Some(false),
4713 Some(false),
4714 Some(false),
4715 None,
4717 None,
4718 None,
4719 ],
4720 )
4721 .with_contained(
4722 "s1",
4725 [ScalarValue::from("foo"), ScalarValue::from("bar")],
4726 [
4727 None,
4729 None,
4730 None,
4731 Some(true),
4733 Some(true),
4734 Some(true),
4735 Some(false),
4737 Some(false),
4738 Some(false),
4739 ],
4740 );
4741
4742 prune_with_expr(
4744 col("s1").eq(lit("foo")),
4745 &schema,
4746 &statistics,
4747 &[true, false, true, true, false, true, true, false, true],
4749 );
4750
4751 prune_with_expr(
4753 col("s1").eq(lit("bar")),
4754 &schema,
4755 &statistics,
4756 &[true, true, true, false, false, false, true, true, true],
4758 );
4759
4760 prune_with_expr(
4762 col("s1").eq(lit("baz")),
4763 &schema,
4764 &statistics,
4765 &[true, true, true, true, true, true, true, true, true],
4767 );
4768
4769 prune_with_expr(
4771 col("s1").eq(lit("foo")).and(col("s1").eq(lit("bar"))),
4772 &schema,
4773 &statistics,
4774 &[true, true, true, true, true, true, true, true, true],
4778 );
4779
4780 prune_with_expr(
4782 col("s1").eq(lit("foo")).or(col("s1").eq(lit("bar"))),
4783 &schema,
4784 &statistics,
4785 &[true, true, true, true, true, true, false, false, false],
4787 );
4788
4789 prune_with_expr(
4791 col("s1").eq(lit("foo")).or(col("s1").eq(lit("baz"))),
4792 &schema,
4793 &statistics,
4794 &[true, true, true, true, true, true, true, true, true],
4796 );
4797
4798 prune_with_expr(
4800 col("s1")
4801 .eq(lit("foo"))
4802 .or(col("s1").eq(lit("bar")))
4803 .or(col("s1").eq(lit("baz"))),
4804 &schema,
4805 &statistics,
4806 &[true, true, true, true, true, true, true, true, true],
4809 );
4810
4811 prune_with_expr(
4813 col("s1").not_eq(lit("foo")),
4814 &schema,
4815 &statistics,
4816 &[false, true, true, false, true, true, false, true, true],
4818 );
4819
4820 prune_with_expr(
4822 col("s1").not_eq(lit("bar")),
4823 &schema,
4824 &statistics,
4825 &[false, false, false, true, true, true, true, true, true],
4827 );
4828
4829 prune_with_expr(
4831 col("s1")
4832 .not_eq(lit("foo"))
4833 .and(col("s1").not_eq(lit("bar"))),
4834 &schema,
4835 &statistics,
4836 &[true, true, true, false, false, false, true, true, true],
4838 );
4839
4840 prune_with_expr(
4842 col("s1")
4843 .not_eq(lit("foo"))
4844 .and(col("s1").not_eq(lit("bar")))
4845 .and(col("s1").not_eq(lit("baz"))),
4846 &schema,
4847 &statistics,
4848 &[true, true, true, true, true, true, true, true, true],
4850 );
4851
4852 prune_with_expr(
4854 col("s1")
4855 .not_eq(lit("foo"))
4856 .or(col("s1").not_eq(lit("bar"))),
4857 &schema,
4858 &statistics,
4859 &[true, true, true, true, true, true, true, true, true],
4861 );
4862
4863 prune_with_expr(
4865 col("s1")
4866 .not_eq(lit("foo"))
4867 .or(col("s1").not_eq(lit("bar")))
4868 .or(col("s1").not_eq(lit("baz"))),
4869 &schema,
4870 &statistics,
4871 &[true, true, true, true, true, true, true, true, true],
4873 );
4874 }
4875
4876 #[test]
4877 fn prune_with_contained_two_columns() {
4878 let schema = Arc::new(Schema::new(vec![
4879 Field::new("s1", DataType::Utf8, true),
4880 Field::new("s2", DataType::Utf8, true),
4881 ]));
4882
4883 let statistics = TestStatistics::new()
4885 .with_contained(
4886 "s1",
4887 [ScalarValue::from("foo")],
4888 [
4889 Some(true),
4891 Some(false),
4893 None,
4895 Some(true),
4897 Some(false),
4899 None,
4901 Some(true),
4903 Some(false),
4905 None,
4907 ],
4908 )
4909 .with_contained(
4910 "s2", [ScalarValue::from("bar")],
4912 [
4913 Some(true),
4915 Some(true),
4916 Some(true),
4917 Some(false),
4919 Some(false),
4920 Some(false),
4921 None,
4923 None,
4924 None,
4925 ],
4926 );
4927
4928 prune_with_expr(
4930 col("s1").eq(lit("foo")),
4931 &schema,
4932 &statistics,
4933 &[true, false, true, true, false, true, true, false, true],
4935 );
4936
4937 let expr = col("s1").eq(lit("foo")).or(col("s2").eq(lit("bar")));
4939 prune_with_expr(
4940 expr,
4941 &schema,
4942 &statistics,
4943 &[true, true, true, true, true, true, true, true, true],
4945 );
4946
4947 prune_with_expr(
4949 col("s1").eq(lit("foo")).and(col("s2").not_eq(lit("bar"))),
4950 &schema,
4951 &statistics,
4952 &[false, false, false, true, false, true, true, false, true],
4956 );
4957
4958 prune_with_expr(
4960 col("s1")
4961 .not_eq(lit("foo"))
4962 .and(col("s2").not_eq(lit("bar"))),
4963 &schema,
4964 &statistics,
4965 &[false, false, false, false, true, true, false, true, true],
4969 );
4970
4971 prune_with_expr(
4973 col("s1")
4974 .not_eq(lit("foo"))
4975 .and(col("s2").eq(lit("bar")).or(col("s2").eq(lit("baz")))),
4976 &schema,
4977 &statistics,
4978 &[false, true, true, false, true, true, false, true, true],
4981 );
4982
4983 prune_with_expr(
4985 col("s1").like(lit("foo%bar%")),
4986 &schema,
4987 &statistics,
4988 &[true, true, true, true, true, true, true, true, true],
4990 );
4991
4992 prune_with_expr(
4994 col("s1")
4995 .like(lit("foo%bar%"))
4996 .and(col("s2").eq(lit("bar"))),
4997 &schema,
4998 &statistics,
4999 &[true, true, true, false, false, false, true, true, true],
5001 );
5002
5003 prune_with_expr(
5005 col("s1").like(lit("foo%bar%")).or(col("s2").eq(lit("bar"))),
5006 &schema,
5007 &statistics,
5008 &[true, true, true, true, true, true, true, true, true],
5011 );
5012 }
5013
5014 #[test]
5015 fn prune_with_range_and_contained() {
5016 let schema = Arc::new(Schema::new(vec![
5018 Field::new("i", DataType::Int32, true),
5019 Field::new("s", DataType::Utf8, true),
5020 ]));
5021
5022 let statistics = TestStatistics::new()
5023 .with(
5024 "i",
5025 ContainerStats::new_i32(
5026 vec![
5030 Some(-5),
5031 Some(10),
5032 None,
5033 Some(-5),
5034 Some(10),
5035 None,
5036 Some(-5),
5037 Some(10),
5038 None,
5039 ], vec![
5041 Some(5),
5042 Some(20),
5043 None,
5044 Some(5),
5045 Some(20),
5046 None,
5047 Some(5),
5048 Some(20),
5049 None,
5050 ], ),
5052 )
5053 .with_contained(
5055 "s",
5056 [ScalarValue::from("foo")],
5057 [
5058 Some(true),
5060 Some(true),
5061 Some(true),
5062 Some(false),
5064 Some(false),
5065 Some(false),
5066 None,
5068 None,
5069 None,
5070 ],
5071 );
5072
5073 prune_with_expr(
5075 col("i").eq(lit(0)).and(col("s").eq(lit("foo"))),
5076 &schema,
5077 &statistics,
5078 &[true, false, true, false, false, false, true, false, true],
5083 );
5084
5085 prune_with_expr(
5087 col("i").eq(lit(0)).and(col("s").not_eq(lit("foo"))),
5088 &schema,
5089 &statistics,
5090 &[false, false, false, true, false, true, true, false, true],
5094 );
5095
5096 prune_with_expr(
5098 col("i").eq(lit(0)).or(col("s").eq(lit("foo"))),
5099 &schema,
5100 &statistics,
5101 &[true, true, true, true, true, true, true, true, true],
5104 );
5105 }
5106
5107 fn prune_with_expr(
5114 expr: Expr,
5115 schema: &SchemaRef,
5116 statistics: &TestStatistics,
5117 expected: &[bool],
5118 ) {
5119 println!("Pruning with expr: {expr}");
5120 let expr = logical2physical(&expr, schema);
5121 let p = PruningPredicate::try_new(expr, Arc::<Schema>::clone(schema)).unwrap();
5122 let result = p.prune(statistics).unwrap();
5123 assert_eq!(result, expected);
5124 }
5125
5126 fn test_build_predicate_expression(
5127 expr: &Expr,
5128 schema: &Schema,
5129 required_columns: &mut RequiredColumns,
5130 ) -> Arc<dyn PhysicalExpr> {
5131 let expr = logical2physical(expr, schema);
5132 let unhandled_hook = Arc::new(ConstantUnhandledPredicateHook::default()) as _;
5133 build_predicate_expression(
5134 &expr,
5135 &Arc::new(schema.clone()),
5136 required_columns,
5137 &unhandled_hook,
5138 )
5139 }
5140
5141 #[test]
5142 fn test_build_predicate_expression_with_false() {
5143 let expr = lit(ScalarValue::Boolean(Some(false)));
5144 let schema = Schema::empty();
5145 let res =
5146 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
5147 let expected = logical2physical(&expr, &schema);
5148 assert_eq!(&res, &expected);
5149 }
5150
5151 #[test]
5152 fn test_build_predicate_expression_with_and_false() {
5153 let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
5154 let expr = and(
5155 col("c1").eq(lit("a")),
5156 lit(ScalarValue::Boolean(Some(false))),
5157 );
5158 let res =
5159 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
5160 let expected = logical2physical(&lit(ScalarValue::Boolean(Some(false))), &schema);
5161 assert_eq!(&res, &expected);
5162 }
5163
5164 #[test]
5165 fn test_build_predicate_expression_with_or_false() {
5166 let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
5167 let left_expr = col("c1").eq(lit("a"));
5168 let right_expr = lit(ScalarValue::Boolean(Some(false)));
5169 let res = test_build_predicate_expression(
5170 &or(left_expr.clone(), right_expr.clone()),
5171 &schema,
5172 &mut RequiredColumns::new(),
5173 );
5174 let expected =
5175 "c1_null_count@2 != row_count@3 AND c1_min@0 <= a AND a <= c1_max@1";
5176 assert_eq!(res.to_string(), expected);
5177 }
5178}