1use arrow::datatypes::Field;
19use datafusion_common::{
20 exec_datafusion_err, internal_err, not_impl_err, plan_datafusion_err, plan_err,
21 Column, DFSchema, Result, Span, TableReference,
22};
23use datafusion_expr::planner::PlannerResult;
24use datafusion_expr::{Case, Expr};
25use sqlparser::ast::{CaseWhen, Expr as SQLExpr, Ident};
26use std::sync::Arc;
27
28use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
29use datafusion_expr::UNNAMED_TABLE;
30
31impl<S: ContextProvider> SqlToRel<'_, S> {
32 pub(super) fn sql_identifier_to_expr(
33 &self,
34 id: Ident,
35 schema: &DFSchema,
36 planner_context: &mut PlannerContext,
37 ) -> Result<Expr> {
38 let id_span = id.span;
39 if id.value.starts_with('@') {
40 let var_names = vec![id.value];
42 let ty = self
43 .context_provider
44 .get_variable_type(&var_names)
45 .ok_or_else(|| {
46 plan_datafusion_err!("variable {var_names:?} has no type information")
47 })?;
48 Ok(Expr::ScalarVariable(ty, var_names))
49 } else {
50 let normalize_ident = self.ident_normalizer.normalize(id);
55
56 if planner_context
57 .lambdas_parameters()
58 .contains(&normalize_ident)
59 {
60 let mut column = Column::new_unqualified(normalize_ident);
61 if self.options.collect_spans {
62 if let Some(span) = Span::try_from_sqlparser_span(id_span) {
63 column.spans_mut().add_span(span);
64 }
65 }
66 return Ok(Expr::Column(column));
67 }
68
69 if let Ok((qualifier, _)) =
71 schema.qualified_field_with_unqualified_name(normalize_ident.as_str())
72 {
73 let mut column = Column::new(
74 qualifier.filter(|q| q.table() != UNNAMED_TABLE).cloned(),
75 normalize_ident,
76 );
77 if self.options.collect_spans {
78 if let Some(span) = Span::try_from_sqlparser_span(id_span) {
79 column.spans_mut().add_span(span);
80 }
81 }
82 return Ok(Expr::Column(column));
83 }
84
85 if let Some(outer) = planner_context.outer_query_schema() {
87 if let Ok((qualifier, field)) =
88 outer.qualified_field_with_unqualified_name(normalize_ident.as_str())
89 {
90 return Ok(Expr::OuterReferenceColumn(
92 Arc::new(field.clone()),
93 Column::from((qualifier, field)),
94 ));
95 }
96 }
97
98 let mut column = Column::new_unqualified(normalize_ident);
100 if self.options.collect_spans {
101 if let Some(span) = Span::try_from_sqlparser_span(id_span) {
102 column.spans_mut().add_span(span);
103 }
104 }
105 Ok(Expr::Column(column))
106 }
107 }
108
109 pub(crate) fn sql_compound_identifier_to_expr(
110 &self,
111 ids: Vec<Ident>,
112 schema: &DFSchema,
113 planner_context: &mut PlannerContext,
114 ) -> Result<Expr> {
115 if ids.len() < 2 {
116 return internal_err!("Not a compound identifier: {ids:?}");
117 }
118
119 let ids_span = Span::union_iter(
120 ids.iter()
121 .filter_map(|id| Span::try_from_sqlparser_span(id.span)),
122 );
123
124 if ids[0].value.starts_with('@') {
125 let var_names: Vec<_> = ids
126 .into_iter()
127 .map(|id| self.ident_normalizer.normalize(id))
128 .collect();
129 let ty = self
130 .context_provider
131 .get_variable_type(&var_names)
132 .ok_or_else(|| {
133 exec_datafusion_err!("variable {var_names:?} has no type information")
134 })?;
135 Ok(Expr::ScalarVariable(ty, var_names))
136 } else {
137 let ids = ids
138 .into_iter()
139 .map(|id| self.ident_normalizer.normalize(id))
140 .collect::<Vec<_>>();
141
142 let search_result = search_dfschema(&ids, schema);
143 match search_result {
144 Some((field, qualifier, nested_names)) if !nested_names.is_empty() => {
146 for planner in self.context_provider.get_expr_planners() {
148 if let Ok(planner_result) = planner.plan_compound_identifier(
149 field,
150 qualifier,
151 nested_names,
152 ) {
153 match planner_result {
154 PlannerResult::Planned(expr) => {
155 return Ok(expr);
156 }
157 PlannerResult::Original(_args) => {}
158 }
159 }
160 }
161 plan_err!("could not parse compound identifier from {ids:?}")
162 }
163 Some((field, qualifier, _nested_names)) => {
165 let mut column = Column::from((qualifier, field));
166 if self.options.collect_spans {
167 if let Some(span) = ids_span {
168 column.spans_mut().add_span(span);
169 }
170 }
171 Ok(Expr::Column(column))
172 }
173 None => {
174 if ids.len() == 5 {
177 not_impl_err!("compound identifier: {ids:?}")
178 } else {
179 if let Some(outer) = planner_context.outer_query_schema() {
181 let search_result = search_dfschema(&ids, outer);
182 match search_result {
183 Some((field, qualifier, nested_names))
185 if !nested_names.is_empty() =>
186 {
187 not_impl_err!(
189 "Nested identifiers are not yet supported for OuterReferenceColumn {}",
190 Column::from((qualifier, field)).quoted_flat_name()
191 )
192 }
193 Some((field, qualifier, _nested_names)) => {
195 Ok(Expr::OuterReferenceColumn(
197 Arc::new(field.clone()),
198 Column::from((qualifier, field)),
199 ))
200 }
201 None => {
203 let s = &ids[0..ids.len()];
204 let (relation, column_name) =
206 form_identifier(s).unwrap();
207 Ok(Expr::Column(Column::new(relation, column_name)))
208 }
209 }
210 } else {
211 let s = &ids[0..ids.len()];
212 let (relation, column_name) = form_identifier(s).unwrap();
214 let mut column = Column::new(relation, column_name);
215 if self.options.collect_spans {
216 if let Some(span) = ids_span {
217 column.spans_mut().add_span(span);
218 }
219 }
220 Ok(Expr::Column(column))
221 }
222 }
223 }
224 }
225 }
226 }
227
228 pub(super) fn sql_case_identifier_to_expr(
229 &self,
230 operand: Option<Box<SQLExpr>>,
231 conditions: Vec<CaseWhen>,
232 else_result: Option<Box<SQLExpr>>,
233 schema: &DFSchema,
234 planner_context: &mut PlannerContext,
235 ) -> Result<Expr> {
236 let expr = if let Some(e) = operand {
237 Some(Box::new(self.sql_expr_to_logical_expr(
238 *e,
239 schema,
240 planner_context,
241 )?))
242 } else {
243 None
244 };
245 let when_then_expr = conditions
246 .into_iter()
247 .map(|e| {
248 Ok((
249 Box::new(self.sql_expr_to_logical_expr(
250 e.condition,
251 schema,
252 planner_context,
253 )?),
254 Box::new(self.sql_expr_to_logical_expr(
255 e.result,
256 schema,
257 planner_context,
258 )?),
259 ))
260 })
261 .collect::<Result<Vec<_>>>()?;
262 let else_expr = if let Some(e) = else_result {
263 Some(Box::new(self.sql_expr_to_logical_expr(
264 *e,
265 schema,
266 planner_context,
267 )?))
268 } else {
269 None
270 };
271
272 Ok(Expr::Case(Case::new(expr, when_then_expr, else_expr)))
273 }
274}
275
276fn form_identifier(idents: &[String]) -> Result<(Option<TableReference>, &String)> {
278 match idents.len() {
279 1 => Ok((None, &idents[0])),
280 2 => Ok((
281 Some(TableReference::Bare {
282 table: idents[0].clone().into(),
283 }),
284 &idents[1],
285 )),
286 3 => Ok((
287 Some(TableReference::Partial {
288 schema: idents[0].clone().into(),
289 table: idents[1].clone().into(),
290 }),
291 &idents[2],
292 )),
293 4 => Ok((
294 Some(TableReference::Full {
295 catalog: idents[0].clone().into(),
296 schema: idents[1].clone().into(),
297 table: idents[2].clone().into(),
298 }),
299 &idents[3],
300 )),
301 _ => internal_err!("Incorrect number of identifiers: {}", idents.len()),
302 }
303}
304
305fn search_dfschema<'ids, 'schema>(
306 ids: &'ids [String],
307 schema: &'schema DFSchema,
308) -> Option<(
309 &'schema Field,
310 Option<&'schema TableReference>,
311 &'ids [String],
312)> {
313 generate_schema_search_terms(ids).find_map(|(qualifier, column, nested_names)| {
314 let qualifier_and_field = schema
315 .qualified_field_with_name(qualifier.as_ref(), column)
316 .ok();
317 qualifier_and_field.map(|(qualifier, field)| (field, qualifier, nested_names))
318 })
319}
320
321fn generate_schema_search_terms(
350 ids: &[String],
351) -> impl Iterator<Item = (Option<TableReference>, &String, &[String])> {
352 let bound = ids.len().min(4);
356 (0..bound).rev().map(|i| {
358 let nested_names_index = i + 1;
359 let qualifier_and_column = &ids[0..nested_names_index];
360 let (relation, column_name) = form_identifier(qualifier_and_column).unwrap();
362 (relation, column_name, &ids[nested_names_index..])
363 })
364}
365
#[cfg(test)]
mod test {
    use super::*;

    /// Checks the search terms generated for zero to six identifier parts:
    /// the number of terms, their order (longest qualified prefix first) and
    /// the split between qualifier, column name and nested names.
    #[test]
    fn test_generate_schema_search_terms() -> Result<()> {
        type ExpectedItem = (
            Option<TableReference>,
            &'static str,
            &'static [&'static str],
        );
        fn assert_vec_eq(
            expected: Vec<ExpectedItem>,
            actual: Vec<(Option<TableReference>, &String, &[String])>,
        ) {
            // `zip` stops at the shorter input, so without this check missing
            // or extra search terms would go unnoticed.
            assert_eq!(expected.len(), actual.len(), "number of search terms");
            for (expected, actual) in expected.into_iter().zip(actual) {
                assert_eq!(expected.0, actual.0, "qualifier");
                assert_eq!(expected.1, actual.1, "column name");
                assert_eq!(expected.2, actual.2, "nested names");
            }
        }

        let actual = generate_schema_search_terms(&[]).collect::<Vec<_>>();
        assert!(actual.is_empty());

        let ids = vec!["a".to_string()];
        let actual = generate_schema_search_terms(&ids).collect::<Vec<_>>();
        let expected: Vec<ExpectedItem> = vec![(None, "a", &[])];
        assert_vec_eq(expected, actual);

        let ids = vec!["a".to_string(), "b".to_string()];
        let actual = generate_schema_search_terms(&ids).collect::<Vec<_>>();
        let expected: Vec<ExpectedItem> = vec![
            (Some(TableReference::bare("a")), "b", &[]),
            (None, "a", &["b"]),
        ];
        assert_vec_eq(expected, actual);

        let ids = vec!["a".to_string(), "b".to_string(), "c".to_string()];
        let actual = generate_schema_search_terms(&ids).collect::<Vec<_>>();
        let expected: Vec<ExpectedItem> = vec![
            (Some(TableReference::partial("a", "b")), "c", &[]),
            (Some(TableReference::bare("a")), "b", &["c"]),
            (None, "a", &["b", "c"]),
        ];
        assert_vec_eq(expected, actual);

        let ids = vec![
            "a".to_string(),
            "b".to_string(),
            "c".to_string(),
            "d".to_string(),
        ];
        let actual = generate_schema_search_terms(&ids).collect::<Vec<_>>();
        let expected: Vec<ExpectedItem> = vec![
            (Some(TableReference::full("a", "b", "c")), "d", &[]),
            (Some(TableReference::partial("a", "b")), "c", &["d"]),
            (Some(TableReference::bare("a")), "b", &["c", "d"]),
            (None, "a", &["b", "c", "d"]),
        ];
        assert_vec_eq(expected, actual);

        // Beyond four parts the number of terms stays at four; the extra parts
        // only ever appear as nested names.
        let ids = vec![
            "a".to_string(),
            "b".to_string(),
            "c".to_string(),
            "d".to_string(),
            "e".to_string(),
        ];
        let actual = generate_schema_search_terms(&ids).collect::<Vec<_>>();
        let expected: Vec<ExpectedItem> = vec![
            (Some(TableReference::full("a", "b", "c")), "d", &["e"]),
            (Some(TableReference::partial("a", "b")), "c", &["d", "e"]),
            (Some(TableReference::bare("a")), "b", &["c", "d", "e"]),
            (None, "a", &["b", "c", "d", "e"]),
        ];
        assert_vec_eq(expected, actual);

        let ids = vec![
            "a".to_string(),
            "b".to_string(),
            "c".to_string(),
            "d".to_string(),
            "e".to_string(),
            "f".to_string(),
        ];
        let actual = generate_schema_search_terms(&ids).collect::<Vec<_>>();
        let expected: Vec<ExpectedItem> = vec![
            (Some(TableReference::full("a", "b", "c")), "d", &["e", "f"]),
            (
                Some(TableReference::partial("a", "b")),
                "c",
                &["d", "e", "f"],
            ),
            (Some(TableReference::bare("a")), "b", &["c", "d", "e", "f"]),
            (None, "a", &["b", "c", "d", "e", "f"]),
        ];
        assert_vec_eq(expected, actual);

        Ok(())
    }

    /// Checks `form_identifier` for every accepted length (one to four parts)
    /// and for the two rejected cases (no parts, five parts).
    #[test]
    fn test_form_identifier() -> Result<()> {
        let err = form_identifier(&[]).expect_err("empty identifiers didn't fail");
        let expected = "Internal error: Incorrect number of identifiers: 0.\n\
         This issue was likely caused by a bug in DataFusion's code. Please help us to resolve this \
         by filing a bug report in our issue tracker: https://github.com/apache/datafusion/issues";
        assert!(expected.starts_with(&err.strip_backtrace()));

        let ids = vec!["a".to_string()];
        let (qualifier, column) = form_identifier(&ids)?;
        assert_eq!(qualifier, None);
        assert_eq!(column, "a");

        let ids = vec!["a".to_string(), "b".to_string()];
        let (qualifier, column) = form_identifier(&ids)?;
        assert_eq!(qualifier, Some(TableReference::bare("a")));
        assert_eq!(column, "b");

        let ids = vec!["a".to_string(), "b".to_string(), "c".to_string()];
        let (qualifier, column) = form_identifier(&ids)?;
        assert_eq!(qualifier, Some(TableReference::partial("a", "b")));
        assert_eq!(column, "c");

        let ids = vec![
            "a".to_string(),
            "b".to_string(),
            "c".to_string(),
            "d".to_string(),
        ];
        let (qualifier, column) = form_identifier(&ids)?;
        assert_eq!(qualifier, Some(TableReference::full("a", "b", "c")));
        assert_eq!(column, "d");

        let err = form_identifier(&[
            "a".to_string(),
            "b".to_string(),
            "c".to_string(),
            "d".to_string(),
            "e".to_string(),
        ])
        .expect_err("too many identifiers didn't fail");
        let expected = "Internal error: Incorrect number of identifiers: 5.\n\
         This issue was likely caused by a bug in DataFusion's code. Please help us to resolve this \
         by filing a bug report in our issue tracker: https://github.com/apache/datafusion/issues";
        assert!(expected.starts_with(&err.strip_backtrace()));

        Ok(())
    }
}