datafusion_physical_plan/joins/join_filter.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::joins::utils::ColumnIndex;
19use arrow::datatypes::SchemaRef;
20use datafusion_common::JoinSide;
21use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
22use std::{fmt::Display, sync::Arc};
23
24/// Filter applied before join output. Fields are crate-public to allow
25/// downstream implementations to experiment with custom joins.
26#[derive(Debug, Clone)]
27pub struct JoinFilter {
28 /// Filter expression
29 pub(crate) expression: Arc<dyn PhysicalExpr>,
30 /// Column indices required to construct intermediate batch for filtering
31 pub(crate) column_indices: Vec<ColumnIndex>,
32 /// Physical schema of intermediate batch
33 pub(crate) schema: SchemaRef,
34}
35
36/// For display in `EXPLAIN` plans, only expression with column names is needed,
37/// it output expression like `(col1 + col2) = 0`
38impl Display for JoinFilter {
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 self.expression.fmt_sql(f)
41 }
42}
43
44impl JoinFilter {
45 /// Creates new JoinFilter
46 pub fn new(
47 expression: Arc<dyn PhysicalExpr>,
48 column_indices: Vec<ColumnIndex>,
49 schema: SchemaRef,
50 ) -> JoinFilter {
51 JoinFilter {
52 expression,
53 column_indices,
54 schema,
55 }
56 }
57
58 /// Helper for building ColumnIndex vector from left and right indices
59 pub fn build_column_indices(
60 left_indices: Vec<usize>,
61 right_indices: Vec<usize>,
62 ) -> Vec<ColumnIndex> {
63 left_indices
64 .into_iter()
65 .map(|i| ColumnIndex {
66 index: i,
67 side: JoinSide::Left,
68 })
69 .chain(right_indices.into_iter().map(|i| ColumnIndex {
70 index: i,
71 side: JoinSide::Right,
72 }))
73 .collect()
74 }
75
76 /// Filter expression
77 pub fn expression(&self) -> &Arc<dyn PhysicalExpr> {
78 &self.expression
79 }
80
81 /// Column indices for intermediate batch creation
82 pub fn column_indices(&self) -> &[ColumnIndex] {
83 &self.column_indices
84 }
85
86 /// Intermediate batch schema
87 pub fn schema(&self) -> &SchemaRef {
88 &self.schema
89 }
90
91 /// Rewrites the join filter if the inputs to the join are rewritten
92 pub fn swap(&self) -> JoinFilter {
93 let column_indices = self
94 .column_indices()
95 .iter()
96 .map(|idx| ColumnIndex {
97 index: idx.index,
98 side: idx.side.negate(),
99 })
100 .collect();
101
102 JoinFilter::new(
103 Arc::clone(self.expression()),
104 column_indices,
105 Arc::clone(self.schema()),
106 )
107 }
108}