datafusion_sql/expr/
identifier.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow::datatypes::Field;
19use datafusion_common::{
20    exec_datafusion_err, internal_err, not_impl_err, plan_datafusion_err, plan_err,
21    Column, DFSchema, Result, Span, TableReference,
22};
23use datafusion_expr::planner::PlannerResult;
24use datafusion_expr::{Case, Expr};
25use sqlparser::ast::{CaseWhen, Expr as SQLExpr, Ident};
26use std::sync::Arc;
27
28use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
29use datafusion_expr::UNNAMED_TABLE;
30
31impl<S: ContextProvider> SqlToRel<'_, S> {
32    pub(super) fn sql_identifier_to_expr(
33        &self,
34        id: Ident,
35        schema: &DFSchema,
36        planner_context: &mut PlannerContext,
37    ) -> Result<Expr> {
38        let id_span = id.span;
39        if id.value.starts_with('@') {
40            // TODO: figure out if ScalarVariables should be insensitive.
41            let var_names = vec![id.value];
42            let ty = self
43                .context_provider
44                .get_variable_type(&var_names)
45                .ok_or_else(|| {
46                    plan_datafusion_err!("variable {var_names:?} has no type information")
47                })?;
48            Ok(Expr::ScalarVariable(ty, var_names))
49        } else {
50            // Don't use `col()` here because it will try to
51            // interpret names with '.' as if they were
52            // compound identifiers, but this is not a compound
53            // identifier. (e.g. it is "foo.bar" not foo.bar)
54            let normalize_ident = self.ident_normalizer.normalize(id);
55
56            if planner_context
57                .lambdas_parameters()
58                .contains(&normalize_ident)
59            {
60                let mut column = Column::new_unqualified(normalize_ident);
61                if self.options.collect_spans {
62                    if let Some(span) = Span::try_from_sqlparser_span(id_span) {
63                        column.spans_mut().add_span(span);
64                    }
65                }
66                return Ok(Expr::Column(column));
67            }
68
69            // Check for qualified field with unqualified name
70            if let Ok((qualifier, _)) =
71                schema.qualified_field_with_unqualified_name(normalize_ident.as_str())
72            {
73                let mut column = Column::new(
74                    qualifier.filter(|q| q.table() != UNNAMED_TABLE).cloned(),
75                    normalize_ident,
76                );
77                if self.options.collect_spans {
78                    if let Some(span) = Span::try_from_sqlparser_span(id_span) {
79                        column.spans_mut().add_span(span);
80                    }
81                }
82                return Ok(Expr::Column(column));
83            }
84
85            // Check the outer query schema
86            if let Some(outer) = planner_context.outer_query_schema() {
87                if let Ok((qualifier, field)) =
88                    outer.qualified_field_with_unqualified_name(normalize_ident.as_str())
89                {
90                    // Found an exact match on a qualified name in the outer plan schema, so this is an outer reference column
91                    return Ok(Expr::OuterReferenceColumn(
92                        Arc::new(field.clone()),
93                        Column::from((qualifier, field)),
94                    ));
95                }
96            }
97
98            // Default case
99            let mut column = Column::new_unqualified(normalize_ident);
100            if self.options.collect_spans {
101                if let Some(span) = Span::try_from_sqlparser_span(id_span) {
102                    column.spans_mut().add_span(span);
103                }
104            }
105            Ok(Expr::Column(column))
106        }
107    }
108
109    pub(crate) fn sql_compound_identifier_to_expr(
110        &self,
111        ids: Vec<Ident>,
112        schema: &DFSchema,
113        planner_context: &mut PlannerContext,
114    ) -> Result<Expr> {
115        if ids.len() < 2 {
116            return internal_err!("Not a compound identifier: {ids:?}");
117        }
118
119        let ids_span = Span::union_iter(
120            ids.iter()
121                .filter_map(|id| Span::try_from_sqlparser_span(id.span)),
122        );
123
124        if ids[0].value.starts_with('@') {
125            let var_names: Vec<_> = ids
126                .into_iter()
127                .map(|id| self.ident_normalizer.normalize(id))
128                .collect();
129            let ty = self
130                .context_provider
131                .get_variable_type(&var_names)
132                .ok_or_else(|| {
133                    exec_datafusion_err!("variable {var_names:?} has no type information")
134                })?;
135            Ok(Expr::ScalarVariable(ty, var_names))
136        } else {
137            let ids = ids
138                .into_iter()
139                .map(|id| self.ident_normalizer.normalize(id))
140                .collect::<Vec<_>>();
141
142            let search_result = search_dfschema(&ids, schema);
143            match search_result {
144                // Found matching field with spare identifier(s) for nested field(s) in structure
145                Some((field, qualifier, nested_names)) if !nested_names.is_empty() => {
146                    // Found matching field with spare identifier(s) for nested field(s) in structure
147                    for planner in self.context_provider.get_expr_planners() {
148                        if let Ok(planner_result) = planner.plan_compound_identifier(
149                            field,
150                            qualifier,
151                            nested_names,
152                        ) {
153                            match planner_result {
154                                PlannerResult::Planned(expr) => {
155                                    return Ok(expr);
156                                }
157                                PlannerResult::Original(_args) => {}
158                            }
159                        }
160                    }
161                    plan_err!("could not parse compound identifier from {ids:?}")
162                }
163                // Found matching field with no spare identifier(s)
164                Some((field, qualifier, _nested_names)) => {
165                    let mut column = Column::from((qualifier, field));
166                    if self.options.collect_spans {
167                        if let Some(span) = ids_span {
168                            column.spans_mut().add_span(span);
169                        }
170                    }
171                    Ok(Expr::Column(column))
172                }
173                None => {
174                    // Return default where use all identifiers to not have a nested field
175                    // this len check is because at 5 identifiers will have to have a nested field
176                    if ids.len() == 5 {
177                        not_impl_err!("compound identifier: {ids:?}")
178                    } else {
179                        // Check the outer_query_schema and try to find a match
180                        if let Some(outer) = planner_context.outer_query_schema() {
181                            let search_result = search_dfschema(&ids, outer);
182                            match search_result {
183                                // Found matching field with spare identifier(s) for nested field(s) in structure
184                                Some((field, qualifier, nested_names))
185                                    if !nested_names.is_empty() =>
186                                {
187                                    // TODO: remove when can support nested identifiers for OuterReferenceColumn
188                                    not_impl_err!(
189                                        "Nested identifiers are not yet supported for OuterReferenceColumn {}",
190                                        Column::from((qualifier, field)).quoted_flat_name()
191                                    )
192                                }
193                                // Found matching field with no spare identifier(s)
194                                Some((field, qualifier, _nested_names)) => {
195                                    // Found an exact match on a qualified name in the outer plan schema, so this is an outer reference column
196                                    Ok(Expr::OuterReferenceColumn(
197                                        Arc::new(field.clone()),
198                                        Column::from((qualifier, field)),
199                                    ))
200                                }
201                                // Found no matching field, will return a default
202                                None => {
203                                    let s = &ids[0..ids.len()];
204                                    // safe unwrap as s can never be empty or exceed the bounds
205                                    let (relation, column_name) =
206                                        form_identifier(s).unwrap();
207                                    Ok(Expr::Column(Column::new(relation, column_name)))
208                                }
209                            }
210                        } else {
211                            let s = &ids[0..ids.len()];
212                            // Safe unwrap as s can never be empty or exceed the bounds
213                            let (relation, column_name) = form_identifier(s).unwrap();
214                            let mut column = Column::new(relation, column_name);
215                            if self.options.collect_spans {
216                                if let Some(span) = ids_span {
217                                    column.spans_mut().add_span(span);
218                                }
219                            }
220                            Ok(Expr::Column(column))
221                        }
222                    }
223                }
224            }
225        }
226    }
227
228    pub(super) fn sql_case_identifier_to_expr(
229        &self,
230        operand: Option<Box<SQLExpr>>,
231        conditions: Vec<CaseWhen>,
232        else_result: Option<Box<SQLExpr>>,
233        schema: &DFSchema,
234        planner_context: &mut PlannerContext,
235    ) -> Result<Expr> {
236        let expr = if let Some(e) = operand {
237            Some(Box::new(self.sql_expr_to_logical_expr(
238                *e,
239                schema,
240                planner_context,
241            )?))
242        } else {
243            None
244        };
245        let when_then_expr = conditions
246            .into_iter()
247            .map(|e| {
248                Ok((
249                    Box::new(self.sql_expr_to_logical_expr(
250                        e.condition,
251                        schema,
252                        planner_context,
253                    )?),
254                    Box::new(self.sql_expr_to_logical_expr(
255                        e.result,
256                        schema,
257                        planner_context,
258                    )?),
259                ))
260            })
261            .collect::<Result<Vec<_>>>()?;
262        let else_expr = if let Some(e) = else_result {
263            Some(Box::new(self.sql_expr_to_logical_expr(
264                *e,
265                schema,
266                planner_context,
267            )?))
268        } else {
269            None
270        };
271
272        Ok(Expr::Case(Case::new(expr, when_then_expr, else_expr)))
273    }
274}
275
276// (relation, column name)
277fn form_identifier(idents: &[String]) -> Result<(Option<TableReference>, &String)> {
278    match idents.len() {
279        1 => Ok((None, &idents[0])),
280        2 => Ok((
281            Some(TableReference::Bare {
282                table: idents[0].clone().into(),
283            }),
284            &idents[1],
285        )),
286        3 => Ok((
287            Some(TableReference::Partial {
288                schema: idents[0].clone().into(),
289                table: idents[1].clone().into(),
290            }),
291            &idents[2],
292        )),
293        4 => Ok((
294            Some(TableReference::Full {
295                catalog: idents[0].clone().into(),
296                schema: idents[1].clone().into(),
297                table: idents[2].clone().into(),
298            }),
299            &idents[3],
300        )),
301        _ => internal_err!("Incorrect number of identifiers: {}", idents.len()),
302    }
303}
304
305fn search_dfschema<'ids, 'schema>(
306    ids: &'ids [String],
307    schema: &'schema DFSchema,
308) -> Option<(
309    &'schema Field,
310    Option<&'schema TableReference>,
311    &'ids [String],
312)> {
313    generate_schema_search_terms(ids).find_map(|(qualifier, column, nested_names)| {
314        let qualifier_and_field = schema
315            .qualified_field_with_name(qualifier.as_ref(), column)
316            .ok();
317        qualifier_and_field.map(|(qualifier, field)| (field, qualifier, nested_names))
318    })
319}
320
321// Possibilities we search with, in order from top to bottom for each len:
322//
323// len = 2:
324// 1. (table.column)
325// 2. (column).nested
326//
327// len = 3:
328// 1. (schema.table.column)
329// 2. (table.column).nested
330// 3. (column).nested1.nested2
331//
332// len = 4:
333// 1. (catalog.schema.table.column)
334// 2. (schema.table.column).nested1
335// 3. (table.column).nested1.nested2
336// 4. (column).nested1.nested2.nested3
337//
338// len = 5:
339// 1. (catalog.schema.table.column).nested
340// 2. (schema.table.column).nested1.nested2
341// 3. (table.column).nested1.nested2.nested3
342// 4. (column).nested1.nested2.nested3.nested4
343//
344// len > 5:
345// 1. (catalog.schema.table.column).nested[.nestedN]+
346// 2. (schema.table.column).nested1.nested2[.nestedN]+
347// 3. (table.column).nested1.nested2.nested3[.nestedN]+
348// 4. (column).nested1.nested2.nested3.nested4[.nestedN]+
349fn generate_schema_search_terms(
350    ids: &[String],
351) -> impl Iterator<Item = (Option<TableReference>, &String, &[String])> {
352    // Take at most 4 identifiers to form a Column to search with
353    // - 1 for the column name
354    // - 0 to 3 for the TableReference
355    let bound = ids.len().min(4);
356    // Search terms from most specific to least specific
357    (0..bound).rev().map(|i| {
358        let nested_names_index = i + 1;
359        let qualifier_and_column = &ids[0..nested_names_index];
360        // Safe unwrap as qualifier_and_column can never be empty or exceed the bounds
361        let (relation, column_name) = form_identifier(qualifier_and_column).unwrap();
362        (relation, column_name, &ids[nested_names_index..])
363    })
364}
365
#[cfg(test)]
mod test {
    use super::*;

    #[test]
    // testing according to documentation of generate_schema_search_terms function
    // where it ensures generated search terms are in correct order with correct values
    fn test_generate_schema_search_terms() -> Result<()> {
        type ExpectedItem = (
            Option<TableReference>,
            &'static str,
            &'static [&'static str],
        );
        // Asserts that `actual` contains exactly the search terms in `expected`,
        // in the same order.
        fn assert_vec_eq(
            expected: Vec<ExpectedItem>,
            actual: Vec<(Option<TableReference>, &String, &[String])>,
        ) {
            // `zip` silently stops at the shorter side, so compare lengths first;
            // otherwise missing or extra search terms would go unnoticed.
            assert_eq!(expected.len(), actual.len(), "number of search terms");
            for (expected, actual) in expected.into_iter().zip(actual) {
                assert_eq!(expected.0, actual.0, "qualifier");
                assert_eq!(expected.1, actual.1, "column name");
                assert_eq!(expected.2, actual.2, "nested names");
            }
        }

        let actual = generate_schema_search_terms(&[]).collect::<Vec<_>>();
        assert!(actual.is_empty());

        let ids = vec!["a".to_string()];
        let actual = generate_schema_search_terms(&ids).collect::<Vec<_>>();
        let expected: Vec<ExpectedItem> = vec![(None, "a", &[])];
        assert_vec_eq(expected, actual);

        let ids = vec!["a".to_string(), "b".to_string()];
        let actual = generate_schema_search_terms(&ids).collect::<Vec<_>>();
        let expected: Vec<ExpectedItem> = vec![
            (Some(TableReference::bare("a")), "b", &[]),
            (None, "a", &["b"]),
        ];
        assert_vec_eq(expected, actual);

        let ids = vec!["a".to_string(), "b".to_string(), "c".to_string()];
        let actual = generate_schema_search_terms(&ids).collect::<Vec<_>>();
        let expected: Vec<ExpectedItem> = vec![
            (Some(TableReference::partial("a", "b")), "c", &[]),
            (Some(TableReference::bare("a")), "b", &["c"]),
            (None, "a", &["b", "c"]),
        ];
        assert_vec_eq(expected, actual);

        let ids = vec![
            "a".to_string(),
            "b".to_string(),
            "c".to_string(),
            "d".to_string(),
        ];
        let actual = generate_schema_search_terms(&ids).collect::<Vec<_>>();
        let expected: Vec<ExpectedItem> = vec![
            (Some(TableReference::full("a", "b", "c")), "d", &[]),
            (Some(TableReference::partial("a", "b")), "c", &["d"]),
            (Some(TableReference::bare("a")), "b", &["c", "d"]),
            (None, "a", &["b", "c", "d"]),
        ];
        assert_vec_eq(expected, actual);

        let ids = vec![
            "a".to_string(),
            "b".to_string(),
            "c".to_string(),
            "d".to_string(),
            "e".to_string(),
        ];
        let actual = generate_schema_search_terms(&ids).collect::<Vec<_>>();
        let expected: Vec<ExpectedItem> = vec![
            (Some(TableReference::full("a", "b", "c")), "d", &["e"]),
            (Some(TableReference::partial("a", "b")), "c", &["d", "e"]),
            (Some(TableReference::bare("a")), "b", &["c", "d", "e"]),
            (None, "a", &["b", "c", "d", "e"]),
        ];
        assert_vec_eq(expected, actual);

        let ids = vec![
            "a".to_string(),
            "b".to_string(),
            "c".to_string(),
            "d".to_string(),
            "e".to_string(),
            "f".to_string(),
        ];
        let actual = generate_schema_search_terms(&ids).collect::<Vec<_>>();
        let expected: Vec<ExpectedItem> = vec![
            (Some(TableReference::full("a", "b", "c")), "d", &["e", "f"]),
            (
                Some(TableReference::partial("a", "b")),
                "c",
                &["d", "e", "f"],
            ),
            (Some(TableReference::bare("a")), "b", &["c", "d", "e", "f"]),
            (None, "a", &["b", "c", "d", "e", "f"]),
        ];
        assert_vec_eq(expected, actual);

        Ok(())
    }

    #[test]
    fn test_form_identifier() -> Result<()> {
        let err = form_identifier(&[]).expect_err("empty identifiers didn't fail");
        let expected = "Internal error: Incorrect number of identifiers: 0.\n\
         This issue was likely caused by a bug in DataFusion's code. Please help us to resolve this \
         by filing a bug report in our issue tracker: https://github.com/apache/datafusion/issues";
        assert!(expected.starts_with(&err.strip_backtrace()));

        let ids = vec!["a".to_string()];
        let (qualifier, column) = form_identifier(&ids)?;
        assert_eq!(qualifier, None);
        assert_eq!(column, "a");

        let ids = vec!["a".to_string(), "b".to_string()];
        let (qualifier, column) = form_identifier(&ids)?;
        assert_eq!(qualifier, Some(TableReference::bare("a")));
        assert_eq!(column, "b");

        let ids = vec!["a".to_string(), "b".to_string(), "c".to_string()];
        let (qualifier, column) = form_identifier(&ids)?;
        assert_eq!(qualifier, Some(TableReference::partial("a", "b")));
        assert_eq!(column, "c");

        let ids = vec![
            "a".to_string(),
            "b".to_string(),
            "c".to_string(),
            "d".to_string(),
        ];
        let (qualifier, column) = form_identifier(&ids)?;
        assert_eq!(qualifier, Some(TableReference::full("a", "b", "c")));
        assert_eq!(column, "d");

        let err = form_identifier(&[
            "a".to_string(),
            "b".to_string(),
            "c".to_string(),
            "d".to_string(),
            "e".to_string(),
        ])
        .expect_err("too many identifiers didn't fail");
        let expected = "Internal error: Incorrect number of identifiers: 5.\n\
         This issue was likely caused by a bug in DataFusion's code. Please help us to resolve this \
         by filing a bug report in our issue tracker: https://github.com/apache/datafusion/issues";
        assert!(expected.starts_with(&err.strip_backtrace()));

        Ok(())
    }
}