datafusion_sql/relation/
join.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
19use datafusion_common::{not_impl_err, plan_datafusion_err, Column, Result};
20use datafusion_expr::{JoinType, LogicalPlan, LogicalPlanBuilder};
21use sqlparser::ast::{
22    Join, JoinConstraint, JoinOperator, ObjectName, TableFactor, TableWithJoins,
23};
24use std::collections::HashSet;
25
26impl<S: ContextProvider> SqlToRel<'_, S> {
27    pub(crate) fn plan_table_with_joins(
28        &self,
29        t: TableWithJoins,
30        planner_context: &mut PlannerContext,
31    ) -> Result<LogicalPlan> {
32        let mut left = if is_lateral(&t.relation) {
33            self.create_relation_subquery(t.relation, planner_context)?
34        } else {
35            self.create_relation(t.relation, planner_context)?
36        };
37        let old_outer_from_schema = planner_context.outer_from_schema();
38        for join in t.joins {
39            planner_context.extend_outer_from_schema(left.schema())?;
40            left = self.parse_relation_join(left, join, planner_context)?;
41        }
42        planner_context.set_outer_from_schema(old_outer_from_schema);
43        Ok(left)
44    }
45
46    pub(crate) fn parse_relation_join(
47        &self,
48        left: LogicalPlan,
49        join: Join,
50        planner_context: &mut PlannerContext,
51    ) -> Result<LogicalPlan> {
52        let right = if is_lateral_join(&join)? {
53            self.create_relation_subquery(join.relation, planner_context)?
54        } else {
55            self.create_relation(join.relation, planner_context)?
56        };
57        match join.join_operator {
58            JoinOperator::LeftOuter(constraint) | JoinOperator::Left(constraint) => {
59                self.parse_join(left, right, constraint, JoinType::Left, planner_context)
60            }
61            JoinOperator::RightOuter(constraint) | JoinOperator::Right(constraint) => {
62                self.parse_join(left, right, constraint, JoinType::Right, planner_context)
63            }
64            JoinOperator::Inner(constraint) | JoinOperator::Join(constraint) => {
65                self.parse_join(left, right, constraint, JoinType::Inner, planner_context)
66            }
67            JoinOperator::LeftSemi(constraint) => self.parse_join(
68                left,
69                right,
70                constraint,
71                JoinType::LeftSemi,
72                planner_context,
73            ),
74            JoinOperator::RightSemi(constraint) => self.parse_join(
75                left,
76                right,
77                constraint,
78                JoinType::RightSemi,
79                planner_context,
80            ),
81            JoinOperator::LeftAnti(constraint) => self.parse_join(
82                left,
83                right,
84                constraint,
85                JoinType::LeftAnti,
86                planner_context,
87            ),
88            JoinOperator::RightAnti(constraint) => self.parse_join(
89                left,
90                right,
91                constraint,
92                JoinType::RightAnti,
93                planner_context,
94            ),
95            JoinOperator::FullOuter(constraint) => {
96                self.parse_join(left, right, constraint, JoinType::Full, planner_context)
97            }
98            JoinOperator::CrossJoin(JoinConstraint::None) => {
99                self.parse_cross_join(left, right)
100            }
101            other => not_impl_err!("Unsupported JOIN operator {other:?}"),
102        }
103    }
104
105    fn parse_cross_join(
106        &self,
107        left: LogicalPlan,
108        right: LogicalPlan,
109    ) -> Result<LogicalPlan> {
110        LogicalPlanBuilder::from(left).cross_join(right)?.build()
111    }
112
113    fn parse_join(
114        &self,
115        left: LogicalPlan,
116        right: LogicalPlan,
117        constraint: JoinConstraint,
118        join_type: JoinType,
119        planner_context: &mut PlannerContext,
120    ) -> Result<LogicalPlan> {
121        match constraint {
122            JoinConstraint::On(sql_expr) => {
123                let join_schema = left.schema().join(right.schema())?;
124                // parse ON expression
125                let expr = self.sql_to_expr(sql_expr, &join_schema, planner_context)?;
126                LogicalPlanBuilder::from(left)
127                    .join_on(right, join_type, Some(expr))?
128                    .build()
129            }
130            JoinConstraint::Using(object_names) => {
131                let keys = object_names
132                    .into_iter()
133                    .map(|object_name| {
134                        let ObjectName(mut object_names) = object_name;
135                        if object_names.len() != 1 {
136                            not_impl_err!(
137                                "Invalid identifier in USING clause. Expected single identifier, got {}", ObjectName(object_names)
138                            )
139                        } else {
140                            let id = object_names.swap_remove(0);
141                            id.as_ident()
142                                .ok_or_else(|| {
143                                    plan_datafusion_err!(
144                                        "Expected identifier in USING clause"
145                                    )
146                                })
147                                .map(|ident| Column::from_name(self.ident_normalizer.normalize(ident.clone())))
148                        }
149                    })
150                    .collect::<Result<Vec<_>>>()?;
151
152                LogicalPlanBuilder::from(left)
153                    .join_using(right, join_type, keys)?
154                    .build()
155            }
156            JoinConstraint::Natural => {
157                let left_cols: HashSet<&String> =
158                    left.schema().fields().iter().map(|f| f.name()).collect();
159                let keys: Vec<Column> = right
160                    .schema()
161                    .fields()
162                    .iter()
163                    .map(|f| f.name())
164                    .filter(|f| left_cols.contains(f))
165                    .map(Column::from_name)
166                    .collect();
167                if keys.is_empty() {
168                    self.parse_cross_join(left, right)
169                } else {
170                    LogicalPlanBuilder::from(left)
171                        .join_using(right, join_type, keys)?
172                        .build()
173                }
174            }
175            JoinConstraint::None => LogicalPlanBuilder::from(left)
176                .join_on(right, join_type, [])?
177                .build(),
178        }
179    }
180}
181
182/// Return `true` iff the given [`TableFactor`] is lateral.
183pub(crate) fn is_lateral(factor: &TableFactor) -> bool {
184    match factor {
185        TableFactor::Derived { lateral, .. } => *lateral,
186        TableFactor::Function { lateral, .. } => *lateral,
187        TableFactor::UNNEST { .. } => true,
188        _ => false,
189    }
190}
191
192/// Return `true` iff the given [`Join`] is lateral.
193pub(crate) fn is_lateral_join(join: &Join) -> Result<bool> {
194    let is_lateral_syntax = is_lateral(&join.relation);
195    let is_apply_syntax = match join.join_operator {
196        JoinOperator::FullOuter(..)
197        | JoinOperator::Right(..)
198        | JoinOperator::RightOuter(..)
199        | JoinOperator::RightAnti(..)
200        | JoinOperator::RightSemi(..)
201            if is_lateral_syntax =>
202        {
203            return not_impl_err!(
204                "LATERAL syntax is not supported for \
205                 FULL OUTER and RIGHT [OUTER | ANTI | SEMI] joins"
206            );
207        }
208        JoinOperator::CrossApply | JoinOperator::OuterApply => true,
209        _ => false,
210    };
211    Ok(is_lateral_syntax || is_apply_syntax)
212}