datafusion_common/
column.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! [`Column`]: a named, optionally qualified, reference to a field in a schema.
19
20use crate::error::{_schema_err, add_possible_columns_to_diag};
21use crate::utils::parse_identifiers_normalized;
22use crate::utils::quote_identifier;
23use crate::{DFSchema, Diagnostic, Result, SchemaError, Spans, TableReference};
24use arrow::datatypes::{Field, FieldRef};
25use std::borrow::Borrow;
26use std::collections::HashSet;
27use std::fmt;
28
29/// A named reference to a qualified field in a schema.
30#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
31pub struct Column {
32    /// relation/table reference.
33    pub relation: Option<TableReference>,
34    /// field/column name.
35    pub name: String,
36    /// Original source code location, if known
37    pub spans: Spans,
38}
39
40impl fmt::Debug for Column {
41    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42        f.debug_struct("Column")
43            .field("relation", &self.relation)
44            .field("name", &self.name)
45            .finish()
46    }
47}
48
49impl Column {
50    /// Create Column from optional qualifier and name. The optional qualifier, if present,
51    /// will be parsed and normalized by default.
52    ///
53    /// See full details on [`TableReference::parse_str`]
54    ///
55    /// [`TableReference::parse_str`]: crate::TableReference::parse_str
56    pub fn new(
57        relation: Option<impl Into<TableReference>>,
58        name: impl Into<String>,
59    ) -> Self {
60        Self {
61            relation: relation.map(|r| r.into()),
62            name: name.into(),
63            spans: Spans::new(),
64        }
65    }
66
67    /// Convenience method for when there is no qualifier
68    pub fn new_unqualified(name: impl Into<String>) -> Self {
69        Self {
70            relation: None,
71            name: name.into(),
72            spans: Spans::new(),
73        }
74    }
75
76    /// Create Column from unqualified name.
77    ///
78    /// Alias for `Column::new_unqualified`
79    pub fn from_name(name: impl Into<String>) -> Self {
80        Self {
81            relation: None,
82            name: name.into(),
83            spans: Spans::new(),
84        }
85    }
86
87    /// Create a Column from multiple normalized identifiers
88    ///
89    /// For example, `foo.bar` would be represented as a two element vector
90    /// `["foo", "bar"]`
91    fn from_idents(mut idents: Vec<String>) -> Option<Self> {
92        let (relation, name) = match idents.len() {
93            1 => (None, idents.remove(0)),
94            2 => (
95                Some(TableReference::Bare {
96                    table: idents.remove(0).into(),
97                }),
98                idents.remove(0),
99            ),
100            3 => (
101                Some(TableReference::Partial {
102                    schema: idents.remove(0).into(),
103                    table: idents.remove(0).into(),
104                }),
105                idents.remove(0),
106            ),
107            4 => (
108                Some(TableReference::Full {
109                    catalog: idents.remove(0).into(),
110                    schema: idents.remove(0).into(),
111                    table: idents.remove(0).into(),
112                }),
113                idents.remove(0),
114            ),
115            // any expression that failed to parse or has more than 4 period delimited
116            // identifiers will be treated as an unqualified column name
117            _ => return None,
118        };
119        Some(Self {
120            relation,
121            name,
122            spans: Spans::new(),
123        })
124    }
125
126    /// Deserialize a fully qualified name string into a column
127    ///
128    /// Treats the name as a SQL identifier. For example
129    /// `foo.BAR` would be parsed to a reference to relation `foo`, column name `bar` (lower case)
130    /// where `"foo.BAR"` would be parsed to a reference to column named `foo.BAR`
131    pub fn from_qualified_name(flat_name: impl Into<String>) -> Self {
132        let flat_name = flat_name.into();
133        Self::from_idents(parse_identifiers_normalized(&flat_name, false)).unwrap_or_else(
134            || Self {
135                relation: None,
136                name: flat_name,
137                spans: Spans::new(),
138            },
139        )
140    }
141
142    /// Deserialize a fully qualified name string into a column preserving column text case
143    #[cfg(feature = "sql")]
144    pub fn from_qualified_name_ignore_case(flat_name: impl Into<String>) -> Self {
145        let flat_name = flat_name.into();
146        Self::from_idents(parse_identifiers_normalized(&flat_name, true)).unwrap_or_else(
147            || Self {
148                relation: None,
149                name: flat_name,
150                spans: Spans::new(),
151            },
152        )
153    }
154
155    #[cfg(not(feature = "sql"))]
156    pub fn from_qualified_name_ignore_case(flat_name: impl Into<String>) -> Self {
157        Self::from_qualified_name(flat_name)
158    }
159
160    /// return the column's name.
161    ///
162    /// Note: This ignores the relation and returns the column name only.
163    pub fn name(&self) -> &str {
164        &self.name
165    }
166
167    /// Serialize column into a flat name string
168    pub fn flat_name(&self) -> String {
169        match &self.relation {
170            Some(r) => format!("{}.{}", r, self.name),
171            None => self.name.clone(),
172        }
173    }
174
175    /// Serialize column into a quoted flat name string
176    pub fn quoted_flat_name(&self) -> String {
177        match &self.relation {
178            Some(r) => {
179                format!(
180                    "{}.{}",
181                    r.to_quoted_string(),
182                    quote_identifier(self.name.as_str())
183                )
184            }
185            None => quote_identifier(&self.name).to_string(),
186        }
187    }
188
189    /// Qualify column if not done yet.
190    ///
191    /// If this column already has a [relation](Self::relation), it will be returned as is and the given parameters are
192    /// ignored. Otherwise this will search through the given schemas to find the column.
193    ///
194    /// Will check for ambiguity at each level of `schemas`.
195    ///
196    /// A schema matches if there is a single column that -- when unqualified -- matches this column. There is an
197    /// exception for `USING` statements, see below.
198    ///
199    /// # Using columns
200    /// Take the following SQL statement:
201    ///
202    /// ```sql
203    /// SELECT id FROM t1 JOIN t2 USING(id)
204    /// ```
205    ///
206    /// In this case, both `t1.id` and `t2.id` will match unqualified column `id`. To express this possibility, use
207    /// `using_columns`. Each entry in this array is a set of columns that are bound together via a `USING` clause. So
208    /// in this example this would be `[{t1.id, t2.id}]`.
209    ///
210    /// Regarding ambiguity check, `schemas` is structured to allow levels of schemas to be passed in.
211    /// For example:
212    ///
213    /// ```text
214    /// schemas = &[
215    ///    &[schema1, schema2], // first level
216    ///    &[schema3, schema4], // second level
217    /// ]
218    /// ```
219    ///
220    /// Will search for a matching field in all schemas in the first level. If a matching field according to above
221    /// mentioned conditions is not found, then will check the next level. If found more than one matching column across
222    /// all schemas in a level, that isn't a USING column, will return an error due to ambiguous column.
223    ///
224    /// If checked all levels and couldn't find field, will return field not found error.
225    pub fn normalize_with_schemas_and_ambiguity_check(
226        self,
227        schemas: &[&[&DFSchema]],
228        using_columns: &[HashSet<Column>],
229    ) -> Result<Self> {
230        if self.relation.is_some() {
231            return Ok(self);
232        }
233
234        for schema_level in schemas {
235            let qualified_fields = schema_level
236                .iter()
237                .flat_map(|s| s.qualified_fields_with_unqualified_name(&self.name))
238                .collect::<Vec<_>>();
239            match qualified_fields.len() {
240                0 => continue,
241                1 => return Ok(Column::from(qualified_fields[0])),
242                _ => {
243                    // More than 1 fields in this schema have their names set to self.name.
244                    //
245                    // This should only happen when a JOIN query with USING constraint references
246                    // join columns using unqualified column name. For example:
247                    //
248                    // ```sql
249                    // SELECT id FROM t1 JOIN t2 USING(id)
250                    // ```
251                    //
252                    // In this case, both `t1.id` and `t2.id` will match unqualified column `id`.
253                    // We will use the relation from the first matched field to normalize self.
254
255                    // Compare matched fields with one USING JOIN clause at a time
256                    let columns = schema_level
257                        .iter()
258                        .flat_map(|s| s.columns_with_unqualified_name(&self.name))
259                        .collect::<Vec<_>>();
260                    for using_col in using_columns {
261                        let all_matched = columns.iter().all(|c| using_col.contains(c));
262                        // All matched fields belong to the same using column set, in other words
263                        // the same join clause. We simply pick the qualifier from the first match.
264                        if all_matched {
265                            return Ok(columns[0].clone());
266                        }
267                    }
268
269                    // If not due to USING columns then due to ambiguous column name
270                    return _schema_err!(SchemaError::AmbiguousReference {
271                        field: Box::new(Column::new_unqualified(&self.name)),
272                    })
273                    .map_err(|err| {
274                        let mut diagnostic = Diagnostic::new_error(
275                            format!("column '{}' is ambiguous", &self.name),
276                            self.spans().first(),
277                        );
278                        // TODO If [`DFSchema`] had spans, we could show the
279                        // user which columns are candidates, or which table
280                        // they come from. For now, let's list the table names
281                        // only.
282                        add_possible_columns_to_diag(
283                            &mut diagnostic,
284                            &Column::new_unqualified(&self.name),
285                            &columns,
286                        );
287                        err.with_diagnostic(diagnostic)
288                    });
289                }
290            }
291        }
292
293        _schema_err!(SchemaError::FieldNotFound {
294            field: Box::new(self),
295            valid_fields: schemas
296                .iter()
297                .flat_map(|s| s.iter())
298                .flat_map(|s| s.columns())
299                .collect(),
300        })
301    }
302
303    /// Returns a reference to the set of locations in the SQL query where this
304    /// column appears, if known.
305    pub fn spans(&self) -> &Spans {
306        &self.spans
307    }
308
309    /// Returns a mutable reference to the set of locations in the SQL query
310    /// where this column appears, if known.
311    pub fn spans_mut(&mut self) -> &mut Spans {
312        &mut self.spans
313    }
314
315    /// Replaces the set of locations in the SQL query where this column
316    /// appears, if known.
317    pub fn with_spans(mut self, spans: Spans) -> Self {
318        self.spans = spans;
319        self
320    }
321
322    /// Qualifies the column with the given table reference.
323    pub fn with_relation(&self, relation: TableReference) -> Self {
324        Self {
325            relation: Some(relation),
326            ..self.clone()
327        }
328    }
329
330    pub fn is_lambda_parameter(&self, lambdas_params: &crate::HashSet<impl Borrow<str> + Eq + std::hash::Hash>) -> bool {
331        // currently, references to lambda parameters are always unqualified
332        self.relation.is_none() && lambdas_params.contains(self.name())
333    }
334}
335
336impl From<&str> for Column {
337    fn from(c: &str) -> Self {
338        Self::from_qualified_name(c)
339    }
340}
341
342/// Create a column, cloning the string
343impl From<&String> for Column {
344    fn from(c: &String) -> Self {
345        Self::from_qualified_name(c)
346    }
347}
348
349/// Create a column, reusing the existing string
350impl From<String> for Column {
351    fn from(c: String) -> Self {
352        Self::from_qualified_name(c)
353    }
354}
355
356/// Create a column, use qualifier and field name
357impl From<(Option<&TableReference>, &Field)> for Column {
358    fn from((relation, field): (Option<&TableReference>, &Field)) -> Self {
359        Self::new(relation.cloned(), field.name())
360    }
361}
362
363/// Create a column, use qualifier and field name
364impl From<(Option<&TableReference>, &FieldRef)> for Column {
365    fn from((relation, field): (Option<&TableReference>, &FieldRef)) -> Self {
366        Self::new(relation.cloned(), field.name())
367    }
368}
369
#[cfg(feature = "sql")]
impl std::str::FromStr for Column {
    type Err = std::convert::Infallible;

    /// Parses `s` with [`Column::from_qualified_name`]; this never fails.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(Self::from_qualified_name(s))
    }
}
378
379impl fmt::Display for Column {
380    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
381        write!(f, "{}", self.flat_name())
382    }
383}
384
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::{DataType, SchemaBuilder};
    use std::sync::Arc;

    /// Builds a `DFSchema` qualified by `qualifier` with one nullable boolean
    /// field per entry of `names`.
    fn create_qualified_schema(qualifier: &str, names: Vec<&str>) -> Result<DFSchema> {
        let mut builder = SchemaBuilder::new();
        builder.extend(
            names
                .into_iter()
                .map(|name| Field::new(name, DataType::Boolean, true)),
        );
        DFSchema::try_from_qualified_schema(qualifier, &Arc::new(builder.finish()))
    }

    #[test]
    fn test_normalize_with_schemas_and_ambiguity_check() -> Result<()> {
        let t1 = create_qualified_schema("t1", vec!["a", "b"])?;
        let t2 = create_qualified_schema("t2", vec!["c", "d"])?;
        let t3 = create_qualified_schema("t3", vec!["a", "b", "c", "d", "e"])?;

        // A column that is already qualified comes back unchanged.
        let normalized = Column::new(Some("t1"), "a")
            .normalize_with_schemas_and_ambiguity_check(&[], &[])?;
        assert_eq!(normalized, Column::new(Some("t1"), "a"));

        // `a` resolves in the first level, where only t1 has it.
        let normalized = Column::from_name("a")
            .normalize_with_schemas_and_ambiguity_check(&[&[&t1, &t2], &[&t3]], &[])?;
        assert_eq!(normalized, Column::new(Some("t1"), "a"));

        // `e` is missing from the first level and resolves in the second (t3).
        let normalized = Column::from_name("e")
            .normalize_with_schemas_and_ambiguity_check(&[&[&t1, &t2], &[&t3]], &[])?;
        assert_eq!(normalized, Column::new(Some("t3"), "e"));

        // t1.a and t3.a collide in the first level, but a USING set covering
        // both settles it in favour of the first match (t1).
        let using = HashSet::from([
            Column::new(Some("t1"), "a"),
            Column::new(Some("t3"), "a"),
        ]);
        let normalized = Column::from_name("a")
            .normalize_with_schemas_and_ambiguity_check(&[&[&t1, &t3], &[&t2]], &[using])?;
        assert_eq!(normalized, Column::new(Some("t1"), "a"));

        // `z` exists nowhere, so the error lists every known field.
        let err = Column::from_name("z")
            .normalize_with_schemas_and_ambiguity_check(&[&[&t1, &t2], &[&t3]], &[])
            .expect_err("should've failed to find field");
        assert_eq!(
            err.strip_backtrace(),
            "Schema error: No field named z. \
            Valid fields are t1.a, t1.b, t2.c, t2.d, t3.a, t3.b, t3.c, t3.d, t3.e."
        );

        // Without a USING set the t1.a / t3.a collision is ambiguous.
        let err = Column::from_name("a")
            .normalize_with_schemas_and_ambiguity_check(&[&[&t1, &t3], &[&t2]], &[])
            .expect_err("should've found ambiguous field");
        assert_eq!(
            err.strip_backtrace(),
            "Schema error: Ambiguous reference to unqualified field a"
        );

        Ok(())
    }
}