1use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
19use datafusion_common::{not_impl_err, plan_datafusion_err, Column, Result};
20use datafusion_expr::{JoinType, LogicalPlan, LogicalPlanBuilder};
21use sqlparser::ast::{
22 Join, JoinConstraint, JoinOperator, ObjectName, TableFactor, TableWithJoins,
23};
24use std::collections::HashSet;
25
26impl<S: ContextProvider> SqlToRel<'_, S> {
27 pub(crate) fn plan_table_with_joins(
28 &self,
29 t: TableWithJoins,
30 planner_context: &mut PlannerContext,
31 ) -> Result<LogicalPlan> {
32 let mut left = if is_lateral(&t.relation) {
33 self.create_relation_subquery(t.relation, planner_context)?
34 } else {
35 self.create_relation(t.relation, planner_context)?
36 };
37 let old_outer_from_schema = planner_context.outer_from_schema();
38 for join in t.joins {
39 planner_context.extend_outer_from_schema(left.schema())?;
40 left = self.parse_relation_join(left, join, planner_context)?;
41 }
42 planner_context.set_outer_from_schema(old_outer_from_schema);
43 Ok(left)
44 }
45
46 pub(crate) fn parse_relation_join(
47 &self,
48 left: LogicalPlan,
49 join: Join,
50 planner_context: &mut PlannerContext,
51 ) -> Result<LogicalPlan> {
52 let right = if is_lateral_join(&join)? {
53 self.create_relation_subquery(join.relation, planner_context)?
54 } else {
55 self.create_relation(join.relation, planner_context)?
56 };
57 match join.join_operator {
58 JoinOperator::LeftOuter(constraint) | JoinOperator::Left(constraint) => {
59 self.parse_join(left, right, constraint, JoinType::Left, planner_context)
60 }
61 JoinOperator::RightOuter(constraint) | JoinOperator::Right(constraint) => {
62 self.parse_join(left, right, constraint, JoinType::Right, planner_context)
63 }
64 JoinOperator::Inner(constraint) | JoinOperator::Join(constraint) => {
65 self.parse_join(left, right, constraint, JoinType::Inner, planner_context)
66 }
67 JoinOperator::LeftSemi(constraint) => self.parse_join(
68 left,
69 right,
70 constraint,
71 JoinType::LeftSemi,
72 planner_context,
73 ),
74 JoinOperator::RightSemi(constraint) => self.parse_join(
75 left,
76 right,
77 constraint,
78 JoinType::RightSemi,
79 planner_context,
80 ),
81 JoinOperator::LeftAnti(constraint) => self.parse_join(
82 left,
83 right,
84 constraint,
85 JoinType::LeftAnti,
86 planner_context,
87 ),
88 JoinOperator::RightAnti(constraint) => self.parse_join(
89 left,
90 right,
91 constraint,
92 JoinType::RightAnti,
93 planner_context,
94 ),
95 JoinOperator::FullOuter(constraint) => {
96 self.parse_join(left, right, constraint, JoinType::Full, planner_context)
97 }
98 JoinOperator::CrossJoin(JoinConstraint::None) => {
99 self.parse_cross_join(left, right)
100 }
101 other => not_impl_err!("Unsupported JOIN operator {other:?}"),
102 }
103 }
104
105 fn parse_cross_join(
106 &self,
107 left: LogicalPlan,
108 right: LogicalPlan,
109 ) -> Result<LogicalPlan> {
110 LogicalPlanBuilder::from(left).cross_join(right)?.build()
111 }
112
113 fn parse_join(
114 &self,
115 left: LogicalPlan,
116 right: LogicalPlan,
117 constraint: JoinConstraint,
118 join_type: JoinType,
119 planner_context: &mut PlannerContext,
120 ) -> Result<LogicalPlan> {
121 match constraint {
122 JoinConstraint::On(sql_expr) => {
123 let join_schema = left.schema().join(right.schema())?;
124 let expr = self.sql_to_expr(sql_expr, &join_schema, planner_context)?;
126 LogicalPlanBuilder::from(left)
127 .join_on(right, join_type, Some(expr))?
128 .build()
129 }
130 JoinConstraint::Using(object_names) => {
131 let keys = object_names
132 .into_iter()
133 .map(|object_name| {
134 let ObjectName(mut object_names) = object_name;
135 if object_names.len() != 1 {
136 not_impl_err!(
137 "Invalid identifier in USING clause. Expected single identifier, got {}", ObjectName(object_names)
138 )
139 } else {
140 let id = object_names.swap_remove(0);
141 id.as_ident()
142 .ok_or_else(|| {
143 plan_datafusion_err!(
144 "Expected identifier in USING clause"
145 )
146 })
147 .map(|ident| Column::from_name(self.ident_normalizer.normalize(ident.clone())))
148 }
149 })
150 .collect::<Result<Vec<_>>>()?;
151
152 LogicalPlanBuilder::from(left)
153 .join_using(right, join_type, keys)?
154 .build()
155 }
156 JoinConstraint::Natural => {
157 let left_cols: HashSet<&String> =
158 left.schema().fields().iter().map(|f| f.name()).collect();
159 let keys: Vec<Column> = right
160 .schema()
161 .fields()
162 .iter()
163 .map(|f| f.name())
164 .filter(|f| left_cols.contains(f))
165 .map(Column::from_name)
166 .collect();
167 if keys.is_empty() {
168 self.parse_cross_join(left, right)
169 } else {
170 LogicalPlanBuilder::from(left)
171 .join_using(right, join_type, keys)?
172 .build()
173 }
174 }
175 JoinConstraint::None => LogicalPlanBuilder::from(left)
176 .join_on(right, join_type, [])?
177 .build(),
178 }
179 }
180}
181
182pub(crate) fn is_lateral(factor: &TableFactor) -> bool {
184 match factor {
185 TableFactor::Derived { lateral, .. } => *lateral,
186 TableFactor::Function { lateral, .. } => *lateral,
187 TableFactor::UNNEST { .. } => true,
188 _ => false,
189 }
190}
191
192pub(crate) fn is_lateral_join(join: &Join) -> Result<bool> {
194 let is_lateral_syntax = is_lateral(&join.relation);
195 let is_apply_syntax = match join.join_operator {
196 JoinOperator::FullOuter(..)
197 | JoinOperator::Right(..)
198 | JoinOperator::RightOuter(..)
199 | JoinOperator::RightAnti(..)
200 | JoinOperator::RightSemi(..)
201 if is_lateral_syntax =>
202 {
203 return not_impl_err!(
204 "LATERAL syntax is not supported for \
205 FULL OUTER and RIGHT [OUTER | ANTI | SEMI] joins"
206 );
207 }
208 JoinOperator::CrossApply | JoinOperator::OuterApply => true,
209 _ => false,
210 };
211 Ok(is_lateral_syntax || is_apply_syntax)
212}