datafusion_physical_expr/expressions/
dynamic_filters.rs1use parking_lot::RwLock;
19use std::{any::Any, fmt::Display, hash::Hash, sync::Arc};
20
21use crate::PhysicalExpr;
22use arrow::datatypes::{DataType, Schema};
23use datafusion_common::{
24 tree_node::{Transformed, TransformedResult, TreeNode},
25 Result,
26};
27use datafusion_expr::ColumnarValue;
28use datafusion_physical_expr_common::physical_expr::{DynEq, DynHash};
29
/// A [`PhysicalExpr`] whose underlying filter expression can be replaced at
/// runtime via [`DynamicFilterPhysicalExpr::update`].
///
/// Copies created through `with_new_children` share the same `inner` state
/// (the `Arc` is cloned, not the data), so an update made through one handle
/// is observed by every copy.
#[derive(Debug)]
pub struct DynamicFilterPhysicalExpr {
    /// The children passed to `new`. Sub-expressions of the current filter that
    /// are equal to one of these are the nodes rewritten by `remap_children`.
    children: Vec<Arc<dyn PhysicalExpr>>,
    /// Replacement children installed by `with_new_children`:
    /// `remapped_children[i]` replaces `children[i]` whenever the current
    /// expression is read. `None` for an instance created directly by `new`.
    remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
    /// The current filter expression and its generation, shared between this
    /// instance and all copies made by `with_new_children`.
    inner: Arc<RwLock<Inner>>,
    /// Test-only bookkeeping: the first data type reported by `data_type`.
    /// Only read or written inside `#[cfg(test)]` code; otherwise it is just
    /// carried along (and shared) by copies.
    data_type: Arc<RwLock<Option<DataType>>>,
    /// Test-only bookkeeping: the first nullability reported by `nullable`.
    /// Only read or written inside `#[cfg(test)]` code.
    nullable: Arc<RwLock<Option<bool>>>,
}
53
/// Mutable state shared by a dynamic filter and all of its remapped copies.
#[derive(Debug)]
struct Inner {
    /// Starts at 1 (see `Inner::new`) and is incremented by every `update`.
    /// Rendering treats generation 1 as "not updated yet" and prints `empty`.
    generation: u64,
    /// The stored filter expression. Readers remap it with their own
    /// `remapped_children` (via `current`) before using it.
    expr: Arc<dyn PhysicalExpr>,
}
61
62impl Inner {
63 fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
64 Self {
65 generation: 1,
68 expr,
69 }
70 }
71
72 fn expr(&self) -> &Arc<dyn PhysicalExpr> {
74 &self.expr
75 }
76}
77
78impl Hash for DynamicFilterPhysicalExpr {
79 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
80 let inner = self.current().expect("Failed to get current expression");
81 inner.dyn_hash(state);
82 self.children.dyn_hash(state);
83 self.remapped_children.dyn_hash(state);
84 }
85}
86
87impl PartialEq for DynamicFilterPhysicalExpr {
88 fn eq(&self, other: &Self) -> bool {
89 let inner = self.current().expect("Failed to get current expression");
90 let our_children = self.remapped_children.as_ref().unwrap_or(&self.children);
91 let other_children = other.remapped_children.as_ref().unwrap_or(&other.children);
92 let other = other.current().expect("Failed to get current expression");
93 inner.dyn_eq(other.as_any()) && our_children == other_children
94 }
95}
96
// Equality semantics are those of the `PartialEq` impl above. Both compare the
// *current* expression, which lives in shared state replaced by `update`, so two
// values that compare equal now may not compare equal later.
impl Eq for DynamicFilterPhysicalExpr {}
98
99impl Display for DynamicFilterPhysicalExpr {
100 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101 self.render(f, |expr, f| write!(f, "{expr}"))
102 }
103}
104
105impl DynamicFilterPhysicalExpr {
106 pub fn new(
134 children: Vec<Arc<dyn PhysicalExpr>>,
135 inner: Arc<dyn PhysicalExpr>,
136 ) -> Self {
137 Self {
138 children,
139 remapped_children: None, inner: Arc::new(RwLock::new(Inner::new(inner))),
141 data_type: Arc::new(RwLock::new(None)),
142 nullable: Arc::new(RwLock::new(None)),
143 }
144 }
145
146 fn remap_children(
147 children: &[Arc<dyn PhysicalExpr>],
148 remapped_children: Option<&Vec<Arc<dyn PhysicalExpr>>>,
149 expr: Arc<dyn PhysicalExpr>,
150 ) -> Result<Arc<dyn PhysicalExpr>> {
151 if let Some(remapped_children) = remapped_children {
152 expr.transform_up(|child| {
155 if let Some(pos) =
157 children.iter().position(|c| c.as_ref() == child.as_ref())
158 {
159 let new_child = Arc::clone(&remapped_children[pos]);
162 Ok(Transformed::yes(new_child))
163 } else {
164 Ok(Transformed::no(child))
166 }
167 })
168 .data()
169 } else {
170 Ok(Arc::clone(&expr))
172 }
173 }
174
175 fn current_generation(&self) -> u64 {
177 self.inner.read().generation
178 }
179
180 pub fn current(&self) -> Result<Arc<dyn PhysicalExpr>> {
184 let expr = Arc::clone(self.inner.read().expr());
185 Self::remap_children(&self.children, self.remapped_children.as_ref(), expr)
186 }
187
188 pub fn update(&self, new_expr: Arc<dyn PhysicalExpr>) -> Result<()> {
195 let new_expr = Self::remap_children(
200 &self.children,
201 self.remapped_children.as_ref(),
202 new_expr,
203 )?;
204
205 let mut current = self.inner.write();
207 *current = Inner {
208 generation: current.generation + 1,
209 expr: new_expr,
210 };
211 Ok(())
212 }
213
214 fn render(
215 &self,
216 f: &mut std::fmt::Formatter<'_>,
217 render_expr: impl FnOnce(
218 Arc<dyn PhysicalExpr>,
219 &mut std::fmt::Formatter<'_>,
220 ) -> std::fmt::Result,
221 ) -> std::fmt::Result {
222 let inner = self.current().map_err(|_| std::fmt::Error)?;
223 let current_generation = self.current_generation();
224 write!(f, "DynamicFilter [ ")?;
225 if current_generation == 1 {
226 write!(f, "empty")?;
227 } else {
228 render_expr(inner, f)?;
229 }
230
231 write!(f, " ]")
232 }
233}
234
235impl PhysicalExpr for DynamicFilterPhysicalExpr {
236 fn as_any(&self) -> &dyn Any {
237 self
238 }
239
240 fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
241 self.remapped_children
242 .as_ref()
243 .unwrap_or(&self.children)
244 .iter()
245 .collect()
246 }
247
248 fn with_new_children(
249 self: Arc<Self>,
250 children: Vec<Arc<dyn PhysicalExpr>>,
251 ) -> Result<Arc<dyn PhysicalExpr>> {
252 Ok(Arc::new(Self {
253 children: self.children.clone(),
254 remapped_children: Some(children),
255 inner: Arc::clone(&self.inner),
256 data_type: Arc::clone(&self.data_type),
257 nullable: Arc::clone(&self.nullable),
258 }))
259 }
260
261 fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
262 let res = self.current()?.data_type(input_schema)?;
263 #[cfg(test)]
264 {
265 use datafusion_common::internal_err;
266 let mut data_type_lock = self.data_type.write();
268
269 if let Some(existing) = &*data_type_lock {
270 if existing != &res {
271 return internal_err!(
273 "DynamicFilterPhysicalExpr data type has changed unexpectedly. \
274 Expected: {existing:?}, Actual: {res:?}"
275 );
276 }
277 } else {
278 *data_type_lock = Some(res.clone());
279 }
280 }
281 Ok(res)
282 }
283
284 fn nullable(&self, input_schema: &Schema) -> Result<bool> {
285 let res = self.current()?.nullable(input_schema)?;
286 #[cfg(test)]
287 {
288 use datafusion_common::internal_err;
289 let mut nullable_lock = self.nullable.write();
291 if let Some(existing) = *nullable_lock {
292 if existing != res {
293 return internal_err!(
295 "DynamicFilterPhysicalExpr nullability has changed unexpectedly. \
296 Expected: {existing}, Actual: {res}"
297 );
298 }
299 } else {
300 *nullable_lock = Some(res);
301 }
302 }
303 Ok(res)
304 }
305
306 fn evaluate(
307 &self,
308 batch: &arrow::record_batch::RecordBatch,
309 ) -> Result<ColumnarValue> {
310 let current = self.current()?;
311 #[cfg(test)]
312 {
313 let schema = batch.schema();
315 self.nullable(&schema)?;
316 self.data_type(&schema)?;
317 };
318 current.evaluate(batch)
319 }
320
321 fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
322 self.render(f, |expr, f| expr.fmt_sql(f))
323 }
324
325 fn snapshot(&self) -> Result<Option<Arc<dyn PhysicalExpr>>> {
326 Ok(Some(self.current()?))
328 }
329
330 fn snapshot_generation(&self) -> u64 {
331 self.inner.read().generation
333 }
334}
335
#[cfg(test)]
mod test {
    use crate::{
        expressions::{col, lit, BinaryExpr},
        utils::reassign_expr_columns,
    };
    use arrow::{
        array::RecordBatch,
        datatypes::{DataType, Field, Schema},
    };
    use datafusion_common::ScalarValue;

    use super::*;

    /// Copies produced by `reassign_expr_columns` must resolve column indices
    /// against their own schema. An `update` on the original filter must also
    /// be visible through those copies, since they share the same state.
    #[test]
    fn test_remap_children() {
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]));
        // Initial filter: a = 42
        let expr = Arc::new(BinaryExpr::new(
            col("a", &table_schema).unwrap(),
            datafusion_expr::Operator::Eq,
            lit(42) as Arc<dyn PhysicalExpr>,
        ));
        let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(
            vec![col("a", &table_schema).unwrap()],
            expr as Arc<dyn PhysicalExpr>,
        ));
        // Same column order as the table schema: `a` stays at index 0.
        let filter_schema_1 = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]));
        // Swapped column order: `a` moves to index 1.
        let filter_schema_2 = Arc::new(Schema::new(vec![
            Field::new("b", DataType::Int32, false),
            Field::new("a", DataType::Int32, false),
        ]));
        // Presumably rebuilds the filter through `with_new_children` with columns
        // re-resolved against the given schema; the snapshots below check the result.
        let dynamic_filter_1 = reassign_expr_columns(
            Arc::clone(&dynamic_filter) as Arc<dyn PhysicalExpr>,
            &filter_schema_1,
        )
        .unwrap();
        let snap = dynamic_filter_1.snapshot().unwrap().unwrap();
        insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42), field: Field { name: "lit", data_type: Int32 } }, fail_on_overflow: false }"#);
        let dynamic_filter_2 = reassign_expr_columns(
            Arc::clone(&dynamic_filter) as Arc<dyn PhysicalExpr>,
            &filter_schema_2,
        )
        .unwrap();
        let snap = dynamic_filter_2.snapshot().unwrap().unwrap();
        insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 1 }, op: Eq, right: Literal { value: Int32(42), field: Field { name: "lit", data_type: Int32 } }, fail_on_overflow: false }"#);
        // One row with a = 42, b = 43, laid out for schema 1 (a, b).
        let batch_1 = RecordBatch::try_new(
            Arc::clone(&filter_schema_1),
            vec![
                // a
                ScalarValue::Int32(Some(42)).to_array_of_size(1).unwrap(),
                // b
                ScalarValue::Int32(Some(43)).to_array_of_size(1).unwrap(),
            ],
        )
        .unwrap();
        // The same logical row laid out for schema 2 (b, a).
        let batch_2 = RecordBatch::try_new(
            Arc::clone(&filter_schema_2),
            vec![
                // b
                ScalarValue::Int32(Some(43)).to_array_of_size(1).unwrap(),
                // a
                ScalarValue::Int32(Some(42)).to_array_of_size(1).unwrap(),
            ],
        )
        .unwrap();
        // Both copies must read `a` from the correct index and agree: a = 42 is true.
        let result_1 = dynamic_filter_1.evaluate(&batch_1).unwrap();
        let result_2 = dynamic_filter_2.evaluate(&batch_2).unwrap();
        let ColumnarValue::Array(arr_1) = result_1 else {
            panic!("Expected ColumnarValue::Array");
        };
        let ColumnarValue::Array(arr_2) = result_2 else {
            panic!("Expected ColumnarValue::Array");
        };
        assert!(arr_1.eq(&arr_2));
        let expected = ScalarValue::Boolean(Some(true))
            .to_array_of_size(1)
            .unwrap();
        assert!(arr_1.eq(&expected));
        // Update the *original* filter to a > 43 (expressed in table-schema terms).
        let new_expr = Arc::new(BinaryExpr::new(
            col("a", &table_schema).unwrap(),
            datafusion_expr::Operator::Gt,
            lit(43) as Arc<dyn PhysicalExpr>,
        ));
        dynamic_filter
            .update(Arc::clone(&new_expr) as Arc<dyn PhysicalExpr>)
            .expect("Failed to update expression");
        // The copies share state with the original, so they now evaluate a > 43,
        // which is false for a = 42 under both column layouts.
        let result_1 = dynamic_filter_1.evaluate(&batch_1).unwrap();
        let result_2 = dynamic_filter_2.evaluate(&batch_2).unwrap();
        let ColumnarValue::Array(arr_1) = result_1 else {
            panic!("Expected ColumnarValue::Array");
        };
        let ColumnarValue::Array(arr_2) = result_2 else {
            panic!("Expected ColumnarValue::Array");
        };
        assert!(arr_1.eq(&arr_2));
        let expected = ScalarValue::Boolean(Some(false))
            .to_array_of_size(1)
            .unwrap();
        assert!(arr_1.eq(&expected));
    }

    /// `snapshot` returns the expression current at call time, and a new
    /// snapshot after `update` reflects the replacement.
    #[test]
    fn test_snapshot() {
        let expr = lit(42) as Arc<dyn PhysicalExpr>;
        let dynamic_filter = DynamicFilterPhysicalExpr::new(vec![], Arc::clone(&expr));

        // Before any update the snapshot is the initial expression.
        let snapshot = dynamic_filter.snapshot().unwrap();
        assert_eq!(snapshot, Some(expr));

        let new_expr = lit(100) as Arc<dyn PhysicalExpr>;
        dynamic_filter.update(Arc::clone(&new_expr)).unwrap();
        let snapshot = dynamic_filter.snapshot().unwrap();
        assert_eq!(snapshot, Some(new_expr));
    }

    /// Exercises the test-only consistency checks: the first `data_type` /
    /// `nullable` results are recorded, repeated calls agree, and an update that
    /// changes the output type (Int32 literal -> Utf8 null literal) makes
    /// `data_type`, `nullable` and `evaluate` all return errors.
    #[test]
    fn test_dynamic_filter_physical_expr_misbehaves_data_type_nullable() {
        let dynamic_filter =
            DynamicFilterPhysicalExpr::new(vec![], lit(42) as Arc<dyn PhysicalExpr>);

        // First calls record the baseline type and nullability.
        let initial_data_type = dynamic_filter.data_type(&Schema::empty()).unwrap();
        let initial_nullable = dynamic_filter.nullable(&Schema::empty()).unwrap();

        // Second calls must match the recorded baseline.
        let second_data_type = dynamic_filter.data_type(&Schema::empty()).unwrap();
        let second_nullable = dynamic_filter.nullable(&Schema::empty()).unwrap();
        assert_eq!(
            initial_data_type, second_data_type,
            "Data type should not change on second call."
        );
        assert_eq!(
            initial_nullable, second_nullable,
            "Nullability should not change on second call."
        );

        // Swap in an expression with a different type and nullability.
        dynamic_filter
            .update(lit(ScalarValue::Utf8(None)) as Arc<dyn PhysicalExpr>)
            .expect("Failed to update expression");
        assert!(
            dynamic_filter.data_type(&Schema::empty()).is_err(),
            "Expected err when data_type is called after changing the expression."
        );
        assert!(
            dynamic_filter.nullable(&Schema::empty()).is_err(),
            "Expected err when nullable is called after changing the expression."
        );
        let batch = RecordBatch::new_empty(Arc::new(Schema::empty()));
        assert!(
            dynamic_filter.evaluate(&batch).is_err(),
            "Expected err when evaluate is called after changing the expression."
        );
    }
}