datafusion_physical_expr/expressions/dynamic_filters.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use parking_lot::RwLock;
19use std::{any::Any, fmt::Display, hash::Hash, sync::Arc};
20
21use crate::PhysicalExpr;
22use arrow::datatypes::{DataType, Schema};
23use datafusion_common::{
24    tree_node::{Transformed, TransformedResult, TreeNode},
25    Result,
26};
27use datafusion_expr::ColumnarValue;
28use datafusion_physical_expr_common::physical_expr::{DynEq, DynHash};
29
30/// A dynamic [`PhysicalExpr`] that can be updated by anyone with a reference to it.
31///
32/// Any `ExecutionPlan` that uses this expression and holds a reference to it internally should probably also
33/// implement `ExecutionPlan::reset_state` to remain compatible with recursive queries and other situations where
34/// the same `ExecutionPlan` is reused with different data.
35#[derive(Debug)]
36pub struct DynamicFilterPhysicalExpr {
37    /// The original children of this PhysicalExpr, if any.
38    /// This is necessary because the dynamic filter may be initialized with a placeholder (e.g. `lit(true)`)
39    /// and later remapped to the actual expressions that are being filtered.
40    /// But we need to know the children (e.g. columns referenced in the expression) ahead of time to evaluate the expression correctly.
41    children: Vec<Arc<dyn PhysicalExpr>>,
42    /// If any of the children were remapped / modified (e.g. to adjust for projections) we need to keep track of the new children
43    /// so that when we update `current()` in subsequent iterations we can re-apply the replacements.
44    remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
45    /// The source of dynamic filters.
46    inner: Arc<RwLock<Inner>>,
47    /// For testing purposes track the data type and nullability to make sure they don't change.
48    /// If they do, there's a bug in the implementation.
49    /// But this can have overhead in production, so it's only included in our tests.
50    data_type: Arc<RwLock<Option<DataType>>>,
51    nullable: Arc<RwLock<Option<bool>>>,
52}
53
54#[derive(Debug)]
55struct Inner {
56    /// A counter that gets incremented every time the expression is updated so that we can track changes cheaply.
57    /// This is used for [`PhysicalExpr::snapshot_generation`] to have a cheap check for changes.
58    generation: u64,
59    expr: Arc<dyn PhysicalExpr>,
60}
61
62impl Inner {
63    fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
64        Self {
65            // Start with generation 1 which gives us a different result for [`PhysicalExpr::generation`] than the default 0.
66            // This is not currently used anywhere but it seems useful to have this simple distinction.
67            generation: 1,
68            expr,
69        }
70    }
71
72    /// Clone the inner expression.
73    fn expr(&self) -> &Arc<dyn PhysicalExpr> {
74        &self.expr
75    }
76}
77
78impl Hash for DynamicFilterPhysicalExpr {
79    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
80        let inner = self.current().expect("Failed to get current expression");
81        inner.dyn_hash(state);
82        self.children.dyn_hash(state);
83        self.remapped_children.dyn_hash(state);
84    }
85}
86
87impl PartialEq for DynamicFilterPhysicalExpr {
88    fn eq(&self, other: &Self) -> bool {
89        let inner = self.current().expect("Failed to get current expression");
90        let our_children = self.remapped_children.as_ref().unwrap_or(&self.children);
91        let other_children = other.remapped_children.as_ref().unwrap_or(&other.children);
92        let other = other.current().expect("Failed to get current expression");
93        inner.dyn_eq(other.as_any()) && our_children == other_children
94    }
95}
96
97impl Eq for DynamicFilterPhysicalExpr {}
98
99impl Display for DynamicFilterPhysicalExpr {
100    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101        self.render(f, |expr, f| write!(f, "{expr}"))
102    }
103}
104
105impl DynamicFilterPhysicalExpr {
106    /// Create a new [`DynamicFilterPhysicalExpr`]
107    /// from an initial expression and a list of children.
108    /// The list of children is provided separately because
109    /// the initial expression may not have the same children.
110    /// For example, if the initial expression is just `true`
111    /// it will not reference any columns, but we may know that
112    /// we are going to replace this expression with a real one
113    /// that does reference certain columns.
114    /// In this case you **must** pass in the columns that will be
115    /// used in the final expression as children to this function
116    /// since DataFusion is generally not compatible with dynamic
117    /// *children* in expressions.
118    ///
119    /// To determine the children you can:
120    ///
121    /// - Use [`collect_columns`] to collect the columns from the expression.
122    /// - Use existing information, such as the sort columns in a `SortExec`.
123    ///
124    /// Generally the important bit is that the *leaf children that reference columns
125    /// do not change* since those will be used to determine what columns need to read or projected
126    /// when evaluating the expression.
127    ///
128    /// Any `ExecutionPlan` that uses this expression and holds a reference to it internally should probably also
129    /// implement `ExecutionPlan::reset_state` to remain compatible with recursive queries and other situations where
130    /// the same `ExecutionPlan` is reused with different data.
131    ///
132    /// [`collect_columns`]: crate::utils::collect_columns
133    pub fn new(
134        children: Vec<Arc<dyn PhysicalExpr>>,
135        inner: Arc<dyn PhysicalExpr>,
136    ) -> Self {
137        Self {
138            children,
139            remapped_children: None, // Initially no remapped children
140            inner: Arc::new(RwLock::new(Inner::new(inner))),
141            data_type: Arc::new(RwLock::new(None)),
142            nullable: Arc::new(RwLock::new(None)),
143        }
144    }
145
146    fn remap_children(
147        children: &[Arc<dyn PhysicalExpr>],
148        remapped_children: Option<&Vec<Arc<dyn PhysicalExpr>>>,
149        expr: Arc<dyn PhysicalExpr>,
150    ) -> Result<Arc<dyn PhysicalExpr>> {
151        if let Some(remapped_children) = remapped_children {
152            // Remap the children to the new children
153            // of the expression.
154            expr.transform_up(|child| {
155                // Check if this is any of our original children
156                if let Some(pos) =
157                    children.iter().position(|c| c.as_ref() == child.as_ref())
158                {
159                    // If so, remap it to the current children
160                    // of the expression.
161                    let new_child = Arc::clone(&remapped_children[pos]);
162                    Ok(Transformed::yes(new_child))
163                } else {
164                    // Otherwise, just return the expression
165                    Ok(Transformed::no(child))
166                }
167            })
168            .data()
169        } else {
170            // If we don't have any remapped children, just return the expression
171            Ok(Arc::clone(&expr))
172        }
173    }
174
175    /// Get the current generation of the expression.
176    fn current_generation(&self) -> u64 {
177        self.inner.read().generation
178    }
179
180    /// Get the current expression.
181    /// This will return the current expression with any children
182    /// remapped to match calls to [`PhysicalExpr::with_new_children`].
183    pub fn current(&self) -> Result<Arc<dyn PhysicalExpr>> {
184        let expr = Arc::clone(self.inner.read().expr());
185        Self::remap_children(&self.children, self.remapped_children.as_ref(), expr)
186    }
187
188    /// Update the current expression.
189    /// Any children of this expression must be a subset of the original children
190    /// passed to the constructor.
191    /// This should be called e.g.:
192    /// - When we've computed the probe side's hash table in a HashJoinExec
193    /// - After every batch is processed if we update the TopK heap in a SortExec using a TopK approach.
194    pub fn update(&self, new_expr: Arc<dyn PhysicalExpr>) -> Result<()> {
195        // Remap the children of the new expression to match the original children
196        // We still do this again in `current()` but doing it preventively here
197        // reduces the work needed in some cases if `current()` is called multiple times
198        // and the same externally facing `PhysicalExpr` is used for both `with_new_children` and `update()`.`
199        let new_expr = Self::remap_children(
200            &self.children,
201            self.remapped_children.as_ref(),
202            new_expr,
203        )?;
204
205        // Load the current inner, increment generation, and store the new one
206        let mut current = self.inner.write();
207        *current = Inner {
208            generation: current.generation + 1,
209            expr: new_expr,
210        };
211        Ok(())
212    }
213
214    fn render(
215        &self,
216        f: &mut std::fmt::Formatter<'_>,
217        render_expr: impl FnOnce(
218            Arc<dyn PhysicalExpr>,
219            &mut std::fmt::Formatter<'_>,
220        ) -> std::fmt::Result,
221    ) -> std::fmt::Result {
222        let inner = self.current().map_err(|_| std::fmt::Error)?;
223        let current_generation = self.current_generation();
224        write!(f, "DynamicFilter [ ")?;
225        if current_generation == 1 {
226            write!(f, "empty")?;
227        } else {
228            render_expr(inner, f)?;
229        }
230
231        write!(f, " ]")
232    }
233}
234
235impl PhysicalExpr for DynamicFilterPhysicalExpr {
236    fn as_any(&self) -> &dyn Any {
237        self
238    }
239
240    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
241        self.remapped_children
242            .as_ref()
243            .unwrap_or(&self.children)
244            .iter()
245            .collect()
246    }
247
248    fn with_new_children(
249        self: Arc<Self>,
250        children: Vec<Arc<dyn PhysicalExpr>>,
251    ) -> Result<Arc<dyn PhysicalExpr>> {
252        Ok(Arc::new(Self {
253            children: self.children.clone(),
254            remapped_children: Some(children),
255            inner: Arc::clone(&self.inner),
256            data_type: Arc::clone(&self.data_type),
257            nullable: Arc::clone(&self.nullable),
258        }))
259    }
260
261    fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
262        let res = self.current()?.data_type(input_schema)?;
263        #[cfg(test)]
264        {
265            use datafusion_common::internal_err;
266            // Check if the data type has changed.
267            let mut data_type_lock = self.data_type.write();
268
269            if let Some(existing) = &*data_type_lock {
270                if existing != &res {
271                    // If the data type has changed, we have a bug.
272                    return internal_err!(
273                        "DynamicFilterPhysicalExpr data type has changed unexpectedly. \
274                        Expected: {existing:?}, Actual: {res:?}"
275                    );
276                }
277            } else {
278                *data_type_lock = Some(res.clone());
279            }
280        }
281        Ok(res)
282    }
283
284    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
285        let res = self.current()?.nullable(input_schema)?;
286        #[cfg(test)]
287        {
288            use datafusion_common::internal_err;
289            // Check if the nullability has changed.
290            let mut nullable_lock = self.nullable.write();
291            if let Some(existing) = *nullable_lock {
292                if existing != res {
293                    // If the nullability has changed, we have a bug.
294                    return internal_err!(
295                        "DynamicFilterPhysicalExpr nullability has changed unexpectedly. \
296                        Expected: {existing}, Actual: {res}"
297                    );
298                }
299            } else {
300                *nullable_lock = Some(res);
301            }
302        }
303        Ok(res)
304    }
305
306    fn evaluate(
307        &self,
308        batch: &arrow::record_batch::RecordBatch,
309    ) -> Result<ColumnarValue> {
310        let current = self.current()?;
311        #[cfg(test)]
312        {
313            // Ensure that we are not evaluating after the expression has changed.
314            let schema = batch.schema();
315            self.nullable(&schema)?;
316            self.data_type(&schema)?;
317        };
318        current.evaluate(batch)
319    }
320
321    fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
322        self.render(f, |expr, f| expr.fmt_sql(f))
323    }
324
325    fn snapshot(&self) -> Result<Option<Arc<dyn PhysicalExpr>>> {
326        // Return the current expression as a snapshot.
327        Ok(Some(self.current()?))
328    }
329
330    fn snapshot_generation(&self) -> u64 {
331        // Return the current generation of the expression.
332        self.inner.read().generation
333    }
334}
335
#[cfg(test)]
mod test {
    use crate::{
        expressions::{col, lit, BinaryExpr},
        utils::reassign_expr_columns,
    };
    use arrow::{
        array::{ArrayRef, RecordBatch},
        datatypes::{DataType, Field, Schema},
    };
    use datafusion_common::ScalarValue;

    use super::*;

    /// Evaluate `expr` against `batch` and return the result, which must be an array.
    fn evaluate_to_array(expr: &Arc<dyn PhysicalExpr>, batch: &RecordBatch) -> ArrayRef {
        match expr.evaluate(batch).expect("evaluation should succeed") {
            ColumnarValue::Array(array) => array,
            ColumnarValue::Scalar(_) => panic!("Expected ColumnarValue::Array"),
        }
    }

    /// A single-row boolean array holding `value`.
    fn single_bool_array(value: bool) -> ArrayRef {
        ScalarValue::Boolean(Some(value))
            .to_array_of_size(1)
            .unwrap()
    }

    #[test]
    fn test_remap_children() {
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]));
        let expr = Arc::new(BinaryExpr::new(
            col("a", &table_schema).unwrap(),
            datafusion_expr::Operator::Eq,
            lit(42) as Arc<dyn PhysicalExpr>,
        ));
        let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(
            vec![col("a", &table_schema).unwrap()],
            expr as Arc<dyn PhysicalExpr>,
        ));
        // Simulate two `ParquetSource` files with different filter schemas
        // Both of these should hit the same inner `PhysicalExpr` even after `update()` is called
        // and be able to remap children independently.
        let filter_schema_1 = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]));
        let filter_schema_2 = Arc::new(Schema::new(vec![
            Field::new("b", DataType::Int32, false),
            Field::new("a", DataType::Int32, false),
        ]));
        // Each ParquetExec calls `with_new_children` on the DynamicFilterPhysicalExpr
        // and remaps the children to the file schema.
        let dynamic_filter_1 = reassign_expr_columns(
            Arc::clone(&dynamic_filter) as Arc<dyn PhysicalExpr>,
            &filter_schema_1,
        )
        .unwrap();
        let snap = dynamic_filter_1.snapshot().unwrap().unwrap();
        insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42), field: Field { name: "lit", data_type: Int32 } }, fail_on_overflow: false }"#);
        let dynamic_filter_2 = reassign_expr_columns(
            Arc::clone(&dynamic_filter) as Arc<dyn PhysicalExpr>,
            &filter_schema_2,
        )
        .unwrap();
        let snap = dynamic_filter_2.snapshot().unwrap().unwrap();
        insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 1 }, op: Eq, right: Literal { value: Int32(42), field: Field { name: "lit", data_type: Int32 } }, fail_on_overflow: false }"#);
        // Both filters allow evaluating the same expression
        let batch_1 = RecordBatch::try_new(
            Arc::clone(&filter_schema_1),
            vec![
                // a
                ScalarValue::Int32(Some(42)).to_array_of_size(1).unwrap(),
                // b
                ScalarValue::Int32(Some(43)).to_array_of_size(1).unwrap(),
            ],
        )
        .unwrap();
        let batch_2 = RecordBatch::try_new(
            Arc::clone(&filter_schema_2),
            vec![
                // b
                ScalarValue::Int32(Some(43)).to_array_of_size(1).unwrap(),
                // a
                ScalarValue::Int32(Some(42)).to_array_of_size(1).unwrap(),
            ],
        )
        .unwrap();
        // Evaluate the expression on both batches: both handles must agree,
        // and `a = 42` holds for the single row.
        let arr_1 = evaluate_to_array(&dynamic_filter_1, &batch_1);
        let arr_2 = evaluate_to_array(&dynamic_filter_2, &batch_2);
        assert_eq!(arr_1, arr_2);
        assert_eq!(arr_1, single_bool_array(true));
        // Now lets update the expression
        // Note that we update the *original* expression and that should be reflected in both the derived expressions
        let new_expr = Arc::new(BinaryExpr::new(
            col("a", &table_schema).unwrap(),
            datafusion_expr::Operator::Gt,
            lit(43) as Arc<dyn PhysicalExpr>,
        ));
        dynamic_filter
            .update(Arc::clone(&new_expr) as Arc<dyn PhysicalExpr>)
            .expect("Failed to update expression");
        // Now we should be able to evaluate the new expression on both batches;
        // `a > 43` is false for the single row.
        let arr_1 = evaluate_to_array(&dynamic_filter_1, &batch_1);
        let arr_2 = evaluate_to_array(&dynamic_filter_2, &batch_2);
        assert_eq!(arr_1, arr_2);
        assert_eq!(arr_1, single_bool_array(false));
    }

    #[test]
    fn test_snapshot() {
        let expr = lit(42) as Arc<dyn PhysicalExpr>;
        let dynamic_filter = DynamicFilterPhysicalExpr::new(vec![], Arc::clone(&expr));

        // Take a snapshot of the current expression
        let snapshot = dynamic_filter.snapshot().unwrap();
        assert_eq!(snapshot, Some(expr));

        // Update the current expression
        let new_expr = lit(100) as Arc<dyn PhysicalExpr>;
        dynamic_filter.update(Arc::clone(&new_expr)).unwrap();
        // Take another snapshot
        let snapshot = dynamic_filter.snapshot().unwrap();
        assert_eq!(snapshot, Some(new_expr));
    }

    #[test]
    fn test_dynamic_filter_physical_expr_misbehaves_data_type_nullable() {
        let dynamic_filter =
            DynamicFilterPhysicalExpr::new(vec![], lit(42) as Arc<dyn PhysicalExpr>);

        // First call to data_type and nullable should set the initial values.
        let initial_data_type = dynamic_filter.data_type(&Schema::empty()).unwrap();
        let initial_nullable = dynamic_filter.nullable(&Schema::empty()).unwrap();

        // Call again and expect no change.
        let second_data_type = dynamic_filter.data_type(&Schema::empty()).unwrap();
        let second_nullable = dynamic_filter.nullable(&Schema::empty()).unwrap();
        assert_eq!(
            initial_data_type, second_data_type,
            "Data type should not change on second call."
        );
        assert_eq!(
            initial_nullable, second_nullable,
            "Nullability should not change on second call."
        );

        // Now change the current expression to something else.
        dynamic_filter
            .update(lit(ScalarValue::Utf8(None)) as Arc<dyn PhysicalExpr>)
            .expect("Failed to update expression");
        // Check that we error if we call data_type, nullable or evaluate after changing the expression.
        assert!(
            dynamic_filter.data_type(&Schema::empty()).is_err(),
            "Expected err when data_type is called after changing the expression."
        );
        assert!(
            dynamic_filter.nullable(&Schema::empty()).is_err(),
            "Expected err when nullable is called after changing the expression."
        );
        let batch = RecordBatch::new_empty(Arc::new(Schema::empty()));
        assert!(
            dynamic_filter.evaluate(&batch).is_err(),
            "Expected err when evaluate is called after changing the expression."
        );
    }
}