datafusion_pruning/
pruning_predicate.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`PruningPredicate`] to apply filter [`Expr`] to prune "containers"
19//! based on statistics (e.g. Parquet Row Groups)
20//!
21//! [`Expr`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.Expr.html
22use std::collections::HashSet;
23use std::sync::Arc;
24
25use arrow::array::AsArray;
26use arrow::{
27    array::{new_null_array, ArrayRef, BooleanArray},
28    datatypes::{DataType, Field, Schema, SchemaRef},
29    record_batch::{RecordBatch, RecordBatchOptions},
30};
31// pub use for backwards compatibility
32pub use datafusion_common::pruning::PruningStatistics;
33use datafusion_physical_expr::simplifier::PhysicalExprSimplifier;
34use datafusion_physical_plan::metrics::Count;
35use log::{debug, trace};
36
37use datafusion_common::error::Result;
38use datafusion_common::tree_node::TransformedResult;
39use datafusion_common::{
40    internal_datafusion_err, internal_err, plan_datafusion_err, plan_err,
41    tree_node::Transformed, ScalarValue,
42};
43use datafusion_common::{Column, DFSchema};
44use datafusion_expr_common::operator::Operator;
45use datafusion_physical_expr::utils::{collect_columns, Guarantee, LiteralGuarantee};
46use datafusion_physical_expr::{
47    expressions as phys_expr, PhysicalExprExt, PhysicalExprRef,
48};
49use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr;
50use datafusion_physical_plan::{ColumnarValue, PhysicalExpr};
51
52/// Used to prove that arbitrary predicates (boolean expression) can not
53/// possibly evaluate to `true` given information about a column provided by
54/// [`PruningStatistics`].
55///
56/// # Introduction
57///
58/// `PruningPredicate` analyzes filter expressions using statistics such as
59/// min/max values and null counts, attempting to prove a "container" (e.g.
60/// Parquet Row Group) can be skipped without reading the actual data,
61/// potentially leading to significant performance improvements.
62///
63/// For example, `PruningPredicate`s are used to prune Parquet Row Groups based
64/// on the min/max values found in the Parquet metadata. If the
65/// `PruningPredicate` can prove that the filter can never evaluate to `true`
66/// for any row in the Row Group, the entire Row Group is skipped during query
67/// execution.
68///
69/// The `PruningPredicate` API is general, and can be used for pruning other
70/// types of containers (e.g. files) based on statistics that may be known from
71/// external catalogs (e.g. Delta Lake) or other sources. How this works is a
72/// subtle topic.  See the Background and Implementation section for details.
73///
74/// `PruningPredicate` supports:
75///
76/// 1. Arbitrary expressions (including user defined functions)
77///
78/// 2. Vectorized evaluation (provide more than one set of statistics at a time)
79///    so it is suitable for pruning 1000s of containers.
80///
81/// 3. Any source of information that implements the [`PruningStatistics`] trait
82///    (not just Parquet metadata).
83///
84/// # Example
85///
86/// See the [`pruning.rs` example in the `datafusion-examples`] for a complete
87/// example of how to use `PruningPredicate` to prune files based on min/max
88/// values.
89///
90/// [`pruning.rs` example in the `datafusion-examples`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/pruning.rs
91///
92/// Given an expression like `x = 5` and statistics for 3 containers (Row
93/// Groups, files, etc) `A`, `B`, and `C`:
94///
95/// ```text
96///   A: {x_min = 0, x_max = 4}
97///   B: {x_min = 2, x_max = 10}
98///   C: {x_min = 5, x_max = 8}
99/// ```
100///
101/// `PruningPredicate` will conclude that the rows in container `A` can never
102/// be true (as the maximum value is only `4`), so it can be pruned:
103///
104/// ```text
105/// A: false (no rows could possibly match x = 5)
106/// B: true  (rows might match x = 5)
107/// C: true  (rows might match x = 5)
108/// ```
109///
110/// See [`PruningPredicate::try_new`] and [`PruningPredicate::prune`] for more information.
111///
112/// # Background
113///
114/// ## Boolean Tri-state logic
115///
116/// To understand the details of the rest of this documentation, it is important
117/// to understand how the tri-state boolean logic in SQL works. As this is
118/// somewhat esoteric, we review it here.
119///
120/// SQL has a notion of `NULL` that represents the value is `“unknown”` and this
121/// uncertainty propagates through expressions. SQL `NULL` behaves very
122/// differently than the `NULL` in most other languages where it is a special,
123/// sentinel value (e.g. `0` in `C/C++`). While representing uncertainty with
124/// `NULL` is powerful and elegant, SQL `NULL`s are often deeply confusing when
125/// first encountered as they behave differently than most programmers may
126/// expect.
127///
128/// In most other programming languages,
129/// * `a == NULL` evaluates to `true` if `a` also had the value `NULL`
130/// * `a == NULL` evaluates to `false` if `a` has any other value
131///
132/// However, in SQL `a = NULL` **always** evaluates to `NULL` (never `true` or
133/// `false`):
134///
135/// Expression    | Result
136/// ------------- | ---------
137/// `1 = NULL`    | `NULL`
138/// `NULL = NULL` | `NULL`
139///
140/// Also important is how `AND` and `OR` works with tri-state boolean logic as
141/// (perhaps counterintuitively) the result is **not** always NULL. While
142/// consistent with the notion of `NULL` representing “unknown”, this is again,
143/// often deeply confusing 🤯 when first encountered.
144///
145/// Expression       | Result    | Intuition
146/// ---------------  | --------- | -----------
147/// `NULL AND true`  |   `NULL`  | The `NULL` stands for “unknown” and if it were `true` or `false` the overall expression value could change
148/// `NULL AND false` |  `false`  | If the `NULL` was either `true` or `false` the overall expression is still `false`
149/// `NULL AND NULL`  | `NULL`    |
150///
151/// Expression      | Result    | Intuition
152/// --------------- | --------- | ----------
153/// `NULL OR true`  | `true`    |  If the `NULL` was either `true` or `false` the overall expression is still `true`
154/// `NULL OR false` | `NULL`    |  The `NULL` stands for “unknown” and if it were `true` or `false` the overall expression value could change
155/// `NULL OR NULL`  |  `NULL`   |
156///
157/// ## SQL Filter Semantics
158///
159/// The SQL `WHERE` clause has a boolean expression, often called a filter or
160/// predicate. The semantics of this predicate are that the query evaluates the
161/// predicate for each row in the input tables and:
162///
163/// * Rows that evaluate to `true` are returned in the query results
164///
165/// * Rows that evaluate to `false` are not returned (“filtered out” or “pruned” or “skipped”).
166///
167/// * Rows that evaluate to `NULL` are **NOT** returned (also “filtered out”).
168///   Note: *this treatment of `NULL` is **DIFFERENT** than how `NULL` is treated
169///   in the rewritten predicate described below.*
170///
171/// # `PruningPredicate` Implementation
172///
173/// Armed with the information in the Background section, we can now understand
174/// how the `PruningPredicate` logic works.
175///
176/// ## Interface
177///
178/// **Inputs**
179/// 1. An input schema describing what columns exist
180///
181/// 2. A predicate (expression that evaluates to a boolean)
182///
183/// 3. [`PruningStatistics`] that provides information about columns in that
184///    schema, for multiple “containers”. For each column in each container, it
///    provides optional information on contained values, min values, max values,
///    null counts, and row counts.
187///
188/// **Outputs**:
189/// A (non null) boolean value for each container:
190/// * `true`: There MAY be rows that match the predicate
191///
192/// * `false`: There are no rows that could possibly match the predicate (the
193///   predicate can never possibly be true). The container can be pruned (skipped)
194///   entirely.
195///
196/// While `PruningPredicate` will never return a `NULL` value, the
197/// rewritten predicate (as returned by `build_predicate_expression` and used internally
198/// by `PruningPredicate`) may evaluate to `NULL` when some of the min/max values
199/// or null / row counts are not known.
200///
201/// In order to be correct, `PruningPredicate` must return false
202/// **only** if it can determine that for all rows in the container, the
203/// predicate could never evaluate to `true` (always evaluates to either `NULL`
204/// or `false`).
205///
206/// ## Contains Analysis and Min/Max Rewrite
207///
208/// `PruningPredicate` works by first analyzing the predicate to see what
209/// [`LiteralGuarantee`] must hold for the predicate to be true.
210///
211/// Then, the `PruningPredicate` rewrites the original predicate into an
212/// expression that references the min/max values of each column in the original
213/// predicate.
214///
215/// When the min/max values are actually substituted in to this expression and
216/// evaluated, the result means
217///
218/// * `true`: there MAY be rows that pass the predicate, **KEEPS** the container
219///
220/// * `NULL`: there MAY be rows that pass the predicate, **KEEPS** the container
221///   Note that rewritten predicate can evaluate to NULL when some of
222///   the min/max values are not known. *Note that this is different than
223///   the SQL filter semantics where `NULL` means the row is filtered
224///   out.*
225///
226/// * `false`: there are no rows that could possibly match the predicate,
227///   **PRUNES** the container
228///
229/// For example, given a column `x`, the `x_min`, `x_max`, `x_null_count`, and
230/// `x_row_count` represent the minimum and maximum values, the null count of
231/// column `x`, and the row count of column `x`, provided by the `PruningStatistics`.
232/// `x_null_count` and `x_row_count` are used to handle the case where the column `x`
233/// is known to be all `NULL`s. Note this is different from knowing nothing about
234/// the column `x`, which confusingly is encoded by returning `NULL` for the min/max
235/// values from [`PruningStatistics::max_values`] and [`PruningStatistics::min_values`].
236///
237/// Here are some examples of the rewritten predicates:
238///
239/// Original Predicate | Rewritten Predicate
240/// ------------------ | --------------------
241/// `x = 5` | `x_null_count != x_row_count AND (x_min <= 5 AND 5 <= x_max)`
/// `x < 5` | `x_null_count != x_row_count AND (x_min < 5)`
/// `x = 5 AND y = 10` | `x_null_count != x_row_count AND (x_min <= 5 AND 5 <= x_max) AND y_null_count != y_row_count AND (y_min <= 10 AND 10 <= y_max)`
244/// `x IS NULL`  | `x_null_count > 0`
/// `x IS NOT NULL`  | `x_null_count != x_row_count`
/// `CAST(x as int) = 5` | `x_null_count != x_row_count AND (CAST(x_min as int) <= 5 AND 5 <= CAST(x_max as int))`
247///
248/// ## Predicate Evaluation
249/// The PruningPredicate works in two passes
250///
251/// **First pass**:  For each `LiteralGuarantee` calls
252/// [`PruningStatistics::contained`] and rules out containers where the
253/// LiteralGuarantees are not satisfied
254///
255/// **Second Pass**: Evaluates the rewritten expression using the
256/// min/max/null_counts/row_counts values for each column for each container. For any
257/// container that this expression evaluates to `false`, it rules out those
258/// containers.
259///
260///
261/// ### Example 1
262///
263/// Given the predicate, `x = 5 AND y = 10`, the rewritten predicate would look like:
264///
265/// ```sql
266/// x_null_count != x_row_count AND (x_min <= 5 AND 5 <= x_max)
267/// AND
268/// y_null_count != y_row_count AND (y_min <= 10 AND 10 <= y_max)
269/// ```
270///
/// If we know that for a given container, `x` is between `1` and `100` and
/// `y` is between `4` and `7`, but we know nothing about the null count and row
/// count of `x` and `y`, the input statistics might look like:
274///
275/// Column   | Value
276/// -------- | -----
277/// `x_min`  | `1`
278/// `x_max`  | `100`
279/// `x_null_count` | `null`
280/// `x_row_count`  | `null`
281/// `y_min`  | `4`
282/// `y_max`  | `7`
283/// `y_null_count` | `null`
284/// `y_row_count`  | `null`
285///
286/// When these statistics values are substituted in to the rewritten predicate and
287/// simplified, the result is `false`:
288///
289/// * `null != null AND (1 <= 5 AND 5 <= 100) AND null != null AND (4 <= 10 AND 10 <= 7)`
/// * `null != null` is `null` which is not true, so the AND moves on to the next clause
291/// * `null and (1 <= 5 AND 5 <= 100) AND null AND (4 <= 10 AND 10 <= 7)`
292/// * evaluating the clauses further we get:
293/// * `null and true and null and false`
294/// * `null and false`
295/// * `false`
296///
297/// Returning `false` means the container can be pruned, which matches the
298/// intuition that  `x = 5 AND y = 10` can’t be true for any row if all values of `y`
299/// are `7` or less.
300///
301/// Note that if we had ended up with `null AND true AND null AND true` the result
302/// would have been `null`.
/// `null` is treated the same as `true`, because we can't prove that the predicate is `false`.
304///
305/// If, for some other container, we knew `y` was between the values `4` and
306/// `15`, then the rewritten predicate evaluates to `true` (verifying this is
307/// left as an exercise to the reader -- are you still here?), and the container
308/// **could not** be pruned. The intuition is that there may be rows where the
309/// predicate *might* evaluate to `true`, and the only way to find out is to do
310/// more analysis, for example by actually reading the data and evaluating the
311/// predicate row by row.
312///
313/// ### Example 2
314///
/// Given the same predicate, `x = 5 AND y = 10`, the rewritten predicate would
/// look the same as in Example 1:
317///
318/// ```sql
319/// x_null_count != x_row_count AND (x_min <= 5 AND 5 <= x_max)
320/// AND
321/// y_null_count != y_row_count AND (y_min <= 10 AND 10 <= y_max)
322/// ```
323///
324/// If we know that for another given container, `x_min` is NULL and `x_max` is
325/// NULL (the min/max values are unknown), `x_null_count` is `100` and `x_row_count`
326///  is `100`; we know that `y` is between `4` and `7`, but we know nothing about
327/// the null count and row count of `y`. The input statistics might look like:
328///
329/// Column   | Value
330/// -------- | -----
331/// `x_min`  | `null`
332/// `x_max`  | `null`
333/// `x_null_count` | `100`
334/// `x_row_count`  | `100`
335/// `y_min`  | `4`
336/// `y_max`  | `7`
337/// `y_null_count` | `null`
338/// `y_row_count`  | `null`
339///
340/// When these statistics values are substituted in to the rewritten predicate and
341/// simplified, the result is `false`:
342///
/// * `100 != 100 AND (null <= 5 AND 5 <= null) AND null != null AND (4 <= 10 AND 10 <= 7)`
344/// * `false AND null AND null AND false`
345/// * `false AND false`
346/// * `false`
347///
348/// Returning `false` means the container can be pruned, which matches the
349/// intuition that  `x = 5 AND y = 10` can’t be true because all values in `x`
350/// are known to be NULL.
351///
352/// # Related Work
353///
354/// [`PruningPredicate`] implements the type of min/max pruning described in
355/// Section `3.3.3` of the [`Snowflake SIGMOD Paper`]. The technique is
356/// described by various research such as [small materialized aggregates], [zone
357/// maps], and [data skipping].
358///
359/// [`Snowflake SIGMOD Paper`]: https://dl.acm.org/doi/10.1145/2882903.2903741
360/// [small materialized aggregates]: https://www.vldb.org/conf/1998/p476.pdf
361/// [zone maps]: https://dl.acm.org/doi/10.1007/978-3-642-03730-6_10
362/// [data skipping]: https://dl.acm.org/doi/10.1145/2588555.2610515
#[derive(Debug, Clone)]
pub struct PruningPredicate {
    /// The input schema against which the predicate will be evaluated
    schema: SchemaRef,
    /// The min/max pruning predicate: the original predicate rewritten in
    /// terms of column statistics (which are supplied at pruning time) and
    /// then simplified
    predicate_expr: Arc<dyn PhysicalExpr>,
    /// Description of which statistics are required to evaluate `predicate_expr`
    required_columns: RequiredColumns,
    /// Original physical predicate (after snapshotting) from which
    /// `predicate_expr` is derived (required for serialization)
    orig_expr: Arc<dyn PhysicalExpr>,
    /// [`LiteralGuarantee`]s, derived from `orig_expr`, used to try and prove
    /// a predicate can not possibly evaluate to `true`.
    ///
    /// See [`PruningPredicate::literal_guarantees`] for more details.
    literal_guarantees: Vec<LiteralGuarantee>,
}
381
382/// Build a pruning predicate from an optional predicate expression.
383/// If the predicate is None or the predicate cannot be converted to a pruning
384/// predicate, return None.
385/// If there is an error creating the pruning predicate it is recorded by incrementing
386/// the `predicate_creation_errors` counter.
387pub fn build_pruning_predicate(
388    predicate: Arc<dyn PhysicalExpr>,
389    file_schema: &SchemaRef,
390    predicate_creation_errors: &Count,
391) -> Option<Arc<PruningPredicate>> {
392    match PruningPredicate::try_new(predicate, Arc::clone(file_schema)) {
393        Ok(pruning_predicate) => {
394            if !pruning_predicate.always_true() {
395                return Some(Arc::new(pruning_predicate));
396            }
397        }
398        Err(e) => {
399            debug!("Could not create pruning predicate for: {e}");
400            predicate_creation_errors.add(1);
401        }
402    }
403    None
404}
405
/// Rewrites predicates that [`PredicateRewriter`] can not handle, e.g. certain
/// complex expressions or predicates that reference columns that are not in the
/// schema.
pub trait UnhandledPredicateHook {
    /// Called when a predicate can not be rewritten in terms of statistics or
    /// references a column that is not in the schema.
    ///
    /// `expr` is the predicate that could not be handled; the returned
    /// expression is the rewrite supplied by the hook.
    fn handle(&self, expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr>;
}
414
/// The default handling for unhandled predicates is to return a constant `true`
/// (meaning don't prune the container)
#[derive(Debug, Clone)]
struct ConstantUnhandledPredicateHook {
    /// The expression handed back for every unhandled predicate
    default: Arc<dyn PhysicalExpr>,
}
421
422impl Default for ConstantUnhandledPredicateHook {
423    fn default() -> Self {
424        Self {
425            default: Arc::new(phys_expr::Literal::new(ScalarValue::from(true))),
426        }
427    }
428}
429
430impl UnhandledPredicateHook for ConstantUnhandledPredicateHook {
431    fn handle(&self, _expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
432        Arc::clone(&self.default)
433    }
434}
435
436impl PruningPredicate {
437    /// Try to create a new instance of [`PruningPredicate`]
438    ///
439    /// This will translate the provided `expr` filter expression into
440    /// a *pruning predicate*.
441    ///
442    /// A pruning predicate is one that has been rewritten in terms of
443    /// the min and max values of column references and that evaluates
444    /// to FALSE if the filter predicate would evaluate FALSE *for
445    /// every row* whose values fell within the min / max ranges (aka
446    /// could be pruned).
447    ///
448    /// The pruning predicate evaluates to TRUE or NULL
449    /// if the filter predicate *might* evaluate to TRUE for at least
450    /// one row whose values fell within the min/max ranges (in other
451    /// words they might pass the predicate)
452    ///
453    /// For example, the filter expression `(column / 2) = 4` becomes
454    /// the pruning predicate
455    /// `(column_min / 2) <= 4 && 4 <= (column_max / 2))`
456    ///
457    /// See the struct level documentation on [`PruningPredicate`] for more
458    /// details.
459    pub fn try_new(expr: Arc<dyn PhysicalExpr>, schema: SchemaRef) -> Result<Self> {
460        // Get a (simpler) snapshot of the physical expr here to use with `PruningPredicate`
461        // which does not handle dynamic exprs in general
462        let expr = snapshot_physical_expr(expr)?;
463        let unhandled_hook = Arc::new(ConstantUnhandledPredicateHook::default()) as _;
464
465        // build predicate expression once
466        let mut required_columns = RequiredColumns::new();
467        let predicate_expr = build_predicate_expression(
468            &expr,
469            &schema,
470            &mut required_columns,
471            &unhandled_hook,
472        );
473        let predicate_schema = required_columns.schema();
474        // Simplify the newly created predicate to get rid of redundant casts, comparisons, etc.
475        let predicate_expr =
476            PhysicalExprSimplifier::new(&predicate_schema).simplify(predicate_expr)?;
477
478        let literal_guarantees = LiteralGuarantee::analyze(&expr);
479
480        Ok(Self {
481            schema,
482            predicate_expr,
483            required_columns,
484            orig_expr: expr,
485            literal_guarantees,
486        })
487    }
488
489    /// For each set of statistics, evaluates the pruning predicate
490    /// and returns a `bool` with the following meaning for a
491    /// all rows whose values match the statistics:
492    ///
493    /// `true`: There MAY be rows that match the predicate
494    ///
495    /// `false`: There are no rows that could possibly match the predicate
496    ///
497    /// Note: the predicate passed to `prune` should already be simplified as
498    /// much as possible (e.g. this pass doesn't handle some
499    /// expressions like `b = false`, but it does handle the
500    /// simplified version `b`. See [`ExprSimplifier`] to simplify expressions.
501    ///
502    /// [`ExprSimplifier`]: https://docs.rs/datafusion/latest/datafusion/optimizer/simplify_expressions/struct.ExprSimplifier.html
503    pub fn prune<S: PruningStatistics + ?Sized>(
504        &self,
505        statistics: &S,
506    ) -> Result<Vec<bool>> {
507        let mut builder = BoolVecBuilder::new(statistics.num_containers());
508
509        // Try to prove the predicate can't be true for the containers based on
510        // literal guarantees
511        for literal_guarantee in &self.literal_guarantees {
512            let LiteralGuarantee {
513                column,
514                guarantee,
515                literals,
516            } = literal_guarantee;
517            if let Some(results) = statistics.contained(column, literals) {
518                match guarantee {
519                    // `In` means the values in the column must be one of the
520                    // values in the set for the predicate to evaluate to true.
521                    // If `contained` returns false, that means the column is
522                    // not any of the values so we can prune the container
523                    Guarantee::In => builder.combine_array(&results),
524                    // `NotIn` means the values in the column must not be
525                    // any of the values in the set for the predicate to
526                    // evaluate to true. If `contained` returns true, it means the
527                    // column is only in the set of values so we can prune the
528                    // container
529                    Guarantee::NotIn => {
530                        builder.combine_array(&arrow::compute::not(&results)?)
531                    }
532                }
533                // if all containers are pruned (has rows that DEFINITELY DO NOT pass the predicate)
534                // can return early without evaluating the rest of predicates.
535                if builder.check_all_pruned() {
536                    return Ok(builder.build());
537                }
538            }
539        }
540
541        // Next, try to prove the predicate can't be true for the containers based
542        // on min/max values
543
544        // build a RecordBatch that contains the min/max values in the
545        // appropriate statistics columns for the min/max predicate
546        let statistics_batch =
547            build_statistics_record_batch(statistics, &self.required_columns)?;
548
549        // Evaluate the pruning predicate on that record batch and append any results to the builder
550        builder.combine_value(self.predicate_expr.evaluate(&statistics_batch)?);
551
552        Ok(builder.build())
553    }
554
555    /// Return a reference to the input schema
556    pub fn schema(&self) -> &SchemaRef {
557        &self.schema
558    }
559
560    /// Returns a reference to the physical expr used to construct this pruning predicate
561    pub fn orig_expr(&self) -> &Arc<dyn PhysicalExpr> {
562        &self.orig_expr
563    }
564
565    /// Returns a reference to the predicate expr
566    pub fn predicate_expr(&self) -> &Arc<dyn PhysicalExpr> {
567        &self.predicate_expr
568    }
569
570    /// Returns a reference to the literal guarantees
571    ///
572    /// Note that **All** `LiteralGuarantee`s must be satisfied for the
573    /// expression to possibly be `true`. If any is not satisfied, the
574    /// expression is guaranteed to be `null` or `false`.
575    pub fn literal_guarantees(&self) -> &[LiteralGuarantee] {
576        &self.literal_guarantees
577    }
578
579    /// Returns true if this pruning predicate can not prune anything.
580    ///
581    /// This happens if the predicate is a literal `true`  and
582    /// literal_guarantees is empty.
583    ///
584    /// This can happen when a predicate is simplified to a constant `true`
585    pub fn always_true(&self) -> bool {
586        is_always_true(&self.predicate_expr) && self.literal_guarantees.is_empty()
587    }
588
589    // this is only used by `parquet` feature right now
590    #[allow(dead_code)]
591    pub fn required_columns(&self) -> &RequiredColumns {
592        &self.required_columns
593    }
594
595    /// Names of the columns that are known to be / not be in a set
596    /// of literals (constants). These are the columns the that may be passed to
597    /// [`PruningStatistics::contained`] during pruning.
598    ///
599    /// This is useful to avoid fetching statistics for columns that will not be
600    /// used in the predicate. For example, it can be used to avoid reading
601    /// unneeded bloom filters (a non trivial operation).
602    pub fn literal_columns(&self) -> Vec<String> {
603        let mut seen = HashSet::new();
604        self.literal_guarantees
605            .iter()
606            .map(|e| &e.column.name)
607            // avoid duplicates
608            .filter(|name| seen.insert(*name))
609            .map(|s| s.to_string())
610            .collect()
611    }
612}
613
/// Builds the return `Vec` for [`PruningPredicate::prune`].
#[derive(Debug)]
struct BoolVecBuilder {
    /// One element per container. Each element is
    /// * `true`: if the container has rows that may pass the predicate
    /// * `false`: if the container has rows that DEFINITELY DO NOT pass the predicate
    inner: Vec<bool>,
}
622
623impl BoolVecBuilder {
624    /// Create a new `BoolVecBuilder` with `num_containers` elements
625    fn new(num_containers: usize) -> Self {
626        Self {
627            // assume by default all containers may pass the predicate
628            inner: vec![true; num_containers],
629        }
630    }
631
632    /// Combines result `array` for a conjunct (e.g. `AND` clause) of a
633    /// predicate into the currently in progress array.
634    ///
635    /// Each `array` element is:
636    /// * `true`: container has row that may pass the predicate
637    /// * `false`: all container rows DEFINITELY DO NOT pass the predicate
638    /// * `null`: container may or may not have rows that pass the predicate
639    fn combine_array(&mut self, array: &BooleanArray) {
640        assert_eq!(array.len(), self.inner.len());
641        for (cur, new) in self.inner.iter_mut().zip(array.iter()) {
642            // `false` for this conjunct means we know for sure no rows could
643            // pass the predicate and thus we set the corresponding container
644            // location to false.
645            if let Some(false) = new {
646                *cur = false;
647            }
648        }
649    }
650
651    /// Combines the results in the [`ColumnarValue`] to the currently in
652    /// progress array, following the same rules as [`Self::combine_array`].
653    ///
654    /// # Panics
655    /// If `value` is not boolean
656    fn combine_value(&mut self, value: ColumnarValue) {
657        match value {
658            ColumnarValue::Array(array) => {
659                self.combine_array(array.as_boolean());
660            }
661            ColumnarValue::Scalar(ScalarValue::Boolean(Some(false))) => {
662                // False means all containers can not pass the predicate
663                self.inner = vec![false; self.inner.len()];
664            }
665            _ => {
666                // Null or true means the rows in container may pass this
667                // conjunct so we can't prune any containers based on that
668            }
669        }
670    }
671
672    /// Convert this builder into a Vec of bools
673    fn build(self) -> Vec<bool> {
674        self.inner
675    }
676
677    /// Check all containers has rows that DEFINITELY DO NOT pass the predicate
678    fn check_all_pruned(&self) -> bool {
679        self.inner.iter().all(|&x| !x)
680    }
681}
682
683fn is_always_true(expr: &Arc<dyn PhysicalExpr>) -> bool {
684    expr.as_any()
685        .downcast_ref::<phys_expr::Literal>()
686        .map(|l| matches!(l.value(), ScalarValue::Boolean(Some(true))))
687        .unwrap_or_default()
688}
689
690fn is_always_false(expr: &Arc<dyn PhysicalExpr>) -> bool {
691    expr.as_any()
692        .downcast_ref::<phys_expr::Literal>()
693        .map(|l| matches!(l.value(), ScalarValue::Boolean(Some(false))))
694        .unwrap_or_default()
695}
696
/// Describes which column statistics are necessary to evaluate a
/// [`PruningPredicate`].
///
/// This structure permits reading and creating the minimum number of
/// statistics, which is important since statistics may be non trivial to read
/// (e.g. large strings or when there are 1000s of columns).
///
/// Handles creating references to the min/max statistics
/// for columns as well as recording which statistics are needed
#[derive(Debug, Default, Clone)]
pub struct RequiredColumns {
    /// The statistics required to evaluate this predicate:
    /// * The unqualified column in the input schema
    /// * Statistics type (e.g. Min or Max or Null_Count)
    /// * The field the statistics value should be placed in for
    ///   pruning predicate evaluation (e.g. `min_value` or `max_value`)
    columns: Vec<(phys_expr::Column, StatisticsType, Field)>,
}
715
716impl RequiredColumns {
717    fn new() -> Self {
718        Self::default()
719    }
720
721    /// Returns Some(column) if this is a single column predicate.
722    ///
723    /// Returns None if this is a multi-column predicate.
724    ///
725    /// Examples:
726    /// * `a > 5 OR a < 10` returns `Some(a)`
727    /// * `a > 5 OR b < 10` returns `None`
728    /// * `true` returns None
729    #[allow(dead_code)]
730    // this fn is only used by `parquet` feature right now, thus the `allow(dead_code)`
731    pub fn single_column(&self) -> Option<&phys_expr::Column> {
732        if self.columns.windows(2).all(|w| {
733            // check if all columns are the same (ignoring statistics and field)
734            let c1 = &w[0].0;
735            let c2 = &w[1].0;
736            c1 == c2
737        }) {
738            self.columns.first().map(|r| &r.0)
739        } else {
740            None
741        }
742    }
743
744    /// Returns a schema that describes the columns required to evaluate this
745    /// pruning predicate.
746    /// The schema contains the fields for each column in `self.columns` with
747    /// the appropriate data type for the statistics.
748    /// Order matters, this same order is used to evaluate the
749    /// pruning predicate.
750    fn schema(&self) -> Schema {
751        let fields = self
752            .columns
753            .iter()
754            .map(|(_c, _t, f)| f.clone())
755            .collect::<Vec<_>>();
756        Schema::new(fields)
757    }
758
759    /// Returns an iterator over items in columns (see doc on
760    /// `self.columns` for details)
761    pub(crate) fn iter(
762        &self,
763    ) -> impl Iterator<Item = &(phys_expr::Column, StatisticsType, Field)> {
764        self.columns.iter()
765    }
766
767    fn find_stat_column(
768        &self,
769        column: &phys_expr::Column,
770        statistics_type: StatisticsType,
771    ) -> Option<usize> {
772        match statistics_type {
773            StatisticsType::RowCount => {
774                // Use the first row count we find, if any
775                self.columns
776                    .iter()
777                    .enumerate()
778                    .find(|(_i, (_c, t, _f))| t == &statistics_type)
779                    .map(|(i, (_c, _t, _f))| i)
780            }
781            _ => self
782                .columns
783                .iter()
784                .enumerate()
785                .find(|(_i, (c, t, _f))| c == column && t == &statistics_type)
786                .map(|(i, (_c, _t, _f))| i),
787        }
788    }
789
790    /// Rewrites column_expr so that all appearances of column
791    /// are replaced with a reference to either the min or max
792    /// statistics column, while keeping track that a reference to the statistics
793    /// column is required
794    ///
795    /// for example, an expression like `col("foo") > 5`, when called
796    /// with Max would result in an expression like `col("foo_max") >
797    /// 5` with the appropriate entry noted in self.columns
798    fn stat_column_expr(
799        &mut self,
800        column: &phys_expr::Column,
801        column_expr: &Arc<dyn PhysicalExpr>,
802        field: &Field,
803        stat_type: StatisticsType,
804    ) -> Result<Arc<dyn PhysicalExpr>> {
805        let (idx, need_to_insert) = match self.find_stat_column(column, stat_type) {
806            Some(idx) => (idx, false),
807            None => (self.columns.len(), true),
808        };
809
810        let column_name = column.name();
811        let stat_column_name = match stat_type {
812            StatisticsType::Min => format!("{column_name}_min"),
813            StatisticsType::Max => format!("{column_name}_max"),
814            StatisticsType::NullCount => format!("{column_name}_null_count"),
815            StatisticsType::RowCount => "row_count".to_string(),
816        };
817
818        let stat_column = phys_expr::Column::new(&stat_column_name, idx);
819
820        // only add statistics column if not previously added
821        if need_to_insert {
822            // may be null if statistics are not present
823            let nullable = true;
824            let stat_field =
825                Field::new(stat_column.name(), field.data_type().clone(), nullable);
826            self.columns.push((column.clone(), stat_type, stat_field));
827        }
828        rewrite_column_expr(Arc::clone(column_expr), column, &stat_column)
829    }
830
831    /// rewrite col --> col_min
832    fn min_column_expr(
833        &mut self,
834        column: &phys_expr::Column,
835        column_expr: &Arc<dyn PhysicalExpr>,
836        field: &Field,
837    ) -> Result<Arc<dyn PhysicalExpr>> {
838        self.stat_column_expr(column, column_expr, field, StatisticsType::Min)
839    }
840
841    /// rewrite col --> col_max
842    fn max_column_expr(
843        &mut self,
844        column: &phys_expr::Column,
845        column_expr: &Arc<dyn PhysicalExpr>,
846        field: &Field,
847    ) -> Result<Arc<dyn PhysicalExpr>> {
848        self.stat_column_expr(column, column_expr, field, StatisticsType::Max)
849    }
850
851    /// rewrite col --> col_null_count
852    fn null_count_column_expr(
853        &mut self,
854        column: &phys_expr::Column,
855        column_expr: &Arc<dyn PhysicalExpr>,
856        field: &Field,
857    ) -> Result<Arc<dyn PhysicalExpr>> {
858        self.stat_column_expr(column, column_expr, field, StatisticsType::NullCount)
859    }
860
861    /// rewrite col --> col_row_count
862    fn row_count_column_expr(
863        &mut self,
864        column: &phys_expr::Column,
865        column_expr: &Arc<dyn PhysicalExpr>,
866        field: &Field,
867    ) -> Result<Arc<dyn PhysicalExpr>> {
868        self.stat_column_expr(column, column_expr, field, StatisticsType::RowCount)
869    }
870}
871
872impl From<Vec<(phys_expr::Column, StatisticsType, Field)>> for RequiredColumns {
873    fn from(columns: Vec<(phys_expr::Column, StatisticsType, Field)>) -> Self {
874        Self { columns }
875    }
876}
877
878/// Build a RecordBatch from a list of statistics, creating arrays,
879/// with one row for each PruningStatistics and columns specified in
880/// the required_columns parameter.
881///
882/// For example, if the requested columns are
883/// ```text
884/// ("s1", Min, Field:s1_min)
885/// ("s2", Max, field:s2_max)
886/// ```
887///
888/// And the input statistics had
889/// ```text
890/// S1(Min: 5, Max: 10)
891/// S2(Min: 99, Max: 1000)
892/// S3(Min: 1, Max: 2)
893/// ```
894///
895/// Then this function would build a record batch with 2 columns and
896/// one row s1_min and s2_max as follows (s3 is not requested):
897///
898/// ```text
899/// s1_min | s2_max
900/// -------+--------
901///   5    | 1000
902/// ```
903fn build_statistics_record_batch<S: PruningStatistics + ?Sized>(
904    statistics: &S,
905    required_columns: &RequiredColumns,
906) -> Result<RecordBatch> {
907    let mut arrays = Vec::<ArrayRef>::new();
908    // For each needed statistics column:
909    for (column, statistics_type, stat_field) in required_columns.iter() {
910        let column = Column::from_name(column.name());
911        let data_type = stat_field.data_type();
912
913        let num_containers = statistics.num_containers();
914
915        let array = match statistics_type {
916            StatisticsType::Min => statistics.min_values(&column),
917            StatisticsType::Max => statistics.max_values(&column),
918            StatisticsType::NullCount => statistics.null_counts(&column),
919            StatisticsType::RowCount => statistics.row_counts(&column),
920        };
921        let array = array.unwrap_or_else(|| new_null_array(data_type, num_containers));
922
923        if num_containers != array.len() {
924            return internal_err!(
925                "mismatched statistics length. Expected {}, got {}",
926                num_containers,
927                array.len()
928            );
929        }
930
931        // cast statistics array to required data type (e.g. parquet
932        // provides timestamp statistics as "Int64")
933        let array = arrow::compute::cast(&array, data_type)?;
934
935        arrays.push(array);
936    }
937
938    let schema = Arc::new(required_columns.schema());
939    // provide the count in case there were no needed statistics
940    let mut options = RecordBatchOptions::default();
941    options.row_count = Some(statistics.num_containers());
942
943    trace!("Creating statistics batch for {required_columns:#?} with {arrays:#?}");
944
945    RecordBatch::try_new_with_options(schema, arrays, &options).map_err(|err| {
946        plan_datafusion_err!("Can not create statistics record batch: {err}")
947    })
948}
949
/// Holds the pieces of a single `column_expr op scalar_expr` comparison
/// (already normalised so that the column is on the left) together with the
/// [`RequiredColumns`] that statistics references are recorded in.
struct PruningExpressionBuilder<'a> {
    /// The single column referenced by the comparison
    column: phys_expr::Column,
    /// The column side of the comparison, as returned by
    /// `rewrite_expr_to_prunable`
    column_expr: Arc<dyn PhysicalExpr>,
    /// The comparison operator to apply between `column_expr` and
    /// `scalar_expr`
    op: Operator,
    /// The side of the comparison that references no columns, as returned by
    /// `rewrite_expr_to_prunable`
    scalar_expr: Arc<dyn PhysicalExpr>,
    /// The field of `column` in the input schema
    field: &'a Field,
    /// Where the statistics columns needed by this comparison are recorded
    required_columns: &'a mut RequiredColumns,
}
958
959impl<'a> PruningExpressionBuilder<'a> {
960    fn try_new(
961        left: &'a Arc<dyn PhysicalExpr>,
962        right: &'a Arc<dyn PhysicalExpr>,
963        op: Operator,
964        schema: &'a SchemaRef,
965        required_columns: &'a mut RequiredColumns,
966    ) -> Result<Self> {
967        // find column name; input could be a more complicated expression
968        let left_columns = collect_columns(left);
969        let right_columns = collect_columns(right);
970        let (column_expr, scalar_expr, columns, correct_operator) =
971            match (left_columns.len(), right_columns.len()) {
972                (1, 0) => (left, right, left_columns, op),
973                (0, 1) => (right, left, right_columns, reverse_operator(op)?),
974                _ => {
975                    // if more than one column used in expression - not supported
976                    return plan_err!(
977                        "Multi-column expressions are not currently supported"
978                    );
979                }
980            };
981
982        let df_schema = DFSchema::try_from(Arc::clone(schema))?;
983        let (column_expr, correct_operator, scalar_expr) = rewrite_expr_to_prunable(
984            column_expr,
985            correct_operator,
986            scalar_expr,
987            df_schema,
988        )?;
989        let column = columns.iter().next().unwrap().clone();
990        let field = match schema.column_with_name(column.name()) {
991            Some((_, f)) => f,
992            _ => {
993                return plan_err!("Field not found in schema");
994            }
995        };
996
997        Ok(Self {
998            column,
999            column_expr,
1000            op: correct_operator,
1001            scalar_expr,
1002            field,
1003            required_columns,
1004        })
1005    }
1006
1007    fn op(&self) -> Operator {
1008        self.op
1009    }
1010
1011    fn scalar_expr(&self) -> &Arc<dyn PhysicalExpr> {
1012        &self.scalar_expr
1013    }
1014
1015    fn min_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1016        self.required_columns
1017            .min_column_expr(&self.column, &self.column_expr, self.field)
1018    }
1019
1020    fn max_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1021        self.required_columns
1022            .max_column_expr(&self.column, &self.column_expr, self.field)
1023    }
1024
1025    /// This function is to simply retune the `null_count` physical expression no matter what the
1026    /// predicate expression is
1027    ///
1028    /// i.e., x > 5 => x_null_count,
1029    ///       cast(x as int) < 10 => x_null_count,
1030    ///       try_cast(x as float) < 10.0 => x_null_count
1031    fn null_count_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1032        // Retune to [`phys_expr::Column`]
1033        let column_expr = Arc::new(self.column.clone()) as _;
1034
1035        // null_count is DataType::UInt64, which is different from the column's data type (i.e. self.field)
1036        let null_count_field = &Field::new(self.field.name(), DataType::UInt64, true);
1037
1038        self.required_columns.null_count_column_expr(
1039            &self.column,
1040            &column_expr,
1041            null_count_field,
1042        )
1043    }
1044
1045    /// This function is to simply retune the `row_count` physical expression no matter what the
1046    /// predicate expression is
1047    ///
1048    /// i.e., x > 5 => x_row_count,
1049    ///       cast(x as int) < 10 => x_row_count,
1050    ///       try_cast(x as float) < 10.0 => x_row_count
1051    fn row_count_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1052        // Retune to [`phys_expr::Column`]
1053        let column_expr = Arc::new(self.column.clone()) as _;
1054
1055        // row_count is DataType::UInt64, which is different from the column's data type (i.e. self.field)
1056        let row_count_field = &Field::new(self.field.name(), DataType::UInt64, true);
1057
1058        self.required_columns.row_count_column_expr(
1059            &self.column,
1060            &column_expr,
1061            row_count_field,
1062        )
1063    }
1064}
1065
1066/// This function is designed to rewrite the column_expr to
1067/// ensure the column_expr is monotonically increasing.
1068///
1069/// For example,
1070/// 1. `col > 10`
1071/// 2. `-col > 10` should be rewritten to `col < -10`
1072/// 3. `!col = true` would be rewritten to `col = !true`
1073/// 4. `abs(a - 10) > 0` not supported
1074/// 5. `cast(can_prunable_expr) > 10`
1075/// 6. `try_cast(can_prunable_expr) > 10`
1076///
1077/// More rewrite rules are still in progress.
1078fn rewrite_expr_to_prunable(
1079    column_expr: &PhysicalExprRef,
1080    op: Operator,
1081    scalar_expr: &PhysicalExprRef,
1082    schema: DFSchema,
1083) -> Result<(PhysicalExprRef, Operator, PhysicalExprRef)> {
1084    if !is_compare_op(op) {
1085        return plan_err!("rewrite_expr_to_prunable only support compare expression");
1086    }
1087
1088    let column_expr_any = column_expr.as_any();
1089
1090    if column_expr_any
1091        .downcast_ref::<phys_expr::Column>()
1092        .is_some()
1093    {
1094        // `col op lit()`
1095        Ok((Arc::clone(column_expr), op, Arc::clone(scalar_expr)))
1096    } else if let Some(cast) = column_expr_any.downcast_ref::<phys_expr::CastExpr>() {
1097        // `cast(col) op lit()`
1098        let arrow_schema = schema.as_arrow();
1099        let from_type = cast.expr().data_type(arrow_schema)?;
1100        verify_support_type_for_prune(&from_type, cast.cast_type())?;
1101        let (left, op, right) =
1102            rewrite_expr_to_prunable(cast.expr(), op, scalar_expr, schema)?;
1103        let left = Arc::new(phys_expr::CastExpr::new(
1104            left,
1105            cast.cast_type().clone(),
1106            None,
1107        ));
1108        Ok((left, op, right))
1109    } else if let Some(try_cast) =
1110        column_expr_any.downcast_ref::<phys_expr::TryCastExpr>()
1111    {
1112        // `try_cast(col) op lit()`
1113        let arrow_schema = schema.as_arrow();
1114        let from_type = try_cast.expr().data_type(arrow_schema)?;
1115        verify_support_type_for_prune(&from_type, try_cast.cast_type())?;
1116        let (left, op, right) =
1117            rewrite_expr_to_prunable(try_cast.expr(), op, scalar_expr, schema)?;
1118        let left = Arc::new(phys_expr::TryCastExpr::new(
1119            left,
1120            try_cast.cast_type().clone(),
1121        ));
1122        Ok((left, op, right))
1123    } else if let Some(neg) = column_expr_any.downcast_ref::<phys_expr::NegativeExpr>() {
1124        // `-col > lit()`  --> `col < -lit()`
1125        let (left, op, right) =
1126            rewrite_expr_to_prunable(neg.arg(), op, scalar_expr, schema)?;
1127        let right = Arc::new(phys_expr::NegativeExpr::new(right));
1128        Ok((left, reverse_operator(op)?, right))
1129    } else if let Some(not) = column_expr_any.downcast_ref::<phys_expr::NotExpr>() {
1130        // `!col = true` --> `col = !true`
1131        if op != Operator::Eq && op != Operator::NotEq {
1132            return plan_err!("Not with operator other than Eq / NotEq is not supported");
1133        }
1134        if not
1135            .arg()
1136            .as_any()
1137            .downcast_ref::<phys_expr::Column>()
1138            .is_some()
1139        {
1140            let left = Arc::clone(not.arg());
1141            let right = Arc::new(phys_expr::NotExpr::new(Arc::clone(scalar_expr)));
1142            Ok((left, reverse_operator(op)?, right))
1143        } else {
1144            plan_err!("Not with complex expression {column_expr:?} is not supported")
1145        }
1146    } else {
1147        plan_err!("column expression {column_expr:?} is not supported")
1148    }
1149}
1150
1151fn is_compare_op(op: Operator) -> bool {
1152    matches!(
1153        op,
1154        Operator::Eq
1155            | Operator::NotEq
1156            | Operator::Lt
1157            | Operator::LtEq
1158            | Operator::Gt
1159            | Operator::GtEq
1160            | Operator::LikeMatch
1161            | Operator::NotLikeMatch
1162    )
1163}
1164
1165fn is_string_type(data_type: &DataType) -> bool {
1166    matches!(
1167        data_type,
1168        DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
1169    )
1170}
1171
1172// The pruning logic is based on the comparing the min/max bounds.
1173// Must make sure the two type has order.
1174// For example, casts from string to numbers is not correct.
1175// Because the "13" is less than "3" with UTF8 comparison order.
1176fn verify_support_type_for_prune(from_type: &DataType, to_type: &DataType) -> Result<()> {
1177    // Dictionary casts are always supported as long as the value types are supported
1178    let from_type = match from_type {
1179        DataType::Dictionary(_, t) => {
1180            return verify_support_type_for_prune(t.as_ref(), to_type)
1181        }
1182        _ => from_type,
1183    };
1184    let to_type = match to_type {
1185        DataType::Dictionary(_, t) => {
1186            return verify_support_type_for_prune(from_type, t.as_ref())
1187        }
1188        _ => to_type,
1189    };
1190    // If both types are strings or both are not strings (number, timestamp, etc)
1191    // then we can compare them.
1192    // PruningPredicate does not support casting of strings to numbers and such.
1193    if is_string_type(from_type) == is_string_type(to_type) {
1194        Ok(())
1195    } else {
1196        plan_err!(
1197            "Try Cast/Cast with from type {from_type} to type {to_type} is not supported"
1198        )
1199    }
1200}
1201
1202/// replaces a column with an old name with a new name in an expression
1203fn rewrite_column_expr(
1204    e: Arc<dyn PhysicalExpr>,
1205    column_old: &phys_expr::Column,
1206    column_new: &phys_expr::Column,
1207) -> Result<Arc<dyn PhysicalExpr>> {
1208    e.transform_with_lambdas_params(|expr, lambdas_params| {
1209        if let Some(column) = expr.as_any().downcast_ref::<phys_expr::Column>() {
1210            if !lambdas_params.contains(column.name()) && column == column_old {
1211                return Ok(Transformed::yes(Arc::new(column_new.clone())));
1212            }
1213        }
1214
1215        Ok(Transformed::no(expr))
1216    })
1217    .data()
1218}
1219
1220fn reverse_operator(op: Operator) -> Result<Operator> {
1221    op.swap().ok_or_else(|| {
1222        internal_datafusion_err!(
1223            "Could not reverse operator {op} while building pruning predicate"
1224        )
1225    })
1226}
1227
1228/// Given a column reference to `column`, returns a pruning
1229/// expression in terms of the min and max that will evaluate to true
1230/// if the column may contain values, and false if definitely does not
1231/// contain values
1232fn build_single_column_expr(
1233    column: &phys_expr::Column,
1234    schema: &Schema,
1235    required_columns: &mut RequiredColumns,
1236    is_not: bool, // if true, treat as !col
1237) -> Option<Arc<dyn PhysicalExpr>> {
1238    let field = schema.field_with_name(column.name()).ok()?;
1239
1240    if matches!(field.data_type(), &DataType::Boolean) {
1241        let col_ref = Arc::new(column.clone()) as _;
1242
1243        let min = required_columns
1244            .min_column_expr(column, &col_ref, field)
1245            .ok()?;
1246        let max = required_columns
1247            .max_column_expr(column, &col_ref, field)
1248            .ok()?;
1249
1250        // remember -- we want an expression that is:
1251        // TRUE: if there may be rows that match
1252        // FALSE: if there are no rows that match
1253        if is_not {
1254            // The only way we know a column couldn't match is if both the min and max are true
1255            // !(min && max)
1256            Some(Arc::new(phys_expr::NotExpr::new(Arc::new(
1257                phys_expr::BinaryExpr::new(min, Operator::And, max),
1258            ))))
1259        } else {
1260            // the only way we know a column couldn't match is if both the min and max are false
1261            // !(!min && !max) --> min || max
1262            Some(Arc::new(phys_expr::BinaryExpr::new(min, Operator::Or, max)))
1263        }
1264    } else {
1265        None
1266    }
1267}
1268
1269/// Given an expression reference to `expr`, if `expr` is a column expression,
1270/// returns a pruning expression in terms of IsNull that will evaluate to true
1271/// if the column may contain null, and false if definitely does not
1272/// contain null.
1273/// If `with_not` is true, build a pruning expression for `col IS NOT NULL`: `col_count != col_null_count`
1274/// The pruning expression evaluates to true ONLY if the column definitely CONTAINS
1275/// at least one NULL value.  In this case we can know that `IS NOT NULL` can not be true and
1276/// thus can prune the row group / value
1277fn build_is_null_column_expr(
1278    expr: &Arc<dyn PhysicalExpr>,
1279    schema: &Schema,
1280    required_columns: &mut RequiredColumns,
1281    with_not: bool,
1282) -> Option<Arc<dyn PhysicalExpr>> {
1283    if let Some(col) = expr.as_any().downcast_ref::<phys_expr::Column>() {
1284        let field = schema.field_with_name(col.name()).ok()?;
1285
1286        let null_count_field = &Field::new(field.name(), DataType::UInt64, true);
1287        if with_not {
1288            if let Ok(row_count_expr) =
1289                required_columns.row_count_column_expr(col, expr, null_count_field)
1290            {
1291                required_columns
1292                    .null_count_column_expr(col, expr, null_count_field)
1293                    .map(|null_count_column_expr| {
1294                        // IsNotNull(column) => null_count != row_count
1295                        Arc::new(phys_expr::BinaryExpr::new(
1296                            null_count_column_expr,
1297                            Operator::NotEq,
1298                            row_count_expr,
1299                        )) as _
1300                    })
1301                    .ok()
1302            } else {
1303                None
1304            }
1305        } else {
1306            required_columns
1307                .null_count_column_expr(col, expr, null_count_field)
1308                .map(|null_count_column_expr| {
1309                    // IsNull(column) => null_count > 0
1310                    Arc::new(phys_expr::BinaryExpr::new(
1311                        null_count_column_expr,
1312                        Operator::Gt,
1313                        Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
1314                    )) as _
1315                })
1316                .ok()
1317        }
1318    } else {
1319        None
1320    }
1321}
1322
/// The maximum number of entries in an `InList` that might be rewritten into
/// a chain of comparisons (`=` joined by `OR`, or `!=` joined by `AND` for a
/// negated list). Longer lists are passed to the unhandled hook instead.
const MAX_LIST_VALUE_SIZE_REWRITE: usize = 20;
1326
/// Rewrite a predicate expression in terms of statistics (min/max/null_counts)
/// for use as a [`PruningPredicate`].
pub struct PredicateRewriter {
    /// Hook that is handed any predicate which can not be rewritten in terms
    /// of statistics; its result is used in place of that predicate
    unhandled_hook: Arc<dyn UnhandledPredicateHook>,
}
1332
1333impl Default for PredicateRewriter {
1334    fn default() -> Self {
1335        Self {
1336            unhandled_hook: Arc::new(ConstantUnhandledPredicateHook::default()),
1337        }
1338    }
1339}
1340
1341impl PredicateRewriter {
1342    /// Create a new `PredicateRewriter`
1343    pub fn new() -> Self {
1344        Self::default()
1345    }
1346
1347    /// Set the unhandled hook to be used when a predicate can not be rewritten
1348    pub fn with_unhandled_hook(
1349        self,
1350        unhandled_hook: Arc<dyn UnhandledPredicateHook>,
1351    ) -> Self {
1352        Self { unhandled_hook }
1353    }
1354
1355    /// Translate logical filter expression into pruning predicate
1356    /// expression that will evaluate to FALSE if it can be determined no
1357    /// rows between the min/max values could pass the predicates.
1358    ///
1359    /// Any predicates that can not be translated will be passed to `unhandled_hook`.
1360    ///
1361    /// Returns the pruning predicate as an [`PhysicalExpr`]
1362    ///
1363    /// Notice: Does not handle [`phys_expr::InListExpr`] greater than 20, which will fall back to calling `unhandled_hook`
1364    pub fn rewrite_predicate_to_statistics_predicate(
1365        &self,
1366        expr: &Arc<dyn PhysicalExpr>,
1367        schema: &Schema,
1368    ) -> Arc<dyn PhysicalExpr> {
1369        let mut required_columns = RequiredColumns::new();
1370        build_predicate_expression(
1371            expr,
1372            &Arc::new(schema.clone()),
1373            &mut required_columns,
1374            &self.unhandled_hook,
1375        )
1376    }
1377}
1378
1379/// Translate logical filter expression into pruning predicate
1380/// expression that will evaluate to FALSE if it can be determined no
1381/// rows between the min/max values could pass the predicates.
1382///
1383/// Any predicates that can not be translated will be passed to `unhandled_hook`.
1384///
1385/// Returns the pruning predicate as an [`PhysicalExpr`]
1386///
1387/// Notice: Does not handle [`phys_expr::InListExpr`] greater than 20, which will fall back to calling `unhandled_hook`
1388fn build_predicate_expression(
1389    expr: &Arc<dyn PhysicalExpr>,
1390    schema: &SchemaRef,
1391    required_columns: &mut RequiredColumns,
1392    unhandled_hook: &Arc<dyn UnhandledPredicateHook>,
1393) -> Arc<dyn PhysicalExpr> {
1394    if is_always_false(expr) {
1395        // Shouldn't return `unhandled_hook.handle(expr)`
1396        // Because it will transfer false to true.
1397        return Arc::clone(expr);
1398    }
1399    // predicate expression can only be a binary expression
1400    let expr_any = expr.as_any();
1401    if let Some(is_null) = expr_any.downcast_ref::<phys_expr::IsNullExpr>() {
1402        return build_is_null_column_expr(is_null.arg(), schema, required_columns, false)
1403            .unwrap_or_else(|| unhandled_hook.handle(expr));
1404    }
1405    if let Some(is_not_null) = expr_any.downcast_ref::<phys_expr::IsNotNullExpr>() {
1406        return build_is_null_column_expr(
1407            is_not_null.arg(),
1408            schema,
1409            required_columns,
1410            true,
1411        )
1412        .unwrap_or_else(|| unhandled_hook.handle(expr));
1413    }
1414    if let Some(col) = expr_any.downcast_ref::<phys_expr::Column>() {
1415        return build_single_column_expr(col, schema, required_columns, false)
1416            .unwrap_or_else(|| unhandled_hook.handle(expr));
1417    }
1418    if let Some(not) = expr_any.downcast_ref::<phys_expr::NotExpr>() {
1419        // match !col (don't do so recursively)
1420        if let Some(col) = not.arg().as_any().downcast_ref::<phys_expr::Column>() {
1421            return build_single_column_expr(col, schema, required_columns, true)
1422                .unwrap_or_else(|| unhandled_hook.handle(expr));
1423        } else {
1424            return unhandled_hook.handle(expr);
1425        }
1426    }
1427    if let Some(in_list) = expr_any.downcast_ref::<phys_expr::InListExpr>() {
1428        if !in_list.list().is_empty()
1429            && in_list.list().len() <= MAX_LIST_VALUE_SIZE_REWRITE
1430        {
1431            let eq_op = if in_list.negated() {
1432                Operator::NotEq
1433            } else {
1434                Operator::Eq
1435            };
1436            let re_op = if in_list.negated() {
1437                Operator::And
1438            } else {
1439                Operator::Or
1440            };
1441            let change_expr = in_list
1442                .list()
1443                .iter()
1444                .map(|e| {
1445                    Arc::new(phys_expr::BinaryExpr::new(
1446                        Arc::clone(in_list.expr()),
1447                        eq_op,
1448                        Arc::clone(e),
1449                    )) as _
1450                })
1451                .reduce(|a, b| Arc::new(phys_expr::BinaryExpr::new(a, re_op, b)) as _)
1452                .unwrap();
1453            return build_predicate_expression(
1454                &change_expr,
1455                schema,
1456                required_columns,
1457                unhandled_hook,
1458            );
1459        } else {
1460            return unhandled_hook.handle(expr);
1461        }
1462    }
1463
1464    let (left, op, right) = {
1465        if let Some(bin_expr) = expr_any.downcast_ref::<phys_expr::BinaryExpr>() {
1466            (
1467                Arc::clone(bin_expr.left()),
1468                *bin_expr.op(),
1469                Arc::clone(bin_expr.right()),
1470            )
1471        } else if let Some(like_expr) = expr_any.downcast_ref::<phys_expr::LikeExpr>() {
1472            if like_expr.case_insensitive() {
1473                return unhandled_hook.handle(expr);
1474            }
1475            let op = match (like_expr.negated(), like_expr.case_insensitive()) {
1476                (false, false) => Operator::LikeMatch,
1477                (true, false) => Operator::NotLikeMatch,
1478                (false, true) => Operator::ILikeMatch,
1479                (true, true) => Operator::NotILikeMatch,
1480            };
1481            (
1482                Arc::clone(like_expr.expr()),
1483                op,
1484                Arc::clone(like_expr.pattern()),
1485            )
1486        } else {
1487            return unhandled_hook.handle(expr);
1488        }
1489    };
1490
1491    if op == Operator::And || op == Operator::Or {
1492        let left_expr =
1493            build_predicate_expression(&left, schema, required_columns, unhandled_hook);
1494        let right_expr =
1495            build_predicate_expression(&right, schema, required_columns, unhandled_hook);
1496        // simplify boolean expression if applicable
1497        let expr = match (&left_expr, op, &right_expr) {
1498            (left, Operator::And, right)
1499                if is_always_false(left) || is_always_false(right) =>
1500            {
1501                Arc::new(phys_expr::Literal::new(ScalarValue::Boolean(Some(false))))
1502            }
1503            (left, Operator::And, _) if is_always_true(left) => right_expr,
1504            (_, Operator::And, right) if is_always_true(right) => left_expr,
1505            (left, Operator::Or, right)
1506                if is_always_true(left) || is_always_true(right) =>
1507            {
1508                Arc::new(phys_expr::Literal::new(ScalarValue::Boolean(Some(true))))
1509            }
1510            (left, Operator::Or, _) if is_always_false(left) => right_expr,
1511            (_, Operator::Or, right) if is_always_false(right) => left_expr,
1512
1513            _ => Arc::new(phys_expr::BinaryExpr::new(left_expr, op, right_expr)),
1514        };
1515        return expr;
1516    }
1517
1518    let expr_builder =
1519        PruningExpressionBuilder::try_new(&left, &right, op, schema, required_columns);
1520    let mut expr_builder = match expr_builder {
1521        Ok(builder) => builder,
1522        // allow partial failure in predicate expression generation
1523        // this can still produce a useful predicate when multiple conditions are joined using AND
1524        Err(e) => {
1525            debug!("Error building pruning expression: {e}");
1526            return unhandled_hook.handle(expr);
1527        }
1528    };
1529
1530    build_statistics_expr(&mut expr_builder)
1531        .unwrap_or_else(|_| unhandled_hook.handle(expr))
1532}
1533
1534fn build_statistics_expr(
1535    expr_builder: &mut PruningExpressionBuilder,
1536) -> Result<Arc<dyn PhysicalExpr>> {
1537    let statistics_expr: Arc<dyn PhysicalExpr> = match expr_builder.op() {
1538        Operator::NotEq => {
1539            // column != literal => (min, max) = literal =>
1540            // !(min != literal && max != literal) ==>
1541            // min != literal || literal != max
1542            let min_column_expr = expr_builder.min_column_expr()?;
1543            let max_column_expr = expr_builder.max_column_expr()?;
1544            Arc::new(phys_expr::BinaryExpr::new(
1545                Arc::new(phys_expr::BinaryExpr::new(
1546                    min_column_expr,
1547                    Operator::NotEq,
1548                    Arc::clone(expr_builder.scalar_expr()),
1549                )),
1550                Operator::Or,
1551                Arc::new(phys_expr::BinaryExpr::new(
1552                    Arc::clone(expr_builder.scalar_expr()),
1553                    Operator::NotEq,
1554                    max_column_expr,
1555                )),
1556            ))
1557        }
1558        Operator::Eq => {
1559            // column = literal => (min, max) = literal => min <= literal && literal <= max
1560            // (column / 2) = 4 => (column_min / 2) <= 4 && 4 <= (column_max / 2)
1561            let min_column_expr = expr_builder.min_column_expr()?;
1562            let max_column_expr = expr_builder.max_column_expr()?;
1563            Arc::new(phys_expr::BinaryExpr::new(
1564                Arc::new(phys_expr::BinaryExpr::new(
1565                    min_column_expr,
1566                    Operator::LtEq,
1567                    Arc::clone(expr_builder.scalar_expr()),
1568                )),
1569                Operator::And,
1570                Arc::new(phys_expr::BinaryExpr::new(
1571                    Arc::clone(expr_builder.scalar_expr()),
1572                    Operator::LtEq,
1573                    max_column_expr,
1574                )),
1575            ))
1576        }
1577        Operator::NotLikeMatch => build_not_like_match(expr_builder)?,
1578        Operator::LikeMatch => build_like_match(expr_builder).ok_or_else(|| {
1579            plan_datafusion_err!(
1580                "LIKE expression with wildcard at the beginning is not supported"
1581            )
1582        })?,
1583        Operator::Gt => {
1584            // column > literal => (min, max) > literal => max > literal
1585            Arc::new(phys_expr::BinaryExpr::new(
1586                expr_builder.max_column_expr()?,
1587                Operator::Gt,
1588                Arc::clone(expr_builder.scalar_expr()),
1589            ))
1590        }
1591        Operator::GtEq => {
1592            // column >= literal => (min, max) >= literal => max >= literal
1593            Arc::new(phys_expr::BinaryExpr::new(
1594                expr_builder.max_column_expr()?,
1595                Operator::GtEq,
1596                Arc::clone(expr_builder.scalar_expr()),
1597            ))
1598        }
1599        Operator::Lt => {
1600            // column < literal => (min, max) < literal => min < literal
1601            Arc::new(phys_expr::BinaryExpr::new(
1602                expr_builder.min_column_expr()?,
1603                Operator::Lt,
1604                Arc::clone(expr_builder.scalar_expr()),
1605            ))
1606        }
1607        Operator::LtEq => {
1608            // column <= literal => (min, max) <= literal => min <= literal
1609            Arc::new(phys_expr::BinaryExpr::new(
1610                expr_builder.min_column_expr()?,
1611                Operator::LtEq,
1612                Arc::clone(expr_builder.scalar_expr()),
1613            ))
1614        }
1615        // other expressions are not supported
1616        _ => {
1617            return plan_err!(
1618                "expressions other than (neq, eq, gt, gteq, lt, lteq) are not supported"
1619            );
1620        }
1621    };
1622    let statistics_expr = wrap_null_count_check_expr(statistics_expr, expr_builder)?;
1623    Ok(statistics_expr)
1624}
1625
1626/// returns the string literal of the scalar value if it is a string
1627fn unpack_string(s: &ScalarValue) -> Option<&str> {
1628    s.try_as_str().flatten()
1629}
1630
1631fn extract_string_literal(expr: &Arc<dyn PhysicalExpr>) -> Option<&str> {
1632    if let Some(lit) = expr.as_any().downcast_ref::<phys_expr::Literal>() {
1633        let s = unpack_string(lit.value())?;
1634        return Some(s);
1635    }
1636    None
1637}
1638
1639/// Convert `column LIKE literal` where P is a constant prefix of the literal
1640/// to a range check on the column: `P <= column && column < P'`, where P' is the
1641/// lowest string after all P* strings.
1642fn build_like_match(
1643    expr_builder: &mut PruningExpressionBuilder,
1644) -> Option<Arc<dyn PhysicalExpr>> {
1645    // column LIKE literal => (min, max) LIKE literal split at % => min <= split literal && split literal <= max
1646    // column LIKE 'foo%' => min <= 'foo' && 'foo' <= max
1647    // column LIKE '%foo' => min <= '' && '' <= max => true
1648    // column LIKE '%foo%' => min <= '' && '' <= max => true
1649    // column LIKE 'foo' => min <= 'foo' && 'foo' <= max
1650
1651    // TODO Handle ILIKE perhaps by making the min lowercase and max uppercase
1652    //  this may involve building the physical expressions that call lower() and upper()
1653    let min_column_expr = expr_builder.min_column_expr().ok()?;
1654    let max_column_expr = expr_builder.max_column_expr().ok()?;
1655    let scalar_expr = expr_builder.scalar_expr();
1656    // check that the scalar is a string literal
1657    let s = extract_string_literal(scalar_expr)?;
1658    // ANSI SQL specifies two wildcards: % and _. % matches zero or more characters, _ matches exactly one character.
1659    let first_wildcard_index = s.find(['%', '_']);
1660    if first_wildcard_index == Some(0) {
1661        // there's no filtering we could possibly do, return an error and have this be handled by the unhandled hook
1662        return None;
1663    }
1664    let (lower_bound, upper_bound) = if let Some(wildcard_index) = first_wildcard_index {
1665        let prefix = &s[..wildcard_index];
1666        let lower_bound_lit = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1667            prefix.to_string(),
1668        ))));
1669        let upper_bound_lit = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1670            increment_utf8(prefix)?,
1671        ))));
1672        (lower_bound_lit, upper_bound_lit)
1673    } else {
1674        // the like expression is a literal and can be converted into a comparison
1675        let bound = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1676            s.to_string(),
1677        ))));
1678        (Arc::clone(&bound), bound)
1679    };
1680    let lower_bound_expr = Arc::new(phys_expr::BinaryExpr::new(
1681        lower_bound,
1682        Operator::LtEq,
1683        Arc::clone(&max_column_expr),
1684    ));
1685    let upper_bound_expr = Arc::new(phys_expr::BinaryExpr::new(
1686        Arc::clone(&min_column_expr),
1687        Operator::LtEq,
1688        upper_bound,
1689    ));
1690    let combined = Arc::new(phys_expr::BinaryExpr::new(
1691        upper_bound_expr,
1692        Operator::And,
1693        lower_bound_expr,
1694    ));
1695    Some(combined)
1696}
1697
1698// For predicate `col NOT LIKE 'const_prefix%'`, we rewrite it as `(col_min NOT LIKE 'const_prefix%' OR col_max NOT LIKE 'const_prefix%')`.
1699//
1700// The intuition is that if both `col_min` and `col_max` begin with `const_prefix` that means
1701// **all** data in this row group begins with `const_prefix` as well (and therefore the predicate
1702// looking for rows that don't begin with `const_prefix` can never be true)
1703fn build_not_like_match(
1704    expr_builder: &mut PruningExpressionBuilder<'_>,
1705) -> Result<Arc<dyn PhysicalExpr>> {
1706    // col NOT LIKE 'const_prefix%' -> !(col_min LIKE 'const_prefix%' && col_max LIKE 'const_prefix%') -> (col_min NOT LIKE 'const_prefix%' || col_max NOT LIKE 'const_prefix%')
1707
1708    let min_column_expr = expr_builder.min_column_expr()?;
1709    let max_column_expr = expr_builder.max_column_expr()?;
1710
1711    let scalar_expr = expr_builder.scalar_expr();
1712
1713    let pattern = extract_string_literal(scalar_expr).ok_or_else(|| {
1714        plan_datafusion_err!("cannot extract literal from NOT LIKE expression")
1715    })?;
1716
1717    let (const_prefix, remaining) = split_constant_prefix(pattern);
1718    if const_prefix.is_empty() || remaining != "%" {
1719        // we can not handle `%` at the beginning or in the middle of the pattern
1720        // Example: For pattern "foo%bar", the row group might include values like
1721        // ["foobar", "food", "foodbar"], making it unsafe to prune.
1722        // Even if the min/max values in the group (e.g., "foobar" and "foodbar")
1723        // match the pattern, intermediate values like "food" may not
1724        // match the full pattern "foo%bar", making pruning unsafe.
1725        // (truncate foo%bar to foo% have same problem)
1726
1727        // we can not handle pattern containing `_`
1728        // Example: For pattern "foo_", row groups might contain ["fooa", "fooaa", "foob"],
1729        // which means not every row is guaranteed to match the pattern.
1730        return Err(plan_datafusion_err!(
1731            "NOT LIKE expressions only support constant_prefix+wildcard`%`"
1732        ));
1733    }
1734
1735    let min_col_not_like_epxr = Arc::new(phys_expr::LikeExpr::new(
1736        true,
1737        false,
1738        Arc::clone(&min_column_expr),
1739        Arc::clone(scalar_expr),
1740    ));
1741
1742    let max_col_not_like_expr = Arc::new(phys_expr::LikeExpr::new(
1743        true,
1744        false,
1745        Arc::clone(&max_column_expr),
1746        Arc::clone(scalar_expr),
1747    ));
1748
1749    Ok(Arc::new(phys_expr::BinaryExpr::new(
1750        min_col_not_like_epxr,
1751        Operator::Or,
1752        max_col_not_like_expr,
1753    )))
1754}
1755
/// Splits a LIKE `pattern` at its first unescaped wildcard (`%` or `_`).
///
/// Returns `(prefix, remaining)`:
///
/// * `prefix` is the leading part of the pattern that contains no unescaped
///   wildcard (possibly empty). It is a slice of the input, so escape
///   sequences are returned verbatim, i.e. still escaped.
/// * `remaining` starts at the first unescaped wildcard, or is empty when the
///   pattern has none.
///
/// A backslash escapes the character that follows it. In particular an
/// escaped backslash (`\\`) does not escape the character after it, so the
/// `%` in `a\\%` is a wildcard while the `%` in `a\%` is not.
fn split_constant_prefix(pattern: &str) -> (&str, &str) {
    // true when the previous character was an (unescaped) backslash
    let mut escaped = false;
    for (idx, ch) in pattern.char_indices() {
        if escaped {
            // this character is taken literally, whatever it is
            escaped = false;
        } else if ch == '\\' {
            escaped = true;
        } else if ch == '%' || ch == '_' {
            // `idx` comes from `char_indices`, so it is a char boundary
            return pattern.split_at(idx);
        }
    }
    (pattern, "")
}
1771
/// Increment a UTF8 string by one, returning `None` if it can't be incremented.
///
/// The returned string compares greater than the input string and greater
/// than any other string that has the input as a prefix.
/// This is necessary since the statistics may have been truncated: a min
/// statistic of "fo" may originally have been "foz" or anything else with the
/// prefix "fo".
///
/// E.g. `increment_utf8("foo") >= "foo"` and `increment_utf8("foo") >= "fooz"`.
/// In this example `increment_utf8("foo") == "fop"`.
///
/// The last character is replaced by its successor code point. When that
/// successor is not usable, the character is dropped and the one before it is
/// tried instead, and so on towards the start of the string.
fn increment_utf8(data: &str) -> Option<String> {
    /// Whether `c` may be used as a replacement character
    fn is_valid_unicode(c: char) -> bool {
        let code = u32::from(c);

        // Non-characters (https://www.unicode.org/versions/corrigendum9.html)
        let non_character = matches!(code, 0xFDD0..=0xFDEF | 0xFFFE | 0xFFFF);
        // Anything past the last Unicode code point
        let out_of_range = code >= 0x110000;

        !(non_character || out_of_range)
    }

    let chars: Vec<char> = data.chars().collect();

    // Walk from the last character towards the first and stop at the first
    // one that has a usable successor
    chars.iter().enumerate().rev().find_map(|(pos, &current)| {
        let successor =
            char::from_u32(u32::from(current) + 1).filter(|&c| is_valid_unicode(c))?;
        // keep everything before `pos`, drop everything after it
        let mut incremented: String = chars[..pos].iter().collect();
        incremented.push(successor);
        Some(incremented)
    })
}
1817
1818/// Wrap the statistics expression in a check that skips the expression if the column is all nulls.
1819///
1820/// This is important not only as an optimization but also because statistics may not be
1821/// accurate for columns that are all nulls.
1822/// For example, for an `int` column `x` with all nulls, the min/max/null_count statistics
1823/// might be set to 0 and evaluating `x = 0` would incorrectly include the column.
1824///
1825/// For example:
1826///
1827/// `x_min <= 10 AND 10 <= x_max`
1828///
1829/// will become
1830///
1831/// ```sql
1832/// x_null_count != x_row_count AND (x_min <= 10 AND 10 <= x_max)
1833/// ````
1834///
1835/// If the column is known to be all nulls, then the expression
1836/// `x_null_count = x_row_count` will be true, which will cause the
1837/// boolean expression to return false. Therefore, prune out the container.
1838fn wrap_null_count_check_expr(
1839    statistics_expr: Arc<dyn PhysicalExpr>,
1840    expr_builder: &mut PruningExpressionBuilder,
1841) -> Result<Arc<dyn PhysicalExpr>> {
1842    // x_null_count != x_row_count
1843    let not_when_null_count_eq_row_count = Arc::new(phys_expr::BinaryExpr::new(
1844        expr_builder.null_count_column_expr()?,
1845        Operator::NotEq,
1846        expr_builder.row_count_column_expr()?,
1847    ));
1848
1849    // (x_null_count != x_row_count) AND (<statistics_expr>)
1850    Ok(Arc::new(phys_expr::BinaryExpr::new(
1851        not_when_null_count_eq_row_count,
1852        Operator::And,
1853        statistics_expr,
1854    )))
1855}
1856
/// The kinds of per-container statistics a pruning expression can refer to.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum StatisticsType {
    /// Minimum value of a column
    Min,
    /// Maximum value of a column
    Max,
    /// Number of null values in a column
    NullCount,
    /// Number of rows
    RowCount,
}
1864
1865#[cfg(test)]
1866mod tests {
1867    use std::collections::HashMap;
1868    use std::ops::{Not, Rem};
1869
1870    use super::*;
1871    use datafusion_common::test_util::batches_to_string;
1872    use datafusion_expr::{and, col, lit, or};
1873    use insta::assert_snapshot;
1874
1875    use arrow::array::Decimal128Array;
1876    use arrow::{
1877        array::{BinaryArray, Int32Array, Int64Array, StringArray, UInt64Array},
1878        datatypes::TimeUnit,
1879    };
1880    use datafusion_expr::expr::InList;
1881    use datafusion_expr::{cast, is_null, try_cast, Expr};
1882    use datafusion_functions_nested::expr_fn::{array_has, make_array};
1883    use datafusion_physical_expr::expressions as phys_expr;
1884    use datafusion_physical_expr::planner::logical2physical;
1885
1886    #[derive(Debug, Default)]
1887    /// Mock statistic provider for tests
1888    ///
1889    /// Each row represents the statistics for a "container" (which
1890    /// might represent an entire parquet file, or directory of files,
1891    /// or some other collection of data for which we had statistics)
1892    ///
1893    /// Note All `ArrayRefs` must be the same size.
1894    struct ContainerStats {
1895        min: Option<ArrayRef>,
1896        max: Option<ArrayRef>,
1897        /// Optional values
1898        null_counts: Option<ArrayRef>,
1899        row_counts: Option<ArrayRef>,
1900        /// Optional known values (e.g. mimic a bloom filter)
1901        /// (value, contained)
1902        /// If present, all BooleanArrays must be the same size as min/max
1903        contained: Vec<(HashSet<ScalarValue>, BooleanArray)>,
1904    }
1905
1906    impl ContainerStats {
1907        fn new() -> Self {
1908            Default::default()
1909        }
1910        fn new_decimal128(
1911            min: impl IntoIterator<Item = Option<i128>>,
1912            max: impl IntoIterator<Item = Option<i128>>,
1913            precision: u8,
1914            scale: i8,
1915        ) -> Self {
1916            Self::new()
1917                .with_min(Arc::new(
1918                    min.into_iter()
1919                        .collect::<Decimal128Array>()
1920                        .with_precision_and_scale(precision, scale)
1921                        .unwrap(),
1922                ))
1923                .with_max(Arc::new(
1924                    max.into_iter()
1925                        .collect::<Decimal128Array>()
1926                        .with_precision_and_scale(precision, scale)
1927                        .unwrap(),
1928                ))
1929        }
1930
1931        fn new_i64(
1932            min: impl IntoIterator<Item = Option<i64>>,
1933            max: impl IntoIterator<Item = Option<i64>>,
1934        ) -> Self {
1935            Self::new()
1936                .with_min(Arc::new(min.into_iter().collect::<Int64Array>()))
1937                .with_max(Arc::new(max.into_iter().collect::<Int64Array>()))
1938        }
1939
1940        fn new_i32(
1941            min: impl IntoIterator<Item = Option<i32>>,
1942            max: impl IntoIterator<Item = Option<i32>>,
1943        ) -> Self {
1944            Self::new()
1945                .with_min(Arc::new(min.into_iter().collect::<Int32Array>()))
1946                .with_max(Arc::new(max.into_iter().collect::<Int32Array>()))
1947        }
1948
1949        fn new_utf8<'a>(
1950            min: impl IntoIterator<Item = Option<&'a str>>,
1951            max: impl IntoIterator<Item = Option<&'a str>>,
1952        ) -> Self {
1953            Self::new()
1954                .with_min(Arc::new(min.into_iter().collect::<StringArray>()))
1955                .with_max(Arc::new(max.into_iter().collect::<StringArray>()))
1956        }
1957
1958        fn new_bool(
1959            min: impl IntoIterator<Item = Option<bool>>,
1960            max: impl IntoIterator<Item = Option<bool>>,
1961        ) -> Self {
1962            Self::new()
1963                .with_min(Arc::new(min.into_iter().collect::<BooleanArray>()))
1964                .with_max(Arc::new(max.into_iter().collect::<BooleanArray>()))
1965        }
1966
1967        fn min(&self) -> Option<ArrayRef> {
1968            self.min.clone()
1969        }
1970
1971        fn max(&self) -> Option<ArrayRef> {
1972            self.max.clone()
1973        }
1974
1975        fn null_counts(&self) -> Option<ArrayRef> {
1976            self.null_counts.clone()
1977        }
1978
1979        fn row_counts(&self) -> Option<ArrayRef> {
1980            self.row_counts.clone()
1981        }
1982
1983        /// return an iterator over all arrays in this statistics
1984        fn arrays(&self) -> Vec<ArrayRef> {
1985            let contained_arrays = self
1986                .contained
1987                .iter()
1988                .map(|(_values, contained)| Arc::new(contained.clone()) as ArrayRef);
1989
1990            [
1991                self.min.as_ref().cloned(),
1992                self.max.as_ref().cloned(),
1993                self.null_counts.as_ref().cloned(),
1994                self.row_counts.as_ref().cloned(),
1995            ]
1996            .into_iter()
1997            .flatten()
1998            .chain(contained_arrays)
1999            .collect()
2000        }
2001
2002        /// Returns the number of containers represented by this statistics This
2003        /// picks the length of the first array as all arrays must have the same
2004        /// length (which is verified by `assert_invariants`).
2005        fn len(&self) -> usize {
2006            // pick the first non zero length
2007            self.arrays().iter().map(|a| a.len()).next().unwrap_or(0)
2008        }
2009
2010        /// Ensure that the lengths of all arrays are consistent
2011        fn assert_invariants(&self) {
2012            let mut prev_len = None;
2013
2014            for len in self.arrays().iter().map(|a| a.len()) {
2015                // Get a length, if we don't already have one
2016                match prev_len {
2017                    None => {
2018                        prev_len = Some(len);
2019                    }
2020                    Some(prev_len) => {
2021                        assert_eq!(prev_len, len);
2022                    }
2023                }
2024            }
2025        }
2026
2027        /// Add min values
2028        fn with_min(mut self, min: ArrayRef) -> Self {
2029            self.min = Some(min);
2030            self
2031        }
2032
2033        /// Add max values
2034        fn with_max(mut self, max: ArrayRef) -> Self {
2035            self.max = Some(max);
2036            self
2037        }
2038
2039        /// Add null counts. There must be the same number of null counts as
2040        /// there are containers
2041        fn with_null_counts(
2042            mut self,
2043            counts: impl IntoIterator<Item = Option<u64>>,
2044        ) -> Self {
2045            let null_counts: ArrayRef =
2046                Arc::new(counts.into_iter().collect::<UInt64Array>());
2047
2048            self.assert_invariants();
2049            self.null_counts = Some(null_counts);
2050            self
2051        }
2052
2053        /// Add row counts. There must be the same number of row counts as
2054        /// there are containers
2055        fn with_row_counts(
2056            mut self,
2057            counts: impl IntoIterator<Item = Option<u64>>,
2058        ) -> Self {
2059            let row_counts: ArrayRef =
2060                Arc::new(counts.into_iter().collect::<UInt64Array>());
2061
2062            self.assert_invariants();
2063            self.row_counts = Some(row_counts);
2064            self
2065        }
2066
2067        /// Add contained information.
2068        pub fn with_contained(
2069            mut self,
2070            values: impl IntoIterator<Item = ScalarValue>,
2071            contained: impl IntoIterator<Item = Option<bool>>,
2072        ) -> Self {
2073            let contained: BooleanArray = contained.into_iter().collect();
2074            let values: HashSet<_> = values.into_iter().collect();
2075
2076            self.contained.push((values, contained));
2077            self.assert_invariants();
2078            self
2079        }
2080
2081        /// get any contained information for the specified values
2082        fn contained(&self, find_values: &HashSet<ScalarValue>) -> Option<BooleanArray> {
2083            // find the one with the matching values
2084            self.contained
2085                .iter()
2086                .find(|(values, _contained)| values == find_values)
2087                .map(|(_values, contained)| contained.clone())
2088        }
2089    }
2090
2091    #[derive(Debug, Default)]
2092    struct TestStatistics {
2093        // key: column name
2094        stats: HashMap<Column, ContainerStats>,
2095    }
2096
2097    impl TestStatistics {
2098        fn new() -> Self {
2099            Self::default()
2100        }
2101
2102        fn with(
2103            mut self,
2104            name: impl Into<String>,
2105            container_stats: ContainerStats,
2106        ) -> Self {
2107            let col = Column::from_name(name.into());
2108            self.stats.insert(col, container_stats);
2109            self
2110        }
2111
2112        /// Add null counts for the specified column.
2113        /// There must be the same number of null counts as
2114        /// there are containers
2115        fn with_null_counts(
2116            mut self,
2117            name: impl Into<String>,
2118            counts: impl IntoIterator<Item = Option<u64>>,
2119        ) -> Self {
2120            let col = Column::from_name(name.into());
2121
2122            // take stats out and update them
2123            let container_stats = self
2124                .stats
2125                .remove(&col)
2126                .unwrap_or_default()
2127                .with_null_counts(counts);
2128
2129            // put stats back in
2130            self.stats.insert(col, container_stats);
2131            self
2132        }
2133
2134        /// Add row counts for the specified column.
2135        /// There must be the same number of row counts as
2136        /// there are containers
2137        fn with_row_counts(
2138            mut self,
2139            name: impl Into<String>,
2140            counts: impl IntoIterator<Item = Option<u64>>,
2141        ) -> Self {
2142            let col = Column::from_name(name.into());
2143
2144            // take stats out and update them
2145            let container_stats = self
2146                .stats
2147                .remove(&col)
2148                .unwrap_or_default()
2149                .with_row_counts(counts);
2150
2151            // put stats back in
2152            self.stats.insert(col, container_stats);
2153            self
2154        }
2155
2156        /// Add contained information for the specified column.
2157        fn with_contained(
2158            mut self,
2159            name: impl Into<String>,
2160            values: impl IntoIterator<Item = ScalarValue>,
2161            contained: impl IntoIterator<Item = Option<bool>>,
2162        ) -> Self {
2163            let col = Column::from_name(name.into());
2164
2165            // take stats out and update them
2166            let container_stats = self
2167                .stats
2168                .remove(&col)
2169                .unwrap_or_default()
2170                .with_contained(values, contained);
2171
2172            // put stats back in
2173            self.stats.insert(col, container_stats);
2174            self
2175        }
2176    }
2177
2178    impl PruningStatistics for TestStatistics {
2179        fn min_values(&self, column: &Column) -> Option<ArrayRef> {
2180            self.stats
2181                .get(column)
2182                .map(|container_stats| container_stats.min())
2183                .unwrap_or(None)
2184        }
2185
2186        fn max_values(&self, column: &Column) -> Option<ArrayRef> {
2187            self.stats
2188                .get(column)
2189                .map(|container_stats| container_stats.max())
2190                .unwrap_or(None)
2191        }
2192
2193        fn num_containers(&self) -> usize {
2194            self.stats
2195                .values()
2196                .next()
2197                .map(|container_stats| container_stats.len())
2198                .unwrap_or(0)
2199        }
2200
2201        fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
2202            self.stats
2203                .get(column)
2204                .map(|container_stats| container_stats.null_counts())
2205                .unwrap_or(None)
2206        }
2207
2208        fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
2209            self.stats
2210                .get(column)
2211                .map(|container_stats| container_stats.row_counts())
2212                .unwrap_or(None)
2213        }
2214
2215        fn contained(
2216            &self,
2217            column: &Column,
2218            values: &HashSet<ScalarValue>,
2219        ) -> Option<BooleanArray> {
2220            self.stats
2221                .get(column)
2222                .and_then(|container_stats| container_stats.contained(values))
2223        }
2224    }
2225
2226    /// Returns the specified min/max container values
2227    struct OneContainerStats {
2228        min_values: Option<ArrayRef>,
2229        max_values: Option<ArrayRef>,
2230        num_containers: usize,
2231    }
2232
2233    impl PruningStatistics for OneContainerStats {
2234        fn min_values(&self, _column: &Column) -> Option<ArrayRef> {
2235            self.min_values.clone()
2236        }
2237
2238        fn max_values(&self, _column: &Column) -> Option<ArrayRef> {
2239            self.max_values.clone()
2240        }
2241
2242        fn num_containers(&self) -> usize {
2243            self.num_containers
2244        }
2245
2246        fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
2247            None
2248        }
2249
2250        fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
2251            None
2252        }
2253
2254        fn contained(
2255            &self,
2256            _column: &Column,
2257            _values: &HashSet<ScalarValue>,
2258        ) -> Option<BooleanArray> {
2259            None
2260        }
2261    }
2262
2263    /// Row count should only be referenced once in the pruning expression, even if we need the row count
2264    /// for multiple columns.
2265    #[test]
2266    fn test_unique_row_count_field_and_column() {
2267        // c1 = 100 AND c2 = 200
2268        let schema: SchemaRef = Arc::new(Schema::new(vec![
2269            Field::new("c1", DataType::Int32, true),
2270            Field::new("c2", DataType::Int32, true),
2271        ]));
2272        let expr = col("c1").eq(lit(100)).and(col("c2").eq(lit(200)));
2273        let expr = logical2physical(&expr, &schema);
2274        let p = PruningPredicate::try_new(expr, Arc::clone(&schema)).unwrap();
2275        // note pruning expression refers to row_count twice
2276        assert_eq!(
2277            "c1_null_count@2 != row_count@3 AND c1_min@0 <= 100 AND 100 <= c1_max@1 AND c2_null_count@6 != row_count@3 AND c2_min@4 <= 200 AND 200 <= c2_max@5",
2278            p.predicate_expr.to_string()
2279        );
2280
2281        // Fields in required schema should be unique, otherwise when creating batches
2282        // it will fail because of duplicate field names
2283        let mut fields = HashSet::new();
2284        for (_col, _ty, field) in p.required_columns().iter() {
2285            let was_new = fields.insert(field);
2286            if !was_new {
2287                panic!(
2288                    "Duplicate field in required schema: {field:?}. Previous fields:\n{fields:#?}"
2289                );
2290            }
2291        }
2292    }
2293
2294    #[test]
2295    fn prune_all_rows_null_counts() {
2296        // if null_count = row_count then we should prune the container for i = 0
2297        // regardless of the statistics
2298        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2299        let statistics = TestStatistics::new().with(
2300            "i",
2301            ContainerStats::new_i32(
2302                vec![Some(0)], // min
2303                vec![Some(0)], // max
2304            )
2305            .with_null_counts(vec![Some(1)])
2306            .with_row_counts(vec![Some(1)]),
2307        );
2308        let expected_ret = &[false];
2309        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2310
2311        // this should be true even if the container stats are missing
2312        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2313        let container_stats = ContainerStats {
2314            min: Some(Arc::new(Int32Array::from(vec![None]))),
2315            max: Some(Arc::new(Int32Array::from(vec![None]))),
2316            null_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
2317            row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
2318            ..ContainerStats::default()
2319        };
2320        let statistics = TestStatistics::new().with("i", container_stats);
2321        let expected_ret = &[false];
2322        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2323
2324        // If the null counts themselves are missing we should be able to fall back to the stats
2325        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2326        let container_stats = ContainerStats {
2327            min: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
2328            max: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
2329            null_counts: Some(Arc::new(UInt64Array::from(vec![None]))),
2330            row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
2331            ..ContainerStats::default()
2332        };
2333        let statistics = TestStatistics::new().with("i", container_stats);
2334        let expected_ret = &[true];
2335        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2336        let expected_ret = &[false];
2337        prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
2338
2339        // Same for the row counts
2340        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2341        let container_stats = ContainerStats {
2342            min: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
2343            max: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
2344            null_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
2345            row_counts: Some(Arc::new(UInt64Array::from(vec![None]))),
2346            ..ContainerStats::default()
2347        };
2348        let statistics = TestStatistics::new().with("i", container_stats);
2349        let expected_ret = &[true];
2350        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2351        let expected_ret = &[false];
2352        prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
2353    }
2354
2355    #[test]
2356    fn prune_missing_statistics() {
2357        // If the min or max stats are missing we should not prune
2358        // (unless we know all rows are null, see `prune_all_rows_null_counts`)
2359        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2360        let container_stats = ContainerStats {
2361            min: Some(Arc::new(Int32Array::from(vec![None, Some(0)]))),
2362            max: Some(Arc::new(Int32Array::from(vec![Some(0), None]))),
2363            null_counts: Some(Arc::new(UInt64Array::from(vec![Some(0), Some(0)]))),
2364            row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1), Some(1)]))),
2365            ..ContainerStats::default()
2366        };
2367        let statistics = TestStatistics::new().with("i", container_stats);
2368        let expected_ret = &[true, true];
2369        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2370        let expected_ret = &[false, true];
2371        prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
2372        let expected_ret = &[true, false];
2373        prune_with_expr(col("i").lt(lit(0)), &schema, &statistics, expected_ret);
2374    }
2375
2376    #[test]
2377    fn prune_null_stats() {
2378        // if null_count = row_count then we should prune the container for i = 0
2379        // regardless of the statistics
2380        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2381
2382        let statistics = TestStatistics::new().with(
2383            "i",
2384            ContainerStats::new_i32(
2385                vec![Some(0)], // min
2386                vec![Some(0)], // max
2387            )
2388            .with_null_counts(vec![Some(1)])
2389            .with_row_counts(vec![Some(1)]),
2390        );
2391
2392        let expected_ret = &[false];
2393
2394        // i = 0
2395        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2396    }
2397
2398    #[test]
2399    fn test_build_statistics_record_batch() {
2400        // Request a record batch with of s1_min, s2_max, s3_max, s3_min
2401        let required_columns = RequiredColumns::from(vec![
2402            // min of original column s1, named s1_min
2403            (
2404                phys_expr::Column::new("s1", 1),
2405                StatisticsType::Min,
2406                Field::new("s1_min", DataType::Int32, true),
2407            ),
2408            // max of original column s2, named s2_max
2409            (
2410                phys_expr::Column::new("s2", 2),
2411                StatisticsType::Max,
2412                Field::new("s2_max", DataType::Int32, true),
2413            ),
2414            // max of original column s3, named s3_max
2415            (
2416                phys_expr::Column::new("s3", 3),
2417                StatisticsType::Max,
2418                Field::new("s3_max", DataType::Utf8, true),
2419            ),
2420            // min of original column s3, named s3_min
2421            (
2422                phys_expr::Column::new("s3", 3),
2423                StatisticsType::Min,
2424                Field::new("s3_min", DataType::Utf8, true),
2425            ),
2426        ]);
2427
2428        let statistics = TestStatistics::new()
2429            .with(
2430                "s1",
2431                ContainerStats::new_i32(
2432                    vec![None, None, Some(9), None],  // min
2433                    vec![Some(10), None, None, None], // max
2434                ),
2435            )
2436            .with(
2437                "s2",
2438                ContainerStats::new_i32(
2439                    vec![Some(2), None, None, None],  // min
2440                    vec![Some(20), None, None, None], // max
2441                ),
2442            )
2443            .with(
2444                "s3",
2445                ContainerStats::new_utf8(
2446                    vec![Some("a"), None, None, None],      // min
2447                    vec![Some("q"), None, Some("r"), None], // max
2448                ),
2449            );
2450
2451        let batch =
2452            build_statistics_record_batch(&statistics, &required_columns).unwrap();
2453        assert_snapshot!(batches_to_string(&[batch]), @r"
2454        +--------+--------+--------+--------+
2455        | s1_min | s2_max | s3_max | s3_min |
2456        +--------+--------+--------+--------+
2457        |        | 20     | q      | a      |
2458        |        |        |        |        |
2459        | 9      |        | r      |        |
2460        |        |        |        |        |
2461        +--------+--------+--------+--------+
2462        ");
2463    }
2464
2465    #[test]
2466    fn test_build_statistics_casting() {
2467        // Test requesting a Timestamp column, but getting statistics as Int64
2468        // which is what Parquet does
2469
2470        // Request a record batch with of s1_min as a timestamp
2471        let required_columns = RequiredColumns::from(vec![(
2472            phys_expr::Column::new("s3", 3),
2473            StatisticsType::Min,
2474            Field::new(
2475                "s1_min",
2476                DataType::Timestamp(TimeUnit::Nanosecond, None),
2477                true,
2478            ),
2479        )]);
2480
2481        // Note the statistics pass back i64 (not timestamp)
2482        let statistics = OneContainerStats {
2483            min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
2484            max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
2485            num_containers: 1,
2486        };
2487
2488        let batch =
2489            build_statistics_record_batch(&statistics, &required_columns).unwrap();
2490
2491        assert_snapshot!(batches_to_string(&[batch]), @r"
2492        +-------------------------------+
2493        | s1_min                        |
2494        +-------------------------------+
2495        | 1970-01-01T00:00:00.000000010 |
2496        +-------------------------------+
2497        ");
2498    }
2499
2500    #[test]
2501    fn test_build_statistics_no_required_stats() {
2502        let required_columns = RequiredColumns::new();
2503
2504        let statistics = OneContainerStats {
2505            min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
2506            max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
2507            num_containers: 1,
2508        };
2509
2510        let batch =
2511            build_statistics_record_batch(&statistics, &required_columns).unwrap();
2512        assert_eq!(batch.num_rows(), 1); // had 1 container
2513    }
2514
2515    #[test]
2516    fn test_build_statistics_inconsistent_types() {
2517        // Test requesting a Utf8 column when the stats return some other type
2518
2519        // Request a record batch with of s1_min as a timestamp
2520        let required_columns = RequiredColumns::from(vec![(
2521            phys_expr::Column::new("s3", 3),
2522            StatisticsType::Min,
2523            Field::new("s1_min", DataType::Utf8, true),
2524        )]);
2525
2526        // Note the statistics return an invalid UTF-8 sequence which will be converted to null
2527        let statistics = OneContainerStats {
2528            min_values: Some(Arc::new(BinaryArray::from(vec![&[255u8] as &[u8]]))),
2529            max_values: None,
2530            num_containers: 1,
2531        };
2532
2533        let batch =
2534            build_statistics_record_batch(&statistics, &required_columns).unwrap();
2535        assert_snapshot!(batches_to_string(&[batch]), @r"
2536        +--------+
2537        | s1_min |
2538        +--------+
2539        |        |
2540        +--------+
2541        ");
2542    }
2543
2544    #[test]
2545    fn test_build_statistics_inconsistent_length() {
2546        // return an inconsistent length to the actual statistics arrays
2547        let required_columns = RequiredColumns::from(vec![(
2548            phys_expr::Column::new("s1", 3),
2549            StatisticsType::Min,
2550            Field::new("s1_min", DataType::Int64, true),
2551        )]);
2552
2553        // Note the statistics pass back i64 (not timestamp)
2554        let statistics = OneContainerStats {
2555            min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
2556            max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
2557            num_containers: 3,
2558        };
2559
2560        let result =
2561            build_statistics_record_batch(&statistics, &required_columns).unwrap_err();
2562        assert!(
2563            result
2564                .to_string()
2565                .contains("mismatched statistics length. Expected 3, got 1"),
2566            "{}",
2567            result
2568        );
2569    }
2570
2571    #[test]
2572    fn row_group_predicate_eq() -> Result<()> {
2573        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2574        let expected_expr =
2575            "c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1";
2576
2577        // test column on the left
2578        let expr = col("c1").eq(lit(1));
2579        let predicate_expr =
2580            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2581        assert_eq!(predicate_expr.to_string(), expected_expr);
2582
2583        // test column on the right
2584        let expr = lit(1).eq(col("c1"));
2585        let predicate_expr =
2586            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2587        assert_eq!(predicate_expr.to_string(), expected_expr);
2588
2589        Ok(())
2590    }
2591
2592    #[test]
2593    fn row_group_predicate_not_eq() -> Result<()> {
2594        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2595        let expected_expr =
2596            "c1_null_count@2 != row_count@3 AND (c1_min@0 != 1 OR 1 != c1_max@1)";
2597
2598        // test column on the left
2599        let expr = col("c1").not_eq(lit(1));
2600        let predicate_expr =
2601            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2602        assert_eq!(predicate_expr.to_string(), expected_expr);
2603
2604        // test column on the right
2605        let expr = lit(1).not_eq(col("c1"));
2606        let predicate_expr =
2607            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2608        assert_eq!(predicate_expr.to_string(), expected_expr);
2609
2610        Ok(())
2611    }
2612
2613    #[test]
2614    fn row_group_predicate_gt() -> Result<()> {
2615        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2616        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_max@0 > 1";
2617
2618        // test column on the left
2619        let expr = col("c1").gt(lit(1));
2620        let predicate_expr =
2621            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2622        assert_eq!(predicate_expr.to_string(), expected_expr);
2623
2624        // test column on the right
2625        let expr = lit(1).lt(col("c1"));
2626        let predicate_expr =
2627            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2628        assert_eq!(predicate_expr.to_string(), expected_expr);
2629
2630        Ok(())
2631    }
2632
2633    #[test]
2634    fn row_group_predicate_gt_eq() -> Result<()> {
2635        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2636        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_max@0 >= 1";
2637
2638        // test column on the left
2639        let expr = col("c1").gt_eq(lit(1));
2640        let predicate_expr =
2641            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2642        assert_eq!(predicate_expr.to_string(), expected_expr);
2643        // test column on the right
2644        let expr = lit(1).lt_eq(col("c1"));
2645        let predicate_expr =
2646            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2647        assert_eq!(predicate_expr.to_string(), expected_expr);
2648
2649        Ok(())
2650    }
2651
2652    #[test]
2653    fn row_group_predicate_lt() -> Result<()> {
2654        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2655        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1";
2656
2657        // test column on the left
2658        let expr = col("c1").lt(lit(1));
2659        let predicate_expr =
2660            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2661        assert_eq!(predicate_expr.to_string(), expected_expr);
2662
2663        // test column on the right
2664        let expr = lit(1).gt(col("c1"));
2665        let predicate_expr =
2666            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2667        assert_eq!(predicate_expr.to_string(), expected_expr);
2668
2669        Ok(())
2670    }
2671
2672    #[test]
2673    fn row_group_predicate_lt_eq() -> Result<()> {
2674        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2675        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 <= 1";
2676
2677        // test column on the left
2678        let expr = col("c1").lt_eq(lit(1));
2679        let predicate_expr =
2680            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2681        assert_eq!(predicate_expr.to_string(), expected_expr);
2682        // test column on the right
2683        let expr = lit(1).gt_eq(col("c1"));
2684        let predicate_expr =
2685            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2686        assert_eq!(predicate_expr.to_string(), expected_expr);
2687
2688        Ok(())
2689    }
2690
2691    #[test]
2692    fn row_group_predicate_and() -> Result<()> {
2693        let schema = Schema::new(vec![
2694            Field::new("c1", DataType::Int32, false),
2695            Field::new("c2", DataType::Int32, false),
2696            Field::new("c3", DataType::Int32, false),
2697        ]);
2698        // test AND operator joining supported c1 < 1 expression and unsupported c2 > c3 expression
2699        let expr = col("c1").lt(lit(1)).and(col("c2").lt(col("c3")));
2700        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1";
2701        let predicate_expr =
2702            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2703        assert_eq!(predicate_expr.to_string(), expected_expr);
2704
2705        Ok(())
2706    }
2707
2708    #[test]
2709    fn row_group_predicate_or() -> Result<()> {
2710        let schema = Schema::new(vec![
2711            Field::new("c1", DataType::Int32, false),
2712            Field::new("c2", DataType::Int32, false),
2713        ]);
2714        // test OR operator joining supported c1 < 1 expression and unsupported c2 % 2 = 0 expression
2715        let expr = col("c1").lt(lit(1)).or(col("c2").rem(lit(2)).eq(lit(0)));
2716        let expected_expr = "true";
2717        let predicate_expr =
2718            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2719        assert_eq!(predicate_expr.to_string(), expected_expr);
2720
2721        Ok(())
2722    }
2723
2724    #[test]
2725    fn row_group_predicate_not() -> Result<()> {
2726        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2727        let expected_expr = "true";
2728
2729        let expr = col("c1").not();
2730        let predicate_expr =
2731            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2732        assert_eq!(predicate_expr.to_string(), expected_expr);
2733
2734        Ok(())
2735    }
2736
2737    #[test]
2738    fn row_group_predicate_not_bool() -> Result<()> {
2739        let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
2740        let expected_expr = "NOT c1_min@0 AND c1_max@1";
2741
2742        let expr = col("c1").not();
2743        let predicate_expr =
2744            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2745        assert_eq!(predicate_expr.to_string(), expected_expr);
2746
2747        Ok(())
2748    }
2749
2750    #[test]
2751    fn row_group_predicate_bool() -> Result<()> {
2752        let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
2753        let expected_expr = "c1_min@0 OR c1_max@1";
2754
2755        let expr = col("c1");
2756        let predicate_expr =
2757            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2758        assert_eq!(predicate_expr.to_string(), expected_expr);
2759
2760        Ok(())
2761    }
2762
2763    #[test]
2764    fn row_group_predicate_lt_bool() -> Result<()> {
2765        let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
2766        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < true";
2767
2768        // DF doesn't support arithmetic on boolean columns so
2769        // this predicate will error when evaluated
2770        let expr = col("c1").lt(lit(true));
2771        let predicate_expr =
2772            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2773        assert_eq!(predicate_expr.to_string(), expected_expr);
2774
2775        Ok(())
2776    }
2777
2778    #[test]
2779    fn row_group_predicate_required_columns() -> Result<()> {
2780        let schema = Schema::new(vec![
2781            Field::new("c1", DataType::Int32, false),
2782            Field::new("c2", DataType::Int32, false),
2783        ]);
2784        let mut required_columns = RequiredColumns::new();
2785        // c1 < 1 and (c2 = 2 or c2 = 3)
2786        let expr = col("c1")
2787            .lt(lit(1))
2788            .and(col("c2").eq(lit(2)).or(col("c2").eq(lit(3))));
2789        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1 AND (c2_null_count@5 != row_count@2 AND c2_min@3 <= 2 AND 2 <= c2_max@4 OR c2_null_count@5 != row_count@2 AND c2_min@3 <= 3 AND 3 <= c2_max@4)";
2790        let predicate_expr =
2791            test_build_predicate_expression(&expr, &schema, &mut required_columns);
2792        assert_eq!(predicate_expr.to_string(), expected_expr);
2793        println!("required_columns: {required_columns:#?}"); // for debugging assertions below
2794                                                             // c1 < 1 should add c1_min
2795        let c1_min_field = Field::new("c1_min", DataType::Int32, false);
2796        assert_eq!(
2797            required_columns.columns[0],
2798            (
2799                phys_expr::Column::new("c1", 0),
2800                StatisticsType::Min,
2801                c1_min_field.with_nullable(true) // could be nullable if stats are not present
2802            )
2803        );
2804        // c1 < 1 should add c1_null_count
2805        let c1_null_count_field = Field::new("c1_null_count", DataType::UInt64, false);
2806        assert_eq!(
2807            required_columns.columns[1],
2808            (
2809                phys_expr::Column::new("c1", 0),
2810                StatisticsType::NullCount,
2811                c1_null_count_field.with_nullable(true) // could be nullable if stats are not present
2812            )
2813        );
2814        // c1 < 1 should add row_count
2815        let row_count_field = Field::new("row_count", DataType::UInt64, false);
2816        assert_eq!(
2817            required_columns.columns[2],
2818            (
2819                phys_expr::Column::new("c1", 0),
2820                StatisticsType::RowCount,
2821                row_count_field.with_nullable(true) // could be nullable if stats are not present
2822            )
2823        );
2824        // c2 = 2 should add c2_min and c2_max
2825        let c2_min_field = Field::new("c2_min", DataType::Int32, false);
2826        assert_eq!(
2827            required_columns.columns[3],
2828            (
2829                phys_expr::Column::new("c2", 1),
2830                StatisticsType::Min,
2831                c2_min_field.with_nullable(true) // could be nullable if stats are not present
2832            )
2833        );
2834        let c2_max_field = Field::new("c2_max", DataType::Int32, false);
2835        assert_eq!(
2836            required_columns.columns[4],
2837            (
2838                phys_expr::Column::new("c2", 1),
2839                StatisticsType::Max,
2840                c2_max_field.with_nullable(true) // could be nullable if stats are not present
2841            )
2842        );
2843        // c2 = 2 should add c2_null_count
2844        let c2_null_count_field = Field::new("c2_null_count", DataType::UInt64, false);
2845        assert_eq!(
2846            required_columns.columns[5],
2847            (
2848                phys_expr::Column::new("c2", 1),
2849                StatisticsType::NullCount,
2850                c2_null_count_field.with_nullable(true) // could be nullable if stats are not present
2851            )
2852        );
2853        // c2 = 1 should add row_count
2854        let row_count_field = Field::new("row_count", DataType::UInt64, false);
2855        assert_eq!(
2856            required_columns.columns[2],
2857            (
2858                phys_expr::Column::new("c1", 0),
2859                StatisticsType::RowCount,
2860                row_count_field.with_nullable(true) // could be nullable if stats are not present
2861            )
2862        );
2863        // c2 = 3 shouldn't add any new statistics fields
2864        assert_eq!(required_columns.columns.len(), 6);
2865
2866        Ok(())
2867    }
2868
2869    #[test]
2870    fn row_group_predicate_in_list() -> Result<()> {
2871        let schema = Schema::new(vec![
2872            Field::new("c1", DataType::Int32, false),
2873            Field::new("c2", DataType::Int32, false),
2874        ]);
2875        // test c1 in(1, 2, 3)
2876        let expr = Expr::InList(InList::new(
2877            Box::new(col("c1")),
2878            vec![lit(1), lit(2), lit(3)],
2879            false,
2880        ));
2881        let expected_expr = "c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 2 AND 2 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 3 AND 3 <= c1_max@1";
2882        let predicate_expr =
2883            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2884        assert_eq!(predicate_expr.to_string(), expected_expr);
2885
2886        Ok(())
2887    }
2888
2889    #[test]
2890    fn row_group_predicate_in_list_empty() -> Result<()> {
2891        let schema = Schema::new(vec![
2892            Field::new("c1", DataType::Int32, false),
2893            Field::new("c2", DataType::Int32, false),
2894        ]);
2895        // test c1 in()
2896        let expr = Expr::InList(InList::new(Box::new(col("c1")), vec![], false));
2897        let expected_expr = "true";
2898        let predicate_expr =
2899            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2900        assert_eq!(predicate_expr.to_string(), expected_expr);
2901
2902        Ok(())
2903    }
2904
2905    #[test]
2906    fn row_group_predicate_in_list_negated() -> Result<()> {
2907        let schema = Schema::new(vec![
2908            Field::new("c1", DataType::Int32, false),
2909            Field::new("c2", DataType::Int32, false),
2910        ]);
2911        // test c1 not in(1, 2, 3)
2912        let expr = Expr::InList(InList::new(
2913            Box::new(col("c1")),
2914            vec![lit(1), lit(2), lit(3)],
2915            true,
2916        ));
2917        let expected_expr = "c1_null_count@2 != row_count@3 AND (c1_min@0 != 1 OR 1 != c1_max@1) AND c1_null_count@2 != row_count@3 AND (c1_min@0 != 2 OR 2 != c1_max@1) AND c1_null_count@2 != row_count@3 AND (c1_min@0 != 3 OR 3 != c1_max@1)";
2918        let predicate_expr =
2919            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2920        assert_eq!(predicate_expr.to_string(), expected_expr);
2921
2922        Ok(())
2923    }
2924
2925    #[test]
2926    fn row_group_predicate_between() -> Result<()> {
2927        let schema = Schema::new(vec![
2928            Field::new("c1", DataType::Int32, false),
2929            Field::new("c2", DataType::Int32, false),
2930        ]);
2931
2932        // test c1 BETWEEN 1 AND 5
2933        let expr1 = col("c1").between(lit(1), lit(5));
2934
2935        // test 1 <= c1 <= 5
2936        let expr2 = col("c1").gt_eq(lit(1)).and(col("c1").lt_eq(lit(5)));
2937
2938        let predicate_expr1 =
2939            test_build_predicate_expression(&expr1, &schema, &mut RequiredColumns::new());
2940
2941        let predicate_expr2 =
2942            test_build_predicate_expression(&expr2, &schema, &mut RequiredColumns::new());
2943        assert_eq!(predicate_expr1.to_string(), predicate_expr2.to_string());
2944
2945        Ok(())
2946    }
2947
2948    #[test]
2949    fn row_group_predicate_between_with_in_list() -> Result<()> {
2950        let schema = Schema::new(vec![
2951            Field::new("c1", DataType::Int32, false),
2952            Field::new("c2", DataType::Int32, false),
2953        ]);
2954        // test c1 in(1, 2)
2955        let expr1 = col("c1").in_list(vec![lit(1), lit(2)], false);
2956
2957        // test c2 BETWEEN 4 AND 5
2958        let expr2 = col("c2").between(lit(4), lit(5));
2959
2960        // test c1 in(1, 2) and c2 BETWEEN 4 AND 5
2961        let expr3 = expr1.and(expr2);
2962
2963        let expected_expr = "(c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 2 AND 2 <= c1_max@1) AND c2_null_count@5 != row_count@3 AND c2_max@4 >= 4 AND c2_null_count@5 != row_count@3 AND c2_min@6 <= 5";
2964        let predicate_expr =
2965            test_build_predicate_expression(&expr3, &schema, &mut RequiredColumns::new());
2966        assert_eq!(predicate_expr.to_string(), expected_expr);
2967
2968        Ok(())
2969    }
2970
2971    #[test]
2972    fn row_group_predicate_in_list_to_many_values() -> Result<()> {
2973        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2974        // test c1 in(1..21)
2975        // in pruning.rs has MAX_LIST_VALUE_SIZE_REWRITE = 20, more than this value will be rewrite
2976        // always true
2977        let expr = col("c1").in_list((1..=21).map(lit).collect(), false);
2978
2979        let expected_expr = "true";
2980        let predicate_expr =
2981            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2982        assert_eq!(predicate_expr.to_string(), expected_expr);
2983
2984        Ok(())
2985    }
2986
2987    #[test]
2988    fn row_group_predicate_cast_int_int() -> Result<()> {
2989        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2990        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 1 AND 1 <= CAST(c1_max@1 AS Int64)";
2991
2992        // test cast(c1 as int64) = 1
2993        // test column on the left
2994        let expr = cast(col("c1"), DataType::Int64).eq(lit(ScalarValue::Int64(Some(1))));
2995        let predicate_expr =
2996            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2997        assert_eq!(predicate_expr.to_string(), expected_expr);
2998
2999        // test column on the right
3000        let expr = lit(ScalarValue::Int64(Some(1))).eq(cast(col("c1"), DataType::Int64));
3001        let predicate_expr =
3002            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3003        assert_eq!(predicate_expr.to_string(), expected_expr);
3004
3005        let expected_expr =
3006            "c1_null_count@1 != row_count@2 AND TRY_CAST(c1_max@0 AS Int64) > 1";
3007
3008        // test column on the left
3009        let expr =
3010            try_cast(col("c1"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(1))));
3011        let predicate_expr =
3012            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3013        assert_eq!(predicate_expr.to_string(), expected_expr);
3014
3015        // test column on the right
3016        let expr =
3017            lit(ScalarValue::Int64(Some(1))).lt(try_cast(col("c1"), DataType::Int64));
3018        let predicate_expr =
3019            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3020        assert_eq!(predicate_expr.to_string(), expected_expr);
3021
3022        Ok(())
3023    }
3024
3025    #[test]
3026    fn row_group_predicate_cast_string_string() -> Result<()> {
3027        let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
3028        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Utf8) <= 1 AND 1 <= CAST(c1_max@1 AS Utf8)";
3029
3030        // test column on the left
3031        let expr = cast(col("c1"), DataType::Utf8)
3032            .eq(lit(ScalarValue::Utf8(Some("1".to_string()))));
3033        let predicate_expr =
3034            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3035        assert_eq!(predicate_expr.to_string(), expected_expr);
3036
3037        // test column on the right
3038        let expr = lit(ScalarValue::Utf8(Some("1".to_string())))
3039            .eq(cast(col("c1"), DataType::Utf8));
3040        let predicate_expr =
3041            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3042        assert_eq!(predicate_expr.to_string(), expected_expr);
3043
3044        Ok(())
3045    }
3046
3047    #[test]
3048    fn row_group_predicate_cast_string_int() -> Result<()> {
3049        let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
3050        let expected_expr = "true";
3051
3052        // test column on the left
3053        let expr = cast(col("c1"), DataType::Int32).eq(lit(ScalarValue::Int32(Some(1))));
3054        let predicate_expr =
3055            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3056        assert_eq!(predicate_expr.to_string(), expected_expr);
3057
3058        // test column on the right
3059        let expr = lit(ScalarValue::Int32(Some(1))).eq(cast(col("c1"), DataType::Int32));
3060        let predicate_expr =
3061            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3062        assert_eq!(predicate_expr.to_string(), expected_expr);
3063
3064        Ok(())
3065    }
3066
3067    #[test]
3068    fn row_group_predicate_cast_int_string() -> Result<()> {
3069        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
3070        let expected_expr = "true";
3071
3072        // test column on the left
3073        let expr = cast(col("c1"), DataType::Utf8)
3074            .eq(lit(ScalarValue::Utf8(Some("1".to_string()))));
3075        let predicate_expr =
3076            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3077        assert_eq!(predicate_expr.to_string(), expected_expr);
3078
3079        // test column on the right
3080        let expr = lit(ScalarValue::Utf8(Some("1".to_string())))
3081            .eq(cast(col("c1"), DataType::Utf8));
3082        let predicate_expr =
3083            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3084        assert_eq!(predicate_expr.to_string(), expected_expr);
3085
3086        Ok(())
3087    }
3088
3089    #[test]
3090    fn row_group_predicate_date_date() -> Result<()> {
3091        let schema = Schema::new(vec![Field::new("c1", DataType::Date32, false)]);
3092        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Date64) <= 1970-01-01 AND 1970-01-01 <= CAST(c1_max@1 AS Date64)";
3093
3094        // test column on the left
3095        let expr =
3096            cast(col("c1"), DataType::Date64).eq(lit(ScalarValue::Date64(Some(123))));
3097        let predicate_expr =
3098            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3099        assert_eq!(predicate_expr.to_string(), expected_expr);
3100
3101        // test column on the right
3102        let expr =
3103            lit(ScalarValue::Date64(Some(123))).eq(cast(col("c1"), DataType::Date64));
3104        let predicate_expr =
3105            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3106        assert_eq!(predicate_expr.to_string(), expected_expr);
3107
3108        Ok(())
3109    }
3110
3111    #[test]
3112    fn row_group_predicate_dict_string_date() -> Result<()> {
3113        // Test with Dictionary<UInt8, Utf8> for the literal
3114        let schema = Schema::new(vec![Field::new("c1", DataType::Date32, false)]);
3115        let expected_expr = "true";
3116
3117        // test column on the left
3118        let expr = cast(
3119            col("c1"),
3120            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3121        )
3122        .eq(lit(ScalarValue::Utf8(Some("2024-01-01".to_string()))));
3123        let predicate_expr =
3124            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3125        assert_eq!(predicate_expr.to_string(), expected_expr);
3126
3127        // test column on the right
3128        let expr = lit(ScalarValue::Utf8(Some("2024-01-01".to_string()))).eq(cast(
3129            col("c1"),
3130            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3131        ));
3132        let predicate_expr =
3133            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3134        assert_eq!(predicate_expr.to_string(), expected_expr);
3135
3136        Ok(())
3137    }
3138
3139    #[test]
3140    fn row_group_predicate_date_dict_string() -> Result<()> {
3141        // Test with Dictionary<UInt8, Utf8> for the column
3142        let schema = Schema::new(vec![Field::new(
3143            "c1",
3144            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3145            false,
3146        )]);
3147        let expected_expr = "true";
3148
3149        // test column on the left
3150        let expr =
3151            cast(col("c1"), DataType::Date32).eq(lit(ScalarValue::Date32(Some(123))));
3152        let predicate_expr =
3153            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3154        assert_eq!(predicate_expr.to_string(), expected_expr);
3155
3156        // test column on the right
3157        let expr =
3158            lit(ScalarValue::Date32(Some(123))).eq(cast(col("c1"), DataType::Date32));
3159        let predicate_expr =
3160            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3161        assert_eq!(predicate_expr.to_string(), expected_expr);
3162
3163        Ok(())
3164    }
3165
3166    #[test]
3167    fn row_group_predicate_dict_dict_same_value_type() -> Result<()> {
3168        // Test with Dictionary types that have the same value type but different key types
3169        let schema = Schema::new(vec![Field::new(
3170            "c1",
3171            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3172            false,
3173        )]);
3174
3175        // Direct comparison with no cast
3176        let expr = col("c1").eq(lit(ScalarValue::Utf8(Some("test".to_string()))));
3177        let predicate_expr =
3178            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3179        let expected_expr =
3180            "c1_null_count@2 != row_count@3 AND c1_min@0 <= test AND test <= c1_max@1";
3181        assert_eq!(predicate_expr.to_string(), expected_expr);
3182
3183        // Test with column cast to a dictionary with different key type
3184        let expr = cast(
3185            col("c1"),
3186            DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
3187        )
3188        .eq(lit(ScalarValue::Utf8(Some("test".to_string()))));
3189        let predicate_expr =
3190            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3191        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Dictionary(UInt16, Utf8)) <= test AND test <= CAST(c1_max@1 AS Dictionary(UInt16, Utf8))";
3192        assert_eq!(predicate_expr.to_string(), expected_expr);
3193
3194        Ok(())
3195    }
3196
3197    #[test]
3198    fn row_group_predicate_dict_dict_different_value_type() -> Result<()> {
3199        // Test with Dictionary types that have different value types
3200        let schema = Schema::new(vec![Field::new(
3201            "c1",
3202            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Int32)),
3203            false,
3204        )]);
3205        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 123 AND 123 <= CAST(c1_max@1 AS Int64)";
3206
3207        // Test with literal of a different type
3208        let expr =
3209            cast(col("c1"), DataType::Int64).eq(lit(ScalarValue::Int64(Some(123))));
3210        let predicate_expr =
3211            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3212        assert_eq!(predicate_expr.to_string(), expected_expr);
3213
3214        Ok(())
3215    }
3216
3217    #[test]
3218    fn row_group_predicate_nested_dict() -> Result<()> {
3219        // Test with nested Dictionary types
3220        let schema = Schema::new(vec![Field::new(
3221            "c1",
3222            DataType::Dictionary(
3223                Box::new(DataType::UInt8),
3224                Box::new(DataType::Dictionary(
3225                    Box::new(DataType::UInt16),
3226                    Box::new(DataType::Utf8),
3227                )),
3228            ),
3229            false,
3230        )]);
3231        let expected_expr =
3232            "c1_null_count@2 != row_count@3 AND c1_min@0 <= test AND test <= c1_max@1";
3233
3234        // Test with a simple literal
3235        let expr = col("c1").eq(lit(ScalarValue::Utf8(Some("test".to_string()))));
3236        let predicate_expr =
3237            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3238        assert_eq!(predicate_expr.to_string(), expected_expr);
3239
3240        Ok(())
3241    }
3242
3243    #[test]
3244    fn row_group_predicate_dict_date_dict_date() -> Result<()> {
3245        // Test with dictionary-wrapped date types for both sides
3246        let schema = Schema::new(vec![Field::new(
3247            "c1",
3248            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Date32)),
3249            false,
3250        )]);
3251        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Dictionary(UInt16, Date64)) <= 1970-01-01 AND 1970-01-01 <= CAST(c1_max@1 AS Dictionary(UInt16, Date64))";
3252
3253        // Test with a cast to a different date type
3254        let expr = cast(
3255            col("c1"),
3256            DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Date64)),
3257        )
3258        .eq(lit(ScalarValue::Date64(Some(123))));
3259        let predicate_expr =
3260            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3261        assert_eq!(predicate_expr.to_string(), expected_expr);
3262
3263        Ok(())
3264    }
3265
3266    #[test]
3267    fn row_group_predicate_date_string() -> Result<()> {
3268        let schema = Schema::new(vec![Field::new("c1", DataType::Utf8, false)]);
3269        let expected_expr = "true";
3270
3271        // test column on the left
3272        let expr =
3273            cast(col("c1"), DataType::Date32).eq(lit(ScalarValue::Date32(Some(123))));
3274        let predicate_expr =
3275            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3276        assert_eq!(predicate_expr.to_string(), expected_expr);
3277
3278        // test column on the right
3279        let expr =
3280            lit(ScalarValue::Date32(Some(123))).eq(cast(col("c1"), DataType::Date32));
3281        let predicate_expr =
3282            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3283        assert_eq!(predicate_expr.to_string(), expected_expr);
3284
3285        Ok(())
3286    }
3287
3288    #[test]
3289    fn row_group_predicate_string_date() -> Result<()> {
3290        let schema = Schema::new(vec![Field::new("c1", DataType::Date32, false)]);
3291        let expected_expr = "true";
3292
3293        // test column on the left
3294        let expr = cast(col("c1"), DataType::Utf8)
3295            .eq(lit(ScalarValue::Utf8(Some("2024-01-01".to_string()))));
3296        let predicate_expr =
3297            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3298        assert_eq!(predicate_expr.to_string(), expected_expr);
3299
3300        // test column on the right
3301        let expr = lit(ScalarValue::Utf8(Some("2024-01-01".to_string())))
3302            .eq(cast(col("c1"), DataType::Utf8));
3303        let predicate_expr =
3304            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3305        assert_eq!(predicate_expr.to_string(), expected_expr);
3306
3307        Ok(())
3308    }
3309
3310    #[test]
3311    fn row_group_predicate_cast_list() -> Result<()> {
3312        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
3313        // test cast(c1 as int64) in int64(1, 2, 3)
3314        let expr = Expr::InList(InList::new(
3315            Box::new(cast(col("c1"), DataType::Int64)),
3316            vec![
3317                lit(ScalarValue::Int64(Some(1))),
3318                lit(ScalarValue::Int64(Some(2))),
3319                lit(ScalarValue::Int64(Some(3))),
3320            ],
3321            false,
3322        ));
3323        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 1 AND 1 <= CAST(c1_max@1 AS Int64) OR c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 2 AND 2 <= CAST(c1_max@1 AS Int64) OR c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 3 AND 3 <= CAST(c1_max@1 AS Int64)";
3324        let predicate_expr =
3325            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3326        assert_eq!(predicate_expr.to_string(), expected_expr);
3327
3328        let expr = Expr::InList(InList::new(
3329            Box::new(cast(col("c1"), DataType::Int64)),
3330            vec![
3331                lit(ScalarValue::Int64(Some(1))),
3332                lit(ScalarValue::Int64(Some(2))),
3333                lit(ScalarValue::Int64(Some(3))),
3334            ],
3335            true,
3336        ));
3337        let expected_expr = "c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 1 OR 1 != CAST(c1_max@1 AS Int64)) AND c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 2 OR 2 != CAST(c1_max@1 AS Int64)) AND c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 3 OR 3 != CAST(c1_max@1 AS Int64))";
3338        let predicate_expr =
3339            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3340        assert_eq!(predicate_expr.to_string(), expected_expr);
3341
3342        Ok(())
3343    }
3344
3345    #[test]
3346    fn prune_decimal_data() {
3347        // decimal(9,2)
3348        let schema = Arc::new(Schema::new(vec![Field::new(
3349            "s1",
3350            DataType::Decimal128(9, 2),
3351            true,
3352        )]));
3353
3354        prune_with_expr(
3355            // s1 > 5
3356            col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2))),
3357            &schema,
3358            // If the data is written by spark, the physical data type is INT32 in the parquet
3359            // So we use the INT32 type of statistic.
3360            &TestStatistics::new().with(
3361                "s1",
3362                ContainerStats::new_i32(
3363                    vec![Some(0), Some(4), None, Some(3)], // min
3364                    vec![Some(5), Some(6), Some(4), None], // max
3365                ),
3366            ),
3367            &[false, true, false, true],
3368        );
3369
3370        prune_with_expr(
3371            // with cast column to other type
3372            cast(col("s1"), DataType::Decimal128(14, 3))
3373                .gt(lit(ScalarValue::Decimal128(Some(5000), 14, 3))),
3374            &schema,
3375            &TestStatistics::new().with(
3376                "s1",
3377                ContainerStats::new_i32(
3378                    vec![Some(0), Some(4), None, Some(3)], // min
3379                    vec![Some(5), Some(6), Some(4), None], // max
3380                ),
3381            ),
3382            &[false, true, false, true],
3383        );
3384
3385        prune_with_expr(
3386            // with try cast column to other type
3387            try_cast(col("s1"), DataType::Decimal128(14, 3))
3388                .gt(lit(ScalarValue::Decimal128(Some(5000), 14, 3))),
3389            &schema,
3390            &TestStatistics::new().with(
3391                "s1",
3392                ContainerStats::new_i32(
3393                    vec![Some(0), Some(4), None, Some(3)], // min
3394                    vec![Some(5), Some(6), Some(4), None], // max
3395                ),
3396            ),
3397            &[false, true, false, true],
3398        );
3399
3400        // decimal(18,2)
3401        let schema = Arc::new(Schema::new(vec![Field::new(
3402            "s1",
3403            DataType::Decimal128(18, 2),
3404            true,
3405        )]));
3406        prune_with_expr(
3407            // s1 > 5
3408            col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 18, 2))),
3409            &schema,
3410            // If the data is written by spark, the physical data type is INT64 in the parquet
3411            // So we use the INT32 type of statistic.
3412            &TestStatistics::new().with(
3413                "s1",
3414                ContainerStats::new_i64(
3415                    vec![Some(0), Some(4), None, Some(3)], // min
3416                    vec![Some(5), Some(6), Some(4), None], // max
3417                ),
3418            ),
3419            &[false, true, false, true],
3420        );
3421
3422        // decimal(23,2)
3423        let schema = Arc::new(Schema::new(vec![Field::new(
3424            "s1",
3425            DataType::Decimal128(23, 2),
3426            true,
3427        )]));
3428
3429        prune_with_expr(
3430            // s1 > 5
3431            col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 23, 2))),
3432            &schema,
3433            &TestStatistics::new().with(
3434                "s1",
3435                ContainerStats::new_decimal128(
3436                    vec![Some(0), Some(400), None, Some(300)], // min
3437                    vec![Some(500), Some(600), Some(400), None], // max
3438                    23,
3439                    2,
3440                ),
3441            ),
3442            &[false, true, false, true],
3443        );
3444    }
3445
3446    #[test]
3447    fn prune_api() {
3448        let schema = Arc::new(Schema::new(vec![
3449            Field::new("s1", DataType::Utf8, true),
3450            Field::new("s2", DataType::Int32, true),
3451        ]));
3452
3453        let statistics = TestStatistics::new().with(
3454            "s2",
3455            ContainerStats::new_i32(
3456                vec![Some(0), Some(4), None, Some(3)], // min
3457                vec![Some(5), Some(6), None, None],    // max
3458            ),
3459        );
3460        prune_with_expr(
3461            // Prune using s2 > 5
3462            col("s2").gt(lit(5)),
3463            &schema,
3464            &statistics,
3465            // s2 [0, 5] ==> no rows should pass
3466            // s2 [4, 6] ==> some rows could pass
3467            // No stats for s2 ==> some rows could pass
3468            // s2 [3, None] (null max) ==> some rows could pass
3469            &[false, true, true, true],
3470        );
3471
3472        prune_with_expr(
3473            // filter with cast
3474            cast(col("s2"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(5)))),
3475            &schema,
3476            &statistics,
3477            &[false, true, true, true],
3478        );
3479    }
3480
3481    #[test]
3482    fn prune_not_eq_data() {
3483        let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));
3484
3485        prune_with_expr(
3486            // Prune using s2 != 'M'
3487            col("s1").not_eq(lit("M")),
3488            &schema,
3489            &TestStatistics::new().with(
3490                "s1",
3491                ContainerStats::new_utf8(
3492                    vec![Some("A"), Some("A"), Some("N"), Some("M"), None, Some("A")], // min
3493                    vec![Some("Z"), Some("L"), Some("Z"), Some("M"), None, None], // max
3494                ),
3495            ),
3496            // s1 [A, Z] ==> might have values that pass predicate
3497            // s1 [A, L] ==> all rows pass the predicate
3498            // s1 [N, Z] ==> all rows pass the predicate
3499            // s1 [M, M] ==> all rows do not pass the predicate
3500            // No stats for s2 ==> some rows could pass
3501            // s2 [3, None] (null max) ==> some rows could pass
3502            &[true, true, true, false, true, true],
3503        );
3504    }
3505
3506    /// Creates setup for boolean chunk pruning
3507    ///
3508    /// For predicate "b1" (boolean expr)
3509    /// b1 [false, false] ==> no rows can pass (not keep)
3510    /// b1 [false, true] ==> some rows could pass (must keep)
3511    /// b1 [true, true] ==> all rows must pass (must keep)
3512    /// b1 [NULL, NULL]  ==> unknown (must keep)
3513    /// b1 [false, NULL]  ==> unknown (must keep)
3514    ///
3515    /// For predicate "!b1" (boolean expr)
3516    /// b1 [false, false] ==> all rows pass (must keep)
3517    /// b1 [false, true] ==> some rows could pass (must keep)
3518    /// b1 [true, true] ==> no rows can pass (not keep)
3519    /// b1 [NULL, NULL]  ==> unknown (must keep)
3520    /// b1 [false, NULL]  ==> unknown (must keep)
3521    fn bool_setup() -> (SchemaRef, TestStatistics, Vec<bool>, Vec<bool>) {
3522        let schema =
3523            Arc::new(Schema::new(vec![Field::new("b1", DataType::Boolean, true)]));
3524
3525        let statistics = TestStatistics::new().with(
3526            "b1",
3527            ContainerStats::new_bool(
3528                vec![Some(false), Some(false), Some(true), None, Some(false)], // min
3529                vec![Some(false), Some(true), Some(true), None, None],         // max
3530            ),
3531        );
3532        let expected_true = vec![false, true, true, true, true];
3533        let expected_false = vec![true, true, false, true, true];
3534
3535        (schema, statistics, expected_true, expected_false)
3536    }
3537
3538    #[test]
3539    fn prune_bool_const_expr() {
3540        let (schema, statistics, _, _) = bool_setup();
3541
3542        prune_with_expr(
3543            // true
3544            lit(true),
3545            &schema,
3546            &statistics,
3547            &[true, true, true, true, true],
3548        );
3549
3550        prune_with_expr(
3551            // false
3552            lit(false),
3553            &schema,
3554            &statistics,
3555            &[false, false, false, false, false],
3556        );
3557    }
3558
3559    #[test]
3560    fn prune_bool_column() {
3561        let (schema, statistics, expected_true, _) = bool_setup();
3562
3563        prune_with_expr(
3564            // b1
3565            col("b1"),
3566            &schema,
3567            &statistics,
3568            &expected_true,
3569        );
3570    }
3571
3572    #[test]
3573    fn prune_bool_not_column() {
3574        let (schema, statistics, _, expected_false) = bool_setup();
3575
3576        prune_with_expr(
3577            // !b1
3578            col("b1").not(),
3579            &schema,
3580            &statistics,
3581            &expected_false,
3582        );
3583    }
3584
3585    #[test]
3586    fn prune_bool_column_eq_true() {
3587        let (schema, statistics, expected_true, _) = bool_setup();
3588
3589        prune_with_expr(
3590            // b1 = true
3591            col("b1").eq(lit(true)),
3592            &schema,
3593            &statistics,
3594            &expected_true,
3595        );
3596    }
3597
3598    #[test]
3599    fn prune_bool_not_column_eq_true() {
3600        let (schema, statistics, _, expected_false) = bool_setup();
3601
3602        prune_with_expr(
3603            // !b1 = true
3604            col("b1").not().eq(lit(true)),
3605            &schema,
3606            &statistics,
3607            &expected_false,
3608        );
3609    }
3610
3611    /// Creates a setup for chunk pruning, modeling a int32 column "i"
3612    /// with 5 different containers (e.g. RowGroups). They have [min,
3613    /// max]:
3614    ///
3615    /// i [-5, 5]
3616    /// i [1, 11]
3617    /// i [-11, -1]
3618    /// i [NULL, NULL]
3619    /// i [1, NULL]
3620    fn int32_setup() -> (SchemaRef, TestStatistics) {
3621        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
3622
3623        let statistics = TestStatistics::new().with(
3624            "i",
3625            ContainerStats::new_i32(
3626                vec![Some(-5), Some(1), Some(-11), None, Some(1)], // min
3627                vec![Some(5), Some(11), Some(-1), None, None],     // max
3628            ),
3629        );
3630        (schema, statistics)
3631    }
3632
3633    #[test]
3634    fn prune_int32_col_gt_zero() {
3635        let (schema, statistics) = int32_setup();
3636
3637        // Expression "i > 0" and "-i < 0"
3638        // i [-5, 5] ==> some rows could pass (must keep)
3639        // i [1, 11] ==> all rows must pass (must keep)
3640        // i [-11, -1] ==>  no rows can pass (not keep)
3641        // i [NULL, NULL]  ==> unknown (must keep)
3642        // i [1, NULL]  ==> unknown (must keep)
3643        let expected_ret = &[true, true, false, true, true];
3644
3645        // i > 0
3646        prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
3647
3648        // -i < 0
3649        prune_with_expr(
3650            Expr::Negative(Box::new(col("i"))).lt(lit(0)),
3651            &schema,
3652            &statistics,
3653            expected_ret,
3654        );
3655    }
3656
3657    #[test]
3658    fn prune_int32_col_lte_zero() {
3659        let (schema, statistics) = int32_setup();
3660
3661        // Expression "i <= 0" and "-i >= 0"
3662        // i [-5, 5] ==> some rows could pass (must keep)
3663        // i [1, 11] ==> no rows can pass (not keep)
3664        // i [-11, -1] ==>  all rows must pass (must keep)
3665        // i [NULL, NULL]  ==> unknown (must keep)
3666        // i [1, NULL]  ==> no rows can pass (not keep)
3667        let expected_ret = &[true, false, true, true, false];
3668
3669        prune_with_expr(
3670            // i <= 0
3671            col("i").lt_eq(lit(0)),
3672            &schema,
3673            &statistics,
3674            expected_ret,
3675        );
3676
3677        prune_with_expr(
3678            // -i >= 0
3679            Expr::Negative(Box::new(col("i"))).gt_eq(lit(0)),
3680            &schema,
3681            &statistics,
3682            expected_ret,
3683        );
3684    }
3685
3686    #[test]
3687    fn prune_int32_col_lte_zero_cast() {
3688        let (schema, statistics) = int32_setup();
3689
3690        // Expression "cast(i as utf8) <= '0'"
3691        // i [-5, 5] ==> some rows could pass (must keep)
3692        // i [1, 11] ==> no rows can pass in theory, -0.22 (conservatively keep)
3693        // i [-11, -1] ==>  no rows could pass in theory (conservatively keep)
3694        // i [NULL, NULL]  ==> unknown (must keep)
3695        // i [1, NULL]  ==> no rows can pass (conservatively keep)
3696        let expected_ret = &[true, true, true, true, true];
3697
3698        prune_with_expr(
3699            // cast(i as utf8) <= 0
3700            cast(col("i"), DataType::Utf8).lt_eq(lit("0")),
3701            &schema,
3702            &statistics,
3703            expected_ret,
3704        );
3705
3706        prune_with_expr(
3707            // try_cast(i as utf8) <= 0
3708            try_cast(col("i"), DataType::Utf8).lt_eq(lit("0")),
3709            &schema,
3710            &statistics,
3711            expected_ret,
3712        );
3713
3714        prune_with_expr(
3715            // cast(-i as utf8) >= 0
3716            cast(Expr::Negative(Box::new(col("i"))), DataType::Utf8).gt_eq(lit("0")),
3717            &schema,
3718            &statistics,
3719            expected_ret,
3720        );
3721
3722        prune_with_expr(
3723            // try_cast(-i as utf8) >= 0
3724            try_cast(Expr::Negative(Box::new(col("i"))), DataType::Utf8).gt_eq(lit("0")),
3725            &schema,
3726            &statistics,
3727            expected_ret,
3728        );
3729    }
3730
3731    #[test]
3732    fn prune_int32_col_eq_zero() {
3733        let (schema, statistics) = int32_setup();
3734
3735        // Expression "i = 0"
3736        // i [-5, 5] ==> some rows could pass (must keep)
3737        // i [1, 11] ==> no rows can pass (not keep)
3738        // i [-11, -1] ==>  no rows can pass (not keep)
3739        // i [NULL, NULL]  ==> unknown (must keep)
3740        // i [1, NULL]  ==> no rows can pass (not keep)
3741        let expected_ret = &[true, false, false, true, false];
3742
3743        prune_with_expr(
3744            // i = 0
3745            col("i").eq(lit(0)),
3746            &schema,
3747            &statistics,
3748            expected_ret,
3749        );
3750    }
3751
3752    #[test]
3753    fn prune_int32_col_eq_zero_cast() {
3754        let (schema, statistics) = int32_setup();
3755
3756        // Expression "cast(i as int64) = 0"
3757        // i [-5, 5] ==> some rows could pass (must keep)
3758        // i [1, 11] ==> no rows can pass (not keep)
3759        // i [-11, -1] ==>  no rows can pass (not keep)
3760        // i [NULL, NULL]  ==> unknown (must keep)
3761        // i [1, NULL]  ==> no rows can pass (not keep)
3762        let expected_ret = &[true, false, false, true, false];
3763
3764        prune_with_expr(
3765            cast(col("i"), DataType::Int64).eq(lit(0i64)),
3766            &schema,
3767            &statistics,
3768            expected_ret,
3769        );
3770
3771        prune_with_expr(
3772            try_cast(col("i"), DataType::Int64).eq(lit(0i64)),
3773            &schema,
3774            &statistics,
3775            expected_ret,
3776        );
3777    }
3778
3779    #[test]
3780    fn prune_int32_col_eq_zero_cast_as_str() {
3781        let (schema, statistics) = int32_setup();
3782
3783        // Note the cast is to a string where sorting properties are
3784        // not the same as integers
3785        //
3786        // Expression "cast(i as utf8) = '0'"
3787        // i [-5, 5] ==> some rows could pass (keep)
3788        // i [1, 11] ==> no rows can pass  (could keep)
3789        // i [-11, -1] ==>  no rows can pass (could keep)
3790        // i [NULL, NULL]  ==> unknown (keep)
3791        // i [1, NULL]  ==> no rows can pass (could keep)
3792        let expected_ret = &[true, true, true, true, true];
3793
3794        prune_with_expr(
3795            cast(col("i"), DataType::Utf8).eq(lit("0")),
3796            &schema,
3797            &statistics,
3798            expected_ret,
3799        );
3800    }
3801
3802    #[test]
3803    fn prune_int32_col_lt_neg_one() {
3804        let (schema, statistics) = int32_setup();
3805
3806        // Expression "i > -1" and "-i < 1"
3807        // i [-5, 5] ==> some rows could pass (must keep)
3808        // i [1, 11] ==> all rows must pass (must keep)
3809        // i [-11, -1] ==>  no rows can pass (not keep)
3810        // i [NULL, NULL]  ==> unknown (must keep)
3811        // i [1, NULL]  ==> all rows must pass (must keep)
3812        let expected_ret = &[true, true, false, true, true];
3813
3814        prune_with_expr(
3815            // i > -1
3816            col("i").gt(lit(-1)),
3817            &schema,
3818            &statistics,
3819            expected_ret,
3820        );
3821
3822        prune_with_expr(
3823            // -i < 1
3824            Expr::Negative(Box::new(col("i"))).lt(lit(1)),
3825            &schema,
3826            &statistics,
3827            expected_ret,
3828        );
3829    }
3830
3831    #[test]
3832    fn prune_int32_is_null() {
3833        let (schema, statistics) = int32_setup();
3834
3835        // Expression "i IS NULL" when there are no null statistics,
3836        // should all be kept
3837        let expected_ret = &[true, true, true, true, true];
3838
3839        prune_with_expr(
3840            // i IS NULL, no null statistics
3841            col("i").is_null(),
3842            &schema,
3843            &statistics,
3844            expected_ret,
3845        );
3846
3847        // provide null counts for each column
3848        let statistics = statistics.with_null_counts(
3849            "i",
3850            vec![
3851                Some(0), // no nulls (don't keep)
3852                Some(1), // 1 null
3853                None,    // unknown nulls
3854                None, // unknown nulls (min/max are both null too, like no stats at all)
3855                Some(0), // 0 nulls (max=null too which means no known max) (don't keep)
3856            ],
3857        );
3858
3859        let expected_ret = &[false, true, true, true, false];
3860
3861        prune_with_expr(
3862            // i IS NULL, with actual null statistics
3863            col("i").is_null(),
3864            &schema,
3865            &statistics,
3866            expected_ret,
3867        );
3868    }
3869
3870    #[test]
3871    fn prune_int32_column_is_known_all_null() {
3872        let (schema, statistics) = int32_setup();
3873
3874        // Expression "i < 0"
3875        // i [-5, 5] ==> some rows could pass (must keep)
3876        // i [1, 11] ==> no rows can pass (not keep)
3877        // i [-11, -1] ==>  all rows must pass (must keep)
3878        // i [NULL, NULL]  ==> unknown (must keep)
3879        // i [1, NULL]  ==> no rows can pass (not keep)
3880        let expected_ret = &[true, false, true, true, false];
3881
3882        prune_with_expr(
3883            // i < 0
3884            col("i").lt(lit(0)),
3885            &schema,
3886            &statistics,
3887            expected_ret,
3888        );
3889
3890        // provide row counts for each column
3891        let statistics = statistics.with_row_counts(
3892            "i",
3893            vec![
3894                Some(10), // 10 rows of data
3895                Some(9),  // 9 rows of data
3896                None,     // unknown row counts
3897                Some(4),
3898                Some(10),
3899            ],
3900        );
3901
3902        // pruning result is still the same if we only know row counts
3903        prune_with_expr(
3904            // i < 0, with only row counts statistics
3905            col("i").lt(lit(0)),
3906            &schema,
3907            &statistics,
3908            expected_ret,
3909        );
3910
3911        // provide null counts for each column
3912        let statistics = statistics.with_null_counts(
3913            "i",
3914            vec![
3915                Some(0), // no nulls
3916                Some(1), // 1 null
3917                None,    // unknown nulls
3918                Some(4), // 4 nulls, which is the same as the row counts, i.e. this column is all null (don't keep)
3919                Some(0), // 0 nulls (max=null too which means no known max)
3920            ],
3921        );
3922
3923        // Expression "i < 0" with actual null and row counts statistics
3924        // col | min, max     | row counts | null counts |
3925        // ----+--------------+------------+-------------+
3926        //  i  | [-5, 5]      | 10         | 0           | ==> Some rows could pass (must keep)
3927        //  i  | [1, 11]      | 9          | 1           | ==> No rows can pass (not keep)
3928        //  i  | [-11,-1]     | Unknown    | Unknown     | ==> All rows must pass (must keep)
3929        //  i  | [NULL, NULL] | 4          | 4           | ==> The column is all null (not keep)
3930        //  i  | [1, NULL]    | 10         | 0           | ==> No rows can pass (not keep)
3931        let expected_ret = &[true, false, true, false, false];
3932
3933        prune_with_expr(
3934            // i < 0, with actual null and row counts statistics
3935            col("i").lt(lit(0)),
3936            &schema,
3937            &statistics,
3938            expected_ret,
3939        );
3940    }
3941
3942    #[test]
3943    fn prune_cast_column_scalar() {
3944        // The data type of column i is INT32
3945        let (schema, statistics) = int32_setup();
3946        let expected_ret = &[true, true, false, true, true];
3947
3948        prune_with_expr(
3949            // i > int64(0)
3950            col("i").gt(cast(lit(ScalarValue::Int64(Some(0))), DataType::Int32)),
3951            &schema,
3952            &statistics,
3953            expected_ret,
3954        );
3955
3956        prune_with_expr(
3957            // cast(i as int64) > int64(0)
3958            cast(col("i"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(0)))),
3959            &schema,
3960            &statistics,
3961            expected_ret,
3962        );
3963
3964        prune_with_expr(
3965            // try_cast(i as int64) > int64(0)
3966            try_cast(col("i"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(0)))),
3967            &schema,
3968            &statistics,
3969            expected_ret,
3970        );
3971
3972        prune_with_expr(
3973            // `-cast(i as int64) < 0` convert to `cast(i as int64) > -0`
3974            Expr::Negative(Box::new(cast(col("i"), DataType::Int64)))
3975                .lt(lit(ScalarValue::Int64(Some(0)))),
3976            &schema,
3977            &statistics,
3978            expected_ret,
3979        );
3980    }
3981
3982    #[test]
3983    fn test_increment_utf8() {
3984        // Basic ASCII
3985        assert_eq!(increment_utf8("abc").unwrap(), "abd");
3986        assert_eq!(increment_utf8("abz").unwrap(), "ab{");
3987
3988        // Test around ASCII 127 (DEL)
3989        assert_eq!(increment_utf8("~").unwrap(), "\u{7f}"); // 126 -> 127
3990        assert_eq!(increment_utf8("\u{7f}").unwrap(), "\u{80}"); // 127 -> 128
3991
3992        // Test 2-byte UTF-8 sequences
3993        assert_eq!(increment_utf8("ß").unwrap(), "à"); // U+00DF -> U+00E0
3994
3995        // Test 3-byte UTF-8 sequences
3996        assert_eq!(increment_utf8("℣").unwrap(), "ℤ"); // U+2123 -> U+2124
3997
3998        // Test at UTF-8 boundaries
3999        assert_eq!(increment_utf8("\u{7FF}").unwrap(), "\u{800}"); // 2-byte to 3-byte boundary
4000        assert_eq!(increment_utf8("\u{FFFF}").unwrap(), "\u{10000}"); // 3-byte to 4-byte boundary
4001
4002        // Test that if we can't increment we return None
4003        assert!(increment_utf8("").is_none());
4004        assert!(increment_utf8("\u{10FFFF}").is_none()); // U+10FFFF is the max code point
4005
4006        // Test that if we can't increment the last character we do the previous one and truncate
4007        assert_eq!(increment_utf8("a\u{10FFFF}").unwrap(), "b");
4008
4009        // Test surrogate pair range (0xD800..=0xDFFF)
4010        assert_eq!(increment_utf8("a\u{D7FF}").unwrap(), "b");
4011        assert!(increment_utf8("\u{D7FF}").is_none());
4012
4013        // Test non-characters range (0xFDD0..=0xFDEF)
4014        assert_eq!(increment_utf8("a\u{FDCF}").unwrap(), "b");
4015        assert!(increment_utf8("\u{FDCF}").is_none());
4016
4017        // Test private use area limit (>= 0x110000)
4018        assert_eq!(increment_utf8("a\u{10FFFF}").unwrap(), "b");
4019        assert!(increment_utf8("\u{10FFFF}").is_none()); // Can't increment past max valid codepoint
4020    }
4021
4022    /// Creates a setup for chunk pruning, modeling a utf8 column "s1"
4023    /// with 5 different containers (e.g. RowGroups). They have [min,
4024    /// max]:
4025    /// s1 ["A", "Z"]
4026    /// s1 ["A", "L"]
4027    /// s1 ["N", "Z"]
4028    /// s1 [NULL, NULL]
4029    /// s1 ["A", NULL]
4030    /// s1 ["", "A"]
4031    /// s1 ["", ""]
4032    /// s1 ["AB", "A\u{10ffff}"]
4033    /// s1 ["A\u{10ffff}\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]
4034    fn utf8_setup() -> (SchemaRef, TestStatistics) {
4035        let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));
4036
4037        let statistics = TestStatistics::new().with(
4038            "s1",
4039            ContainerStats::new_utf8(
4040                vec![
4041                    Some("A"),
4042                    Some("A"),
4043                    Some("N"),
4044                    Some("M"),
4045                    None,
4046                    Some("A"),
4047                    Some(""),
4048                    Some(""),
4049                    Some("AB"),
4050                    Some("A\u{10ffff}\u{10ffff}"),
4051                ], // min
4052                vec![
4053                    Some("Z"),
4054                    Some("L"),
4055                    Some("Z"),
4056                    Some("M"),
4057                    None,
4058                    None,
4059                    Some("A"),
4060                    Some(""),
4061                    Some("A\u{10ffff}\u{10ffff}\u{10ffff}"),
4062                    Some("A\u{10ffff}\u{10ffff}"),
4063                ], // max
4064            ),
4065        );
4066        (schema, statistics)
4067    }
4068
4069    #[test]
4070    fn prune_utf8_eq() {
4071        let (schema, statistics) = utf8_setup();
4072
4073        let expr = col("s1").eq(lit("A"));
4074        #[rustfmt::skip]
4075        let expected_ret = &[
4076            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4077            true,
4078            // s1 ["A", "L"] ==> some rows could pass (must keep)
4079            true,
4080            // s1 ["N", "Z"] ==> no rows can pass (not keep)
4081            false,
4082            // s1 ["M", "M"] ==> no rows can pass (not keep)
4083            false,
4084            // s1 [NULL, NULL]  ==> unknown (must keep)
4085            true,
4086            // s1 ["A", NULL]  ==> unknown (must keep)
4087            true,
4088            // s1 ["", "A"]  ==> some rows could pass (must keep)
4089            true,
4090            // s1 ["", ""]  ==> no rows can pass (not keep)
4091            false,
4092            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4093            false,
4094            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4095            false,
4096        ];
4097        prune_with_expr(expr, &schema, &statistics, expected_ret);
4098
4099        let expr = col("s1").eq(lit(""));
4100        #[rustfmt::skip]
4101        let expected_ret = &[
4102            // s1 ["A", "Z"] ==> no rows can pass (not keep)
4103            false,
4104            // s1 ["A", "L"] ==> no rows can pass (not keep)
4105            false,
4106            // s1 ["N", "Z"] ==> no rows can pass (not keep)
4107            false,
4108            // s1 ["M", "M"] ==> no rows can pass (not keep)
4109            false,
4110            // s1 [NULL, NULL]  ==> unknown (must keep)
4111            true,
4112            // s1 ["A", NULL]  ==> no rows can pass (not keep)
4113            false,
4114            // s1 ["", "A"]  ==> some rows could pass (must keep)
4115            true,
4116            // s1 ["", ""]  ==> all rows must pass (must keep)
4117            true,
4118            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4119            false,
4120            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4121            false,
4122        ];
4123        prune_with_expr(expr, &schema, &statistics, expected_ret);
4124    }
4125
4126    #[test]
4127    fn prune_utf8_not_eq() {
4128        let (schema, statistics) = utf8_setup();
4129
4130        let expr = col("s1").not_eq(lit("A"));
4131        #[rustfmt::skip]
4132        let expected_ret = &[
4133            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4134            true,
4135            // s1 ["A", "L"] ==> some rows could pass (must keep)
4136            true,
4137            // s1 ["N", "Z"] ==> all rows must pass (must keep)
4138            true,
4139            // s1 ["M", "M"] ==> all rows must pass (must keep)
4140            true,
4141            // s1 [NULL, NULL]  ==> unknown (must keep)
4142            true,
4143            // s1 ["A", NULL]  ==> unknown (must keep)
4144            true,
4145            // s1 ["", "A"]  ==> some rows could pass (must keep)
4146            true,
4147            // s1 ["", ""]  ==> all rows must pass (must keep)
4148            true,
4149            // s1 ["AB", "A\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
4150            true,
4151            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
4152            true,
4153        ];
4154        prune_with_expr(expr, &schema, &statistics, expected_ret);
4155
4156        let expr = col("s1").not_eq(lit(""));
4157        #[rustfmt::skip]
4158        let expected_ret = &[
4159            // s1 ["A", "Z"] ==> all rows must pass (must keep)
4160            true,
4161            // s1 ["A", "L"] ==> all rows must pass (must keep)
4162            true,
4163            // s1 ["N", "Z"] ==> all rows must pass (must keep)
4164            true,
4165            // s1 ["M", "M"] ==> all rows must pass (must keep)
4166            true,
4167            // s1 [NULL, NULL]  ==> unknown (must keep)
4168            true,
4169            // s1 ["A", NULL]  ==> unknown (must keep)
4170            true,
4171            // s1 ["", "A"]  ==> some rows could pass (must keep)
4172            true,
4173            // s1 ["", ""]  ==> no rows can pass (not keep)
4174            false,
4175            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
4176            true,
4177            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
4178            true,
4179        ];
4180        prune_with_expr(expr, &schema, &statistics, expected_ret);
4181    }
4182
4183    #[test]
4184    fn prune_utf8_like_one() {
4185        let (schema, statistics) = utf8_setup();
4186
4187        let expr = col("s1").like(lit("A_"));
4188        #[rustfmt::skip]
4189        let expected_ret = &[
4190            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4191            true,
4192            // s1 ["A", "L"] ==> some rows could pass (must keep)
4193            true,
4194            // s1 ["N", "Z"] ==> no rows can pass (not keep)
4195            false,
4196            // s1 ["M", "M"] ==> no rows can pass (not keep)
4197            false,
4198            // s1 [NULL, NULL]  ==> unknown (must keep)
4199            true,
4200            // s1 ["A", NULL]  ==> unknown (must keep)
4201            true,
4202            // s1 ["", "A"]  ==> some rows could pass (must keep)
4203            true,
4204            // s1 ["", ""]  ==> no rows can pass (not keep)
4205            false,
4206            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4207            true,
4208            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4209            true,
4210        ];
4211        prune_with_expr(expr, &schema, &statistics, expected_ret);
4212
4213        let expr = col("s1").like(lit("_A_"));
4214        #[rustfmt::skip]
4215        let expected_ret = &[
4216            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4217            true,
4218            // s1 ["A", "L"] ==> some rows could pass (must keep)
4219            true,
4220            // s1 ["N", "Z"] ==> some rows could pass (must keep)
4221            true,
4222            // s1 ["M", "M"] ==> some rows could pass (must keep)
4223            true,
4224            // s1 [NULL, NULL]  ==> unknown (must keep)
4225            true,
4226            // s1 ["A", NULL]  ==> unknown (must keep)
4227            true,
4228            // s1 ["", "A"]  ==> some rows could pass (must keep)
4229            true,
4230            // s1 ["", ""]  ==> some rows could pass (must keep)
4231            true,
4232            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4233            true,
4234            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4235            true,
4236        ];
4237        prune_with_expr(expr, &schema, &statistics, expected_ret);
4238
4239        let expr = col("s1").like(lit("_"));
4240        #[rustfmt::skip]
4241        let expected_ret = &[
4242            // s1 ["A", "Z"] ==> all rows must pass (must keep)
4243            true,
4244            // s1 ["A", "L"] ==> all rows must pass (must keep)
4245            true,
4246            // s1 ["N", "Z"] ==> all rows must pass (must keep)
4247            true,
4248            // s1 ["M", "M"] ==> all rows must pass (must keep)
4249            true,
4250            // s1 [NULL, NULL]  ==> unknown (must keep)
4251            true,
4252            // s1 ["A", NULL]  ==> unknown (must keep)
4253            true,
4254            // s1 ["", "A"]  ==> all rows must pass (must keep)
4255            true,
4256            // s1 ["", ""]  ==> all rows must pass (must keep)
4257            true,
4258            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
4259            true,
4260            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
4261            true,
4262        ];
4263        prune_with_expr(expr, &schema, &statistics, expected_ret);
4264
4265        let expr = col("s1").like(lit(""));
4266        #[rustfmt::skip]
4267        let expected_ret = &[
4268            // s1 ["A", "Z"] ==> no rows can pass (not keep)
4269            false,
4270            // s1 ["A", "L"] ==> no rows can pass (not keep)
4271            false,
4272            // s1 ["N", "Z"] ==> no rows can pass (not keep)
4273            false,
4274            // s1 ["M", "M"] ==> no rows can pass (not keep)
4275            false,
4276            // s1 [NULL, NULL]  ==> unknown (must keep)
4277            true,
4278            // s1 ["A", NULL]  ==> no rows can pass (not keep)
4279            false,
4280            // s1 ["", "A"]  ==> some rows could pass (must keep)
4281            true,
4282            // s1 ["", ""]  ==> all rows must pass (must keep)
4283            true,
4284            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4285            false,
4286            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4287            false,
4288        ];
4289        prune_with_expr(expr, &schema, &statistics, expected_ret);
4290    }
4291
4292    #[test]
4293    fn prune_utf8_like_many() {
4294        let (schema, statistics) = utf8_setup();
4295
4296        let expr = col("s1").like(lit("A%"));
4297        #[rustfmt::skip]
4298        let expected_ret = &[
4299            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4300            true,
4301            // s1 ["A", "L"] ==> some rows could pass (must keep)
4302            true,
4303            // s1 ["N", "Z"] ==> no rows can pass (not keep)
4304            false,
4305            // s1 ["M", "M"] ==> no rows can pass (not keep)
4306            false,
4307            // s1 [NULL, NULL]  ==> unknown (must keep)
4308            true,
4309            // s1 ["A", NULL]  ==> unknown (must keep)
4310            true,
4311            // s1 ["", "A"]  ==> some rows could pass (must keep)
4312            true,
4313            // s1 ["", ""]  ==> no rows can pass (not keep)
4314            false,
4315            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4316            true,
4317            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4318            true,
4319        ];
4320        prune_with_expr(expr, &schema, &statistics, expected_ret);
4321
4322        let expr = col("s1").like(lit("%A%"));
4323        #[rustfmt::skip]
4324        let expected_ret = &[
4325            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4326            true,
4327            // s1 ["A", "L"] ==> some rows could pass (must keep)
4328            true,
4329            // s1 ["N", "Z"] ==> some rows could pass (must keep)
4330            true,
4331            // s1 ["M", "M"] ==> some rows could pass (must keep)
4332            true,
4333            // s1 [NULL, NULL]  ==> unknown (must keep)
4334            true,
4335            // s1 ["A", NULL]  ==> unknown (must keep)
4336            true,
4337            // s1 ["", "A"]  ==> some rows could pass (must keep)
4338            true,
4339            // s1 ["", ""]  ==> some rows could pass (must keep)
4340            true,
4341            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4342            true,
4343            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4344            true,
4345        ];
4346        prune_with_expr(expr, &schema, &statistics, expected_ret);
4347
4348        let expr = col("s1").like(lit("%"));
4349        #[rustfmt::skip]
4350        let expected_ret = &[
4351            // s1 ["A", "Z"] ==> all rows must pass (must keep)
4352            true,
4353            // s1 ["A", "L"] ==> all rows must pass (must keep)
4354            true,
4355            // s1 ["N", "Z"] ==> all rows must pass (must keep)
4356            true,
4357            // s1 ["M", "M"] ==> all rows must pass (must keep)
4358            true,
4359            // s1 [NULL, NULL]  ==> unknown (must keep)
4360            true,
4361            // s1 ["A", NULL]  ==> unknown (must keep)
4362            true,
4363            // s1 ["", "A"]  ==> all rows must pass (must keep)
4364            true,
4365            // s1 ["", ""]  ==> all rows must pass (must keep)
4366            true,
4367            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
4368            true,
4369            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
4370            true,
4371        ];
4372        prune_with_expr(expr, &schema, &statistics, expected_ret);
4373
4374        let expr = col("s1").like(lit(""));
4375        #[rustfmt::skip]
4376        let expected_ret = &[
4377            // s1 ["A", "Z"] ==> no rows can pass (not keep)
4378            false,
4379            // s1 ["A", "L"] ==> no rows can pass (not keep)
4380            false,
4381            // s1 ["N", "Z"] ==> no rows can pass (not keep)
4382            false,
4383            // s1 ["M", "M"] ==> no rows can pass (not keep)
4384            false,
4385            // s1 [NULL, NULL]  ==> unknown (must keep)
4386            true,
4387            // s1 ["A", NULL]  ==> no rows can pass (not keep)
4388            false,
4389            // s1 ["", "A"]  ==> some rows could pass (must keep)
4390            true,
4391            // s1 ["", ""]  ==> all rows must pass (must keep)
4392            true,
4393            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4394            false,
4395            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4396            false,
4397        ];
4398        prune_with_expr(expr, &schema, &statistics, expected_ret);
4399    }
4400
4401    #[test]
4402    fn prune_utf8_not_like_one() {
4403        let (schema, statistics) = utf8_setup();
4404
4405        let expr = col("s1").not_like(lit("A\u{10ffff}_"));
4406        #[rustfmt::skip]
4407        let expected_ret = &[
4408            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4409            true,
4410            // s1 ["A", "L"] ==> some rows could pass (must keep)
4411            true,
4412            // s1 ["N", "Z"] ==> some rows could pass (must keep)
4413            true,
4414            // s1 ["M", "M"] ==> some rows could pass (must keep)
4415            true,
4416            // s1 [NULL, NULL]  ==> unknown (must keep)
4417            true,
4418            // s1 ["A", NULL]  ==> some rows could pass (must keep)
4419            true,
4420            // s1 ["", "A"]  ==> some rows could pass (must keep)
4421            true,
4422            // s1 ["", ""]  ==> some rows could pass (must keep)
4423            true,
4424            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4425            true,
4426            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no row match. (min, max) maybe truncate 
4427            // original (min, max) maybe ("A\u{10ffff}\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}\u{10ffff}\u{10ffff}")
4428            true,
4429        ];
4430        prune_with_expr(expr, &schema, &statistics, expected_ret);
4431    }
4432
4433    #[test]
4434    fn prune_utf8_not_like_many() {
4435        let (schema, statistics) = utf8_setup();
4436
4437        let expr = col("s1").not_like(lit("A\u{10ffff}%"));
4438        #[rustfmt::skip]
4439        let expected_ret = &[
4440            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4441            true,
4442            // s1 ["A", "L"] ==> some rows could pass (must keep)
4443            true,
4444            // s1 ["N", "Z"] ==> some rows could pass (must keep)
4445            true,
4446            // s1 ["M", "M"] ==> some rows could pass (must keep)
4447            true,
4448            // s1 [NULL, NULL]  ==> unknown (must keep)
4449            true,
4450            // s1 ["A", NULL]  ==> some rows could pass (must keep)
4451            true,
4452            // s1 ["", "A"]  ==> some rows could pass (must keep)
4453            true,
4454            // s1 ["", ""]  ==> some rows could pass (must keep)
4455            true,
4456            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4457            true,
4458            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no row match
4459            false,
4460        ];
4461        prune_with_expr(expr, &schema, &statistics, expected_ret);
4462
4463        let expr = col("s1").not_like(lit("A\u{10ffff}%\u{10ffff}"));
4464        #[rustfmt::skip]
4465        let expected_ret = &[
4466            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4467            true,
4468            // s1 ["A", "L"] ==> some rows could pass (must keep)
4469            true,
4470            // s1 ["N", "Z"] ==> some rows could pass (must keep)
4471            true,
4472            // s1 ["M", "M"] ==> some rows could pass (must keep)
4473            true,
4474            // s1 [NULL, NULL]  ==> unknown (must keep)
4475            true,
4476            // s1 ["A", NULL]  ==> some rows could pass (must keep)
4477            true,
4478            // s1 ["", "A"]  ==> some rows could pass (must keep)
4479            true,
4480            // s1 ["", ""]  ==> some rows could pass (must keep)
4481            true,
4482            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4483            true,
4484            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4485            true,
4486        ];
4487        prune_with_expr(expr, &schema, &statistics, expected_ret);
4488
4489        let expr = col("s1").not_like(lit("A\u{10ffff}%\u{10ffff}_"));
4490        #[rustfmt::skip]
4491        let expected_ret = &[
4492            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4493            true,
4494            // s1 ["A", "L"] ==> some rows could pass (must keep)
4495            true,
4496            // s1 ["N", "Z"] ==> some rows could pass (must keep)
4497            true,
4498            // s1 ["M", "M"] ==> some rows could pass (must keep)
4499            true,
4500            // s1 [NULL, NULL]  ==> unknown (must keep)
4501            true,
4502            // s1 ["A", NULL]  ==> some rows could pass (must keep)
4503            true,
4504            // s1 ["", "A"]  ==> some rows could pass (must keep)
4505            true,
4506            // s1 ["", ""]  ==> some rows could pass (must keep)
4507            true,
4508            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4509            true,
4510            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4511            true,
4512        ];
4513        prune_with_expr(expr, &schema, &statistics, expected_ret);
4514
4515        let expr = col("s1").not_like(lit("A\\%%"));
4516        let statistics = TestStatistics::new().with(
4517            "s1",
4518            ContainerStats::new_utf8(
4519                vec![Some("A%a"), Some("A")],
4520                vec![Some("A%c"), Some("A")],
4521            ),
4522        );
4523        let expected_ret = &[false, true];
4524        prune_with_expr(expr, &schema, &statistics, expected_ret);
4525    }
4526
4527    #[test]
4528    fn test_rewrite_expr_to_prunable() {
4529        let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
4530        let df_schema = DFSchema::try_from(schema.clone()).unwrap();
4531
4532        // column op lit
4533        let left_input = col("a");
4534        let left_input = logical2physical(&left_input, &schema);
4535        let right_input = lit(ScalarValue::Int32(Some(12)));
4536        let right_input = logical2physical(&right_input, &schema);
4537        let (result_left, _, result_right) = rewrite_expr_to_prunable(
4538            &left_input,
4539            Operator::Eq,
4540            &right_input,
4541            df_schema.clone(),
4542        )
4543        .unwrap();
4544        assert_eq!(result_left.to_string(), left_input.to_string());
4545        assert_eq!(result_right.to_string(), right_input.to_string());
4546
4547        // cast op lit
4548        let left_input = cast(col("a"), DataType::Decimal128(20, 3));
4549        let left_input = logical2physical(&left_input, &schema);
4550        let right_input = lit(ScalarValue::Decimal128(Some(12), 20, 3));
4551        let right_input = logical2physical(&right_input, &schema);
4552        let (result_left, _, result_right) = rewrite_expr_to_prunable(
4553            &left_input,
4554            Operator::Gt,
4555            &right_input,
4556            df_schema.clone(),
4557        )
4558        .unwrap();
4559        assert_eq!(result_left.to_string(), left_input.to_string());
4560        assert_eq!(result_right.to_string(), right_input.to_string());
4561
4562        // try_cast op lit
4563        let left_input = try_cast(col("a"), DataType::Int64);
4564        let left_input = logical2physical(&left_input, &schema);
4565        let right_input = lit(ScalarValue::Int64(Some(12)));
4566        let right_input = logical2physical(&right_input, &schema);
4567        let (result_left, _, result_right) =
4568            rewrite_expr_to_prunable(&left_input, Operator::Gt, &right_input, df_schema)
4569                .unwrap();
4570        assert_eq!(result_left.to_string(), left_input.to_string());
4571        assert_eq!(result_right.to_string(), right_input.to_string());
4572
4573        // TODO: add test for other case and op
4574    }
4575
4576    #[test]
4577    fn test_rewrite_expr_to_prunable_custom_unhandled_hook() {
4578        struct CustomUnhandledHook;
4579
4580        impl UnhandledPredicateHook for CustomUnhandledHook {
4581            /// This handles an arbitrary case of a column that doesn't exist in the schema
4582            /// by renaming it to yet another column that doesn't exist in the schema
4583            /// (the transformation is arbitrary, the point is that it can do whatever it wants)
4584            fn handle(&self, _expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
4585                Arc::new(phys_expr::Literal::new(ScalarValue::Int32(Some(42))))
4586            }
4587        }
4588
4589        let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
4590        let schema_with_b = Schema::new(vec![
4591            Field::new("a", DataType::Int32, true),
4592            Field::new("b", DataType::Int32, true),
4593        ]);
4594
4595        let rewriter = PredicateRewriter::new()
4596            .with_unhandled_hook(Arc::new(CustomUnhandledHook {}));
4597
4598        let transform_expr = |expr| {
4599            let expr = logical2physical(&expr, &schema_with_b);
4600            rewriter.rewrite_predicate_to_statistics_predicate(&expr, &schema)
4601        };
4602
4603        // transform an arbitrary valid expression that we know is handled
4604        let known_expression = col("a").eq(lit(12));
4605        let known_expression_transformed = PredicateRewriter::new()
4606            .rewrite_predicate_to_statistics_predicate(
4607                &logical2physical(&known_expression, &schema),
4608                &schema,
4609            );
4610
4611        // an expression referencing an unknown column (that is not in the schema) gets passed to the hook
4612        let input = col("b").eq(lit(12));
4613        let expected = logical2physical(&lit(42), &schema);
4614        let transformed = transform_expr(input.clone());
4615        assert_eq!(transformed.to_string(), expected.to_string());
4616
4617        // more complex case with unknown column
4618        let input = known_expression.clone().and(input.clone());
4619        let expected = phys_expr::BinaryExpr::new(
4620            Arc::<dyn PhysicalExpr>::clone(&known_expression_transformed),
4621            Operator::And,
4622            logical2physical(&lit(42), &schema),
4623        );
4624        let transformed = transform_expr(input.clone());
4625        assert_eq!(transformed.to_string(), expected.to_string());
4626
4627        // an unknown expression gets passed to the hook
4628        let input = array_has(make_array(vec![lit(1)]), col("a"));
4629        let expected = logical2physical(&lit(42), &schema);
4630        let transformed = transform_expr(input.clone());
4631        assert_eq!(transformed.to_string(), expected.to_string());
4632
4633        // more complex case with unknown expression
4634        let input = known_expression.and(input);
4635        let expected = phys_expr::BinaryExpr::new(
4636            Arc::<dyn PhysicalExpr>::clone(&known_expression_transformed),
4637            Operator::And,
4638            logical2physical(&lit(42), &schema),
4639        );
4640        let transformed = transform_expr(input.clone());
4641        assert_eq!(transformed.to_string(), expected.to_string());
4642    }
4643
4644    #[test]
4645    fn test_rewrite_expr_to_prunable_error() {
4646        // cast string value to numeric value
4647        // this cast is not supported
4648        let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
4649        let df_schema = DFSchema::try_from(schema.clone()).unwrap();
4650        let left_input = cast(col("a"), DataType::Int64);
4651        let left_input = logical2physical(&left_input, &schema);
4652        let right_input = lit(ScalarValue::Int64(Some(12)));
4653        let right_input = logical2physical(&right_input, &schema);
4654        let result = rewrite_expr_to_prunable(
4655            &left_input,
4656            Operator::Gt,
4657            &right_input,
4658            df_schema.clone(),
4659        );
4660        assert!(result.is_err());
4661
4662        // other expr
4663        let left_input = is_null(col("a"));
4664        let left_input = logical2physical(&left_input, &schema);
4665        let right_input = lit(ScalarValue::Int64(Some(12)));
4666        let right_input = logical2physical(&right_input, &schema);
4667        let result =
4668            rewrite_expr_to_prunable(&left_input, Operator::Gt, &right_input, df_schema);
4669        assert!(result.is_err());
4670        // TODO: add other negative test for other case and op
4671    }
4672
4673    #[test]
4674    fn prune_with_contained_one_column() {
4675        let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));
4676
4677        // Model having information like a bloom filter for s1
4678        let statistics = TestStatistics::new()
4679            .with_contained(
4680                "s1",
4681                [ScalarValue::from("foo")],
4682                [
4683                    // container 0 known to only contain "foo"",
4684                    Some(true),
4685                    // container 1 known to not contain "foo"
4686                    Some(false),
4687                    // container 2 unknown about "foo"
4688                    None,
4689                    // container 3 known to only contain "foo"
4690                    Some(true),
4691                    // container 4 known to not contain "foo"
4692                    Some(false),
4693                    // container 5 unknown about "foo"
4694                    None,
4695                    // container 6 known to only contain "foo"
4696                    Some(true),
4697                    // container 7 known to not contain "foo"
4698                    Some(false),
4699                    // container 8 unknown about "foo"
4700                    None,
4701                ],
4702            )
4703            .with_contained(
4704                "s1",
4705                [ScalarValue::from("bar")],
4706                [
4707                    // containers 0,1,2 known to only contain "bar"
4708                    Some(true),
4709                    Some(true),
4710                    Some(true),
4711                    // container 3,4,5 known to not contain "bar"
4712                    Some(false),
4713                    Some(false),
4714                    Some(false),
4715                    // container 6,7,8 unknown about "bar"
4716                    None,
4717                    None,
4718                    None,
4719                ],
4720            )
4721            .with_contained(
4722                // the way the tests are setup, this data is
4723                // consulted if the "foo" and "bar" are being checked at the same time
4724                "s1",
4725                [ScalarValue::from("foo"), ScalarValue::from("bar")],
4726                [
4727                    // container 0,1,2 unknown about ("foo, "bar")
4728                    None,
4729                    None,
4730                    None,
4731                    // container 3,4,5 known to contain only either "foo" and "bar"
4732                    Some(true),
4733                    Some(true),
4734                    Some(true),
4735                    // container 6,7,8  known to contain  neither "foo" and "bar"
4736                    Some(false),
4737                    Some(false),
4738                    Some(false),
4739                ],
4740            );
4741
4742        // s1 = 'foo'
4743        prune_with_expr(
4744            col("s1").eq(lit("foo")),
4745            &schema,
4746            &statistics,
4747            // rule out containers ('false) where we know foo is not present
4748            &[true, false, true, true, false, true, true, false, true],
4749        );
4750
4751        // s1 = 'bar'
4752        prune_with_expr(
4753            col("s1").eq(lit("bar")),
4754            &schema,
4755            &statistics,
4756            // rule out containers where we know bar is not present
4757            &[true, true, true, false, false, false, true, true, true],
4758        );
4759
4760        // s1 = 'baz' (unknown value)
4761        prune_with_expr(
4762            col("s1").eq(lit("baz")),
4763            &schema,
4764            &statistics,
4765            // can't rule out anything
4766            &[true, true, true, true, true, true, true, true, true],
4767        );
4768
4769        // s1 = 'foo' AND s1 = 'bar'
4770        prune_with_expr(
4771            col("s1").eq(lit("foo")).and(col("s1").eq(lit("bar"))),
4772            &schema,
4773            &statistics,
4774            // logically this predicate can't possibly be true (the column can't
4775            // take on both values) but we could rule it out if the stats tell
4776            // us that both values are not present
4777            &[true, true, true, true, true, true, true, true, true],
4778        );
4779
4780        // s1 = 'foo' OR s1 = 'bar'
4781        prune_with_expr(
4782            col("s1").eq(lit("foo")).or(col("s1").eq(lit("bar"))),
4783            &schema,
4784            &statistics,
4785            // can rule out containers that we know contain neither foo nor bar
4786            &[true, true, true, true, true, true, false, false, false],
4787        );
4788
4789        // s1 = 'foo' OR s1 = 'baz'
4790        prune_with_expr(
4791            col("s1").eq(lit("foo")).or(col("s1").eq(lit("baz"))),
4792            &schema,
4793            &statistics,
4794            // can't rule out anything container
4795            &[true, true, true, true, true, true, true, true, true],
4796        );
4797
4798        // s1 = 'foo' OR s1 = 'bar' OR s1 = 'baz'
4799        prune_with_expr(
4800            col("s1")
4801                .eq(lit("foo"))
4802                .or(col("s1").eq(lit("bar")))
4803                .or(col("s1").eq(lit("baz"))),
4804            &schema,
4805            &statistics,
4806            // can rule out any containers based on knowledge of s1 and `foo`,
4807            // `bar` and (`foo`, `bar`)
4808            &[true, true, true, true, true, true, true, true, true],
4809        );
4810
4811        // s1 != foo
4812        prune_with_expr(
4813            col("s1").not_eq(lit("foo")),
4814            &schema,
4815            &statistics,
4816            // rule out containers we know for sure only contain foo
4817            &[false, true, true, false, true, true, false, true, true],
4818        );
4819
4820        // s1 != bar
4821        prune_with_expr(
4822            col("s1").not_eq(lit("bar")),
4823            &schema,
4824            &statistics,
4825            // rule out when we know for sure s1 has the value bar
4826            &[false, false, false, true, true, true, true, true, true],
4827        );
4828
4829        // s1 != foo AND s1 != bar
4830        prune_with_expr(
4831            col("s1")
4832                .not_eq(lit("foo"))
4833                .and(col("s1").not_eq(lit("bar"))),
4834            &schema,
4835            &statistics,
4836            // can rule out any container where we know s1 does not have either 'foo' or 'bar'
4837            &[true, true, true, false, false, false, true, true, true],
4838        );
4839
4840        // s1 != foo AND s1 != bar AND s1 != baz
4841        prune_with_expr(
4842            col("s1")
4843                .not_eq(lit("foo"))
4844                .and(col("s1").not_eq(lit("bar")))
4845                .and(col("s1").not_eq(lit("baz"))),
4846            &schema,
4847            &statistics,
4848            // can't rule out any container based on  knowledge of s1,s2
4849            &[true, true, true, true, true, true, true, true, true],
4850        );
4851
4852        // s1 != foo OR s1 != bar
4853        prune_with_expr(
4854            col("s1")
4855                .not_eq(lit("foo"))
4856                .or(col("s1").not_eq(lit("bar"))),
4857            &schema,
4858            &statistics,
4859            // cant' rule out anything based on contains information
4860            &[true, true, true, true, true, true, true, true, true],
4861        );
4862
4863        // s1 != foo OR s1 != bar OR s1 != baz
4864        prune_with_expr(
4865            col("s1")
4866                .not_eq(lit("foo"))
4867                .or(col("s1").not_eq(lit("bar")))
4868                .or(col("s1").not_eq(lit("baz"))),
4869            &schema,
4870            &statistics,
4871            // cant' rule out anything based on contains information
4872            &[true, true, true, true, true, true, true, true, true],
4873        );
4874    }
4875
4876    #[test]
4877    fn prune_with_contained_two_columns() {
4878        let schema = Arc::new(Schema::new(vec![
4879            Field::new("s1", DataType::Utf8, true),
4880            Field::new("s2", DataType::Utf8, true),
4881        ]));
4882
4883        // Model having information like bloom filters for s1 and s2
4884        let statistics = TestStatistics::new()
4885            .with_contained(
4886                "s1",
4887                [ScalarValue::from("foo")],
4888                [
4889                    // container 0, s1 known to only contain "foo"",
4890                    Some(true),
4891                    // container 1, s1 known to not contain "foo"
4892                    Some(false),
4893                    // container 2, s1 unknown about "foo"
4894                    None,
4895                    // container 3, s1 known to only contain "foo"
4896                    Some(true),
4897                    // container 4, s1 known to not contain "foo"
4898                    Some(false),
4899                    // container 5, s1 unknown about "foo"
4900                    None,
4901                    // container 6, s1 known to only contain "foo"
4902                    Some(true),
4903                    // container 7, s1 known to not contain "foo"
4904                    Some(false),
4905                    // container 8, s1 unknown about "foo"
4906                    None,
4907                ],
4908            )
4909            .with_contained(
4910                "s2", // for column s2
4911                [ScalarValue::from("bar")],
4912                [
4913                    // containers 0,1,2 s2 known to only contain "bar"
4914                    Some(true),
4915                    Some(true),
4916                    Some(true),
4917                    // container 3,4,5 s2 known to not contain "bar"
4918                    Some(false),
4919                    Some(false),
4920                    Some(false),
4921                    // container 6,7,8 s2 unknown about "bar"
4922                    None,
4923                    None,
4924                    None,
4925                ],
4926            );
4927
4928        // s1 = 'foo'
4929        prune_with_expr(
4930            col("s1").eq(lit("foo")),
4931            &schema,
4932            &statistics,
4933            // rule out containers where we know s1 is not present
4934            &[true, false, true, true, false, true, true, false, true],
4935        );
4936
4937        // s1 = 'foo' OR s2 = 'bar'
4938        let expr = col("s1").eq(lit("foo")).or(col("s2").eq(lit("bar")));
4939        prune_with_expr(
4940            expr,
4941            &schema,
4942            &statistics,
4943            //  can't rule out any container (would need to prove that s1 != foo AND s2 != bar)
4944            &[true, true, true, true, true, true, true, true, true],
4945        );
4946
4947        // s1 = 'foo' AND s2 != 'bar'
4948        prune_with_expr(
4949            col("s1").eq(lit("foo")).and(col("s2").not_eq(lit("bar"))),
4950            &schema,
4951            &statistics,
4952            // can only rule out container where we know either:
4953            // 1. s1 doesn't have the value 'foo` or
4954            // 2. s2 has only the value of 'bar'
4955            &[false, false, false, true, false, true, true, false, true],
4956        );
4957
4958        // s1 != 'foo' AND s2 != 'bar'
4959        prune_with_expr(
4960            col("s1")
4961                .not_eq(lit("foo"))
4962                .and(col("s2").not_eq(lit("bar"))),
4963            &schema,
4964            &statistics,
4965            // Can  rule out any container where we know either
4966            // 1. s1 has only the value 'foo'
4967            // 2. s2 has only the value 'bar'
4968            &[false, false, false, false, true, true, false, true, true],
4969        );
4970
4971        // s1 != 'foo' AND (s2 = 'bar' OR s2 = 'baz')
4972        prune_with_expr(
4973            col("s1")
4974                .not_eq(lit("foo"))
4975                .and(col("s2").eq(lit("bar")).or(col("s2").eq(lit("baz")))),
4976            &schema,
4977            &statistics,
4978            // Can rule out any container where we know s1 has only the value
4979            // 'foo'. Can't use knowledge of s2 and bar to rule out anything
4980            &[false, true, true, false, true, true, false, true, true],
4981        );
4982
4983        // s1 like '%foo%bar%'
4984        prune_with_expr(
4985            col("s1").like(lit("foo%bar%")),
4986            &schema,
4987            &statistics,
4988            // cant rule out anything with information we know
4989            &[true, true, true, true, true, true, true, true, true],
4990        );
4991
4992        // s1 like '%foo%bar%' AND s2 = 'bar'
4993        prune_with_expr(
4994            col("s1")
4995                .like(lit("foo%bar%"))
4996                .and(col("s2").eq(lit("bar"))),
4997            &schema,
4998            &statistics,
4999            // can rule out any container where we know s2 does not have the value 'bar'
5000            &[true, true, true, false, false, false, true, true, true],
5001        );
5002
5003        // s1 like '%foo%bar%' OR s2 = 'bar'
5004        prune_with_expr(
5005            col("s1").like(lit("foo%bar%")).or(col("s2").eq(lit("bar"))),
5006            &schema,
5007            &statistics,
5008            // can't rule out anything (we would have to prove that both the
5009            // like and the equality must be false)
5010            &[true, true, true, true, true, true, true, true, true],
5011        );
5012    }
5013
5014    #[test]
5015    fn prune_with_range_and_contained() {
5016        // Setup mimics range information for i, a bloom filter for s
5017        let schema = Arc::new(Schema::new(vec![
5018            Field::new("i", DataType::Int32, true),
5019            Field::new("s", DataType::Utf8, true),
5020        ]));
5021
5022        let statistics = TestStatistics::new()
5023            .with(
5024                "i",
5025                ContainerStats::new_i32(
5026                    // Container 0, 3, 6: [-5 to 5]
5027                    // Container 1, 4, 7: [10 to 20]
5028                    // Container 2, 5, 9: unknown
5029                    vec![
5030                        Some(-5),
5031                        Some(10),
5032                        None,
5033                        Some(-5),
5034                        Some(10),
5035                        None,
5036                        Some(-5),
5037                        Some(10),
5038                        None,
5039                    ], // min
5040                    vec![
5041                        Some(5),
5042                        Some(20),
5043                        None,
5044                        Some(5),
5045                        Some(20),
5046                        None,
5047                        Some(5),
5048                        Some(20),
5049                        None,
5050                    ], // max
5051                ),
5052            )
5053            // Add contained  information about the s and "foo"
5054            .with_contained(
5055                "s",
5056                [ScalarValue::from("foo")],
5057                [
5058                    // container 0,1,2 known to only contain "foo"
5059                    Some(true),
5060                    Some(true),
5061                    Some(true),
5062                    // container 3,4,5 known to not contain "foo"
5063                    Some(false),
5064                    Some(false),
5065                    Some(false),
5066                    // container 6,7,8 unknown about "foo"
5067                    None,
5068                    None,
5069                    None,
5070                ],
5071            );
5072
5073        // i = 0 and s = 'foo'
5074        prune_with_expr(
5075            col("i").eq(lit(0)).and(col("s").eq(lit("foo"))),
5076            &schema,
5077            &statistics,
5078            // Can rule out container where we know that either:
5079            // 1. 0 is outside the min/max range of i
5080            // 1. s does not contain foo
5081            // (range is false, and contained  is false)
5082            &[true, false, true, false, false, false, true, false, true],
5083        );
5084
5085        // i = 0 and s != 'foo'
5086        prune_with_expr(
5087            col("i").eq(lit(0)).and(col("s").not_eq(lit("foo"))),
5088            &schema,
5089            &statistics,
5090            // Can rule out containers where either:
5091            // 1. 0 is outside the min/max range of i
5092            // 2. s only contains foo
5093            &[false, false, false, true, false, true, true, false, true],
5094        );
5095
5096        // i = 0 OR s = 'foo'
5097        prune_with_expr(
5098            col("i").eq(lit(0)).or(col("s").eq(lit("foo"))),
5099            &schema,
5100            &statistics,
5101            // in theory could rule out containers if we had min/max values for
5102            // s as well. But in this case we don't so we can't rule out anything
5103            &[true, true, true, true, true, true, true, true, true],
5104        );
5105    }
5106
5107    /// prunes the specified expr with the specified schema and statistics, and
5108    /// ensures it returns expected.
5109    ///
5110    /// `expected` is a vector of bools, where true means the row group should
5111    /// be kept, and false means it should be pruned.
5112    // TODO refactor other tests to use this to reduce boiler plate
5113    fn prune_with_expr(
5114        expr: Expr,
5115        schema: &SchemaRef,
5116        statistics: &TestStatistics,
5117        expected: &[bool],
5118    ) {
5119        println!("Pruning with expr: {expr}");
5120        let expr = logical2physical(&expr, schema);
5121        let p = PruningPredicate::try_new(expr, Arc::<Schema>::clone(schema)).unwrap();
5122        let result = p.prune(statistics).unwrap();
5123        assert_eq!(result, expected);
5124    }
5125
5126    fn test_build_predicate_expression(
5127        expr: &Expr,
5128        schema: &Schema,
5129        required_columns: &mut RequiredColumns,
5130    ) -> Arc<dyn PhysicalExpr> {
5131        let expr = logical2physical(expr, schema);
5132        let unhandled_hook = Arc::new(ConstantUnhandledPredicateHook::default()) as _;
5133        build_predicate_expression(
5134            &expr,
5135            &Arc::new(schema.clone()),
5136            required_columns,
5137            &unhandled_hook,
5138        )
5139    }
5140
5141    #[test]
5142    fn test_build_predicate_expression_with_false() {
5143        let expr = lit(ScalarValue::Boolean(Some(false)));
5144        let schema = Schema::empty();
5145        let res =
5146            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
5147        let expected = logical2physical(&expr, &schema);
5148        assert_eq!(&res, &expected);
5149    }
5150
5151    #[test]
5152    fn test_build_predicate_expression_with_and_false() {
5153        let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
5154        let expr = and(
5155            col("c1").eq(lit("a")),
5156            lit(ScalarValue::Boolean(Some(false))),
5157        );
5158        let res =
5159            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
5160        let expected = logical2physical(&lit(ScalarValue::Boolean(Some(false))), &schema);
5161        assert_eq!(&res, &expected);
5162    }
5163
5164    #[test]
5165    fn test_build_predicate_expression_with_or_false() {
5166        let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
5167        let left_expr = col("c1").eq(lit("a"));
5168        let right_expr = lit(ScalarValue::Boolean(Some(false)));
5169        let res = test_build_predicate_expression(
5170            &or(left_expr.clone(), right_expr.clone()),
5171            &schema,
5172            &mut RequiredColumns::new(),
5173        );
5174        let expected =
5175            "c1_null_count@2 != row_count@3 AND c1_min@0 <= a AND a <= c1_max@1";
5176        assert_eq!(res.to_string(), expected);
5177    }
5178}