datafusion_physical_plan/joins/join_filter.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use crate::joins::utils::ColumnIndex;
19use arrow::datatypes::SchemaRef;
20use datafusion_common::JoinSide;
21use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
22use std::{fmt::Display, sync::Arc};
23
24/// Filter applied before join output. Fields are crate-public to allow
25/// downstream implementations to experiment with custom joins.
26#[derive(Debug, Clone)]
27pub struct JoinFilter {
28    /// Filter expression
29    pub(crate) expression: Arc<dyn PhysicalExpr>,
30    /// Column indices required to construct intermediate batch for filtering
31    pub(crate) column_indices: Vec<ColumnIndex>,
32    /// Physical schema of intermediate batch
33    pub(crate) schema: SchemaRef,
34}
35
36/// For display in `EXPLAIN` plans, only expression with column names is needed,
37/// it output expression like `(col1 + col2) = 0`
38impl Display for JoinFilter {
39    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40        self.expression.fmt_sql(f)
41    }
42}
43
44impl JoinFilter {
45    /// Creates new JoinFilter
46    pub fn new(
47        expression: Arc<dyn PhysicalExpr>,
48        column_indices: Vec<ColumnIndex>,
49        schema: SchemaRef,
50    ) -> JoinFilter {
51        JoinFilter {
52            expression,
53            column_indices,
54            schema,
55        }
56    }
57
58    /// Helper for building ColumnIndex vector from left and right indices
59    pub fn build_column_indices(
60        left_indices: Vec<usize>,
61        right_indices: Vec<usize>,
62    ) -> Vec<ColumnIndex> {
63        left_indices
64            .into_iter()
65            .map(|i| ColumnIndex {
66                index: i,
67                side: JoinSide::Left,
68            })
69            .chain(right_indices.into_iter().map(|i| ColumnIndex {
70                index: i,
71                side: JoinSide::Right,
72            }))
73            .collect()
74    }
75
76    /// Filter expression
77    pub fn expression(&self) -> &Arc<dyn PhysicalExpr> {
78        &self.expression
79    }
80
81    /// Column indices for intermediate batch creation
82    pub fn column_indices(&self) -> &[ColumnIndex] {
83        &self.column_indices
84    }
85
86    /// Intermediate batch schema
87    pub fn schema(&self) -> &SchemaRef {
88        &self.schema
89    }
90
91    /// Rewrites the join filter if the inputs to the join are rewritten
92    pub fn swap(&self) -> JoinFilter {
93        let column_indices = self
94            .column_indices()
95            .iter()
96            .map(|idx| ColumnIndex {
97                index: idx.index,
98                side: idx.side.negate(),
99            })
100            .collect();
101
102        JoinFilter::new(
103            Arc::clone(self.expression()),
104            column_indices,
105            Arc::clone(self.schema()),
106        )
107    }
108}