datafusion_common/column.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! [`Column`]: a named, optionally table-qualified reference to a field in a schema
19
20use crate::error::{_schema_err, add_possible_columns_to_diag};
21use crate::utils::parse_identifiers_normalized;
22use crate::utils::quote_identifier;
23use crate::{DFSchema, Diagnostic, Result, SchemaError, Spans, TableReference};
24use arrow::datatypes::{Field, FieldRef};
25use std::borrow::Borrow;
26use std::collections::HashSet;
27use std::fmt;
28
29/// A named reference to a qualified field in a schema.
30#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
31pub struct Column {
32 /// relation/table reference.
33 pub relation: Option<TableReference>,
34 /// field/column name.
35 pub name: String,
36 /// Original source code location, if known
37 pub spans: Spans,
38}
39
40impl fmt::Debug for Column {
41 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42 f.debug_struct("Column")
43 .field("relation", &self.relation)
44 .field("name", &self.name)
45 .finish()
46 }
47}
48
49impl Column {
50 /// Create Column from optional qualifier and name. The optional qualifier, if present,
51 /// will be parsed and normalized by default.
52 ///
53 /// See full details on [`TableReference::parse_str`]
54 ///
55 /// [`TableReference::parse_str`]: crate::TableReference::parse_str
56 pub fn new(
57 relation: Option<impl Into<TableReference>>,
58 name: impl Into<String>,
59 ) -> Self {
60 Self {
61 relation: relation.map(|r| r.into()),
62 name: name.into(),
63 spans: Spans::new(),
64 }
65 }
66
67 /// Convenience method for when there is no qualifier
68 pub fn new_unqualified(name: impl Into<String>) -> Self {
69 Self {
70 relation: None,
71 name: name.into(),
72 spans: Spans::new(),
73 }
74 }
75
76 /// Create Column from unqualified name.
77 ///
78 /// Alias for `Column::new_unqualified`
79 pub fn from_name(name: impl Into<String>) -> Self {
80 Self {
81 relation: None,
82 name: name.into(),
83 spans: Spans::new(),
84 }
85 }
86
87 /// Create a Column from multiple normalized identifiers
88 ///
89 /// For example, `foo.bar` would be represented as a two element vector
90 /// `["foo", "bar"]`
91 fn from_idents(mut idents: Vec<String>) -> Option<Self> {
92 let (relation, name) = match idents.len() {
93 1 => (None, idents.remove(0)),
94 2 => (
95 Some(TableReference::Bare {
96 table: idents.remove(0).into(),
97 }),
98 idents.remove(0),
99 ),
100 3 => (
101 Some(TableReference::Partial {
102 schema: idents.remove(0).into(),
103 table: idents.remove(0).into(),
104 }),
105 idents.remove(0),
106 ),
107 4 => (
108 Some(TableReference::Full {
109 catalog: idents.remove(0).into(),
110 schema: idents.remove(0).into(),
111 table: idents.remove(0).into(),
112 }),
113 idents.remove(0),
114 ),
115 // any expression that failed to parse or has more than 4 period delimited
116 // identifiers will be treated as an unqualified column name
117 _ => return None,
118 };
119 Some(Self {
120 relation,
121 name,
122 spans: Spans::new(),
123 })
124 }
125
126 /// Deserialize a fully qualified name string into a column
127 ///
128 /// Treats the name as a SQL identifier. For example
129 /// `foo.BAR` would be parsed to a reference to relation `foo`, column name `bar` (lower case)
130 /// where `"foo.BAR"` would be parsed to a reference to column named `foo.BAR`
131 pub fn from_qualified_name(flat_name: impl Into<String>) -> Self {
132 let flat_name = flat_name.into();
133 Self::from_idents(parse_identifiers_normalized(&flat_name, false)).unwrap_or_else(
134 || Self {
135 relation: None,
136 name: flat_name,
137 spans: Spans::new(),
138 },
139 )
140 }
141
/// Parses a flat, possibly qualified, name string into a column while
/// preserving column text case.
///
/// If the parsed identifiers cannot be turned into a column, the whole
/// input string is used as the name of an unqualified column.
#[cfg(feature = "sql")]
pub fn from_qualified_name_ignore_case(flat_name: impl Into<String>) -> Self {
    let flat_name: String = flat_name.into();
    let idents = parse_identifiers_normalized(&flat_name, true);
    match Self::from_idents(idents) {
        Some(column) => column,
        None => Self {
            relation: None,
            name: flat_name,
            spans: Spans::new(),
        },
    }
}
154
155 #[cfg(not(feature = "sql"))]
156 pub fn from_qualified_name_ignore_case(flat_name: impl Into<String>) -> Self {
157 Self::from_qualified_name(flat_name)
158 }
159
160 /// return the column's name.
161 ///
162 /// Note: This ignores the relation and returns the column name only.
163 pub fn name(&self) -> &str {
164 &self.name
165 }
166
167 /// Serialize column into a flat name string
168 pub fn flat_name(&self) -> String {
169 match &self.relation {
170 Some(r) => format!("{}.{}", r, self.name),
171 None => self.name.clone(),
172 }
173 }
174
175 /// Serialize column into a quoted flat name string
176 pub fn quoted_flat_name(&self) -> String {
177 match &self.relation {
178 Some(r) => {
179 format!(
180 "{}.{}",
181 r.to_quoted_string(),
182 quote_identifier(self.name.as_str())
183 )
184 }
185 None => quote_identifier(&self.name).to_string(),
186 }
187 }
188
189 /// Qualify column if not done yet.
190 ///
191 /// If this column already has a [relation](Self::relation), it will be returned as is and the given parameters are
192 /// ignored. Otherwise this will search through the given schemas to find the column.
193 ///
194 /// Will check for ambiguity at each level of `schemas`.
195 ///
196 /// A schema matches if there is a single column that -- when unqualified -- matches this column. There is an
197 /// exception for `USING` statements, see below.
198 ///
199 /// # Using columns
200 /// Take the following SQL statement:
201 ///
202 /// ```sql
203 /// SELECT id FROM t1 JOIN t2 USING(id)
204 /// ```
205 ///
206 /// In this case, both `t1.id` and `t2.id` will match unqualified column `id`. To express this possibility, use
207 /// `using_columns`. Each entry in this array is a set of columns that are bound together via a `USING` clause. So
208 /// in this example this would be `[{t1.id, t2.id}]`.
209 ///
210 /// Regarding ambiguity check, `schemas` is structured to allow levels of schemas to be passed in.
211 /// For example:
212 ///
213 /// ```text
214 /// schemas = &[
215 /// &[schema1, schema2], // first level
216 /// &[schema3, schema4], // second level
217 /// ]
218 /// ```
219 ///
220 /// Will search for a matching field in all schemas in the first level. If a matching field according to above
221 /// mentioned conditions is not found, then will check the next level. If found more than one matching column across
222 /// all schemas in a level, that isn't a USING column, will return an error due to ambiguous column.
223 ///
224 /// If checked all levels and couldn't find field, will return field not found error.
225 pub fn normalize_with_schemas_and_ambiguity_check(
226 self,
227 schemas: &[&[&DFSchema]],
228 using_columns: &[HashSet<Column>],
229 ) -> Result<Self> {
230 if self.relation.is_some() {
231 return Ok(self);
232 }
233
234 for schema_level in schemas {
235 let qualified_fields = schema_level
236 .iter()
237 .flat_map(|s| s.qualified_fields_with_unqualified_name(&self.name))
238 .collect::<Vec<_>>();
239 match qualified_fields.len() {
240 0 => continue,
241 1 => return Ok(Column::from(qualified_fields[0])),
242 _ => {
243 // More than 1 fields in this schema have their names set to self.name.
244 //
245 // This should only happen when a JOIN query with USING constraint references
246 // join columns using unqualified column name. For example:
247 //
248 // ```sql
249 // SELECT id FROM t1 JOIN t2 USING(id)
250 // ```
251 //
252 // In this case, both `t1.id` and `t2.id` will match unqualified column `id`.
253 // We will use the relation from the first matched field to normalize self.
254
255 // Compare matched fields with one USING JOIN clause at a time
256 let columns = schema_level
257 .iter()
258 .flat_map(|s| s.columns_with_unqualified_name(&self.name))
259 .collect::<Vec<_>>();
260 for using_col in using_columns {
261 let all_matched = columns.iter().all(|c| using_col.contains(c));
262 // All matched fields belong to the same using column set, in other words
263 // the same join clause. We simply pick the qualifier from the first match.
264 if all_matched {
265 return Ok(columns[0].clone());
266 }
267 }
268
269 // If not due to USING columns then due to ambiguous column name
270 return _schema_err!(SchemaError::AmbiguousReference {
271 field: Box::new(Column::new_unqualified(&self.name)),
272 })
273 .map_err(|err| {
274 let mut diagnostic = Diagnostic::new_error(
275 format!("column '{}' is ambiguous", &self.name),
276 self.spans().first(),
277 );
278 // TODO If [`DFSchema`] had spans, we could show the
279 // user which columns are candidates, or which table
280 // they come from. For now, let's list the table names
281 // only.
282 add_possible_columns_to_diag(
283 &mut diagnostic,
284 &Column::new_unqualified(&self.name),
285 &columns,
286 );
287 err.with_diagnostic(diagnostic)
288 });
289 }
290 }
291 }
292
293 _schema_err!(SchemaError::FieldNotFound {
294 field: Box::new(self),
295 valid_fields: schemas
296 .iter()
297 .flat_map(|s| s.iter())
298 .flat_map(|s| s.columns())
299 .collect(),
300 })
301 }
302
303 /// Returns a reference to the set of locations in the SQL query where this
304 /// column appears, if known.
305 pub fn spans(&self) -> &Spans {
306 &self.spans
307 }
308
309 /// Returns a mutable reference to the set of locations in the SQL query
310 /// where this column appears, if known.
311 pub fn spans_mut(&mut self) -> &mut Spans {
312 &mut self.spans
313 }
314
315 /// Replaces the set of locations in the SQL query where this column
316 /// appears, if known.
317 pub fn with_spans(mut self, spans: Spans) -> Self {
318 self.spans = spans;
319 self
320 }
321
322 /// Qualifies the column with the given table reference.
323 pub fn with_relation(&self, relation: TableReference) -> Self {
324 Self {
325 relation: Some(relation),
326 ..self.clone()
327 }
328 }
329
330 pub fn is_lambda_parameter(&self, lambdas_params: &crate::HashSet<impl Borrow<str> + Eq + std::hash::Hash>) -> bool {
331 // currently, references to lambda parameters are always unqualified
332 self.relation.is_none() && lambdas_params.contains(self.name())
333 }
334}
335
336impl From<&str> for Column {
337 fn from(c: &str) -> Self {
338 Self::from_qualified_name(c)
339 }
340}
341
342/// Create a column, cloning the string
343impl From<&String> for Column {
344 fn from(c: &String) -> Self {
345 Self::from_qualified_name(c)
346 }
347}
348
349/// Create a column, reusing the existing string
350impl From<String> for Column {
351 fn from(c: String) -> Self {
352 Self::from_qualified_name(c)
353 }
354}
355
356/// Create a column, use qualifier and field name
357impl From<(Option<&TableReference>, &Field)> for Column {
358 fn from((relation, field): (Option<&TableReference>, &Field)) -> Self {
359 Self::new(relation.cloned(), field.name())
360 }
361}
362
363/// Create a column, use qualifier and field name
364impl From<(Option<&TableReference>, &FieldRef)> for Column {
365 fn from((relation, field): (Option<&TableReference>, &FieldRef)) -> Self {
366 Self::new(relation.cloned(), field.name())
367 }
368}
369
/// Parsing a column from a string never fails: it goes through the
/// `From<&str>` conversion.
#[cfg(feature = "sql")]
impl std::str::FromStr for Column {
    type Err = std::convert::Infallible;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let column = Self::from(s);
        Ok(column)
    }
}
378
379impl fmt::Display for Column {
380 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
381 write!(f, "{}", self.flat_name())
382 }
383}
384
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::{DataType, SchemaBuilder};
    use std::sync::Arc;

    /// Builds a [`DFSchema`] qualified with `qualifier` that holds one
    /// nullable boolean field per entry of `names`.
    fn create_qualified_schema(qualifier: &str, names: Vec<&str>) -> Result<DFSchema> {
        let fields = names
            .into_iter()
            .map(|name| Field::new(name, DataType::Boolean, true));
        let mut builder = SchemaBuilder::new();
        builder.extend(fields);
        let schema = Arc::new(builder.finish());
        DFSchema::try_from_qualified_schema(qualifier, &schema)
    }

    #[test]
    fn test_normalize_with_schemas_and_ambiguity_check() -> Result<()> {
        let schema1 = create_qualified_schema("t1", vec!["a", "b"])?;
        let schema2 = create_qualified_schema("t2", vec!["c", "d"])?;
        let schema3 = create_qualified_schema("t3", vec!["a", "b", "c", "d", "e"])?;

        // already normalized
        let resolved = Column::new(Some("t1"), "a")
            .normalize_with_schemas_and_ambiguity_check(&[], &[])?;
        assert_eq!(resolved, Column::new(Some("t1"), "a"));

        // should find in first level (schema1)
        let resolved = Column::from_name("a").normalize_with_schemas_and_ambiguity_check(
            &[&[&schema1, &schema2], &[&schema3]],
            &[],
        )?;
        assert_eq!(resolved, Column::new(Some("t1"), "a"));

        // should find in second level (schema3)
        let resolved = Column::from_name("e").normalize_with_schemas_and_ambiguity_check(
            &[&[&schema1, &schema2], &[&schema3]],
            &[],
        )?;
        assert_eq!(resolved, Column::new(Some("t3"), "e"));

        // using column in first level (pick schema1)
        let using_columns = HashSet::from([
            Column::new(Some("t1"), "a"),
            Column::new(Some("t3"), "a"),
        ]);
        let resolved = Column::from_name("a").normalize_with_schemas_and_ambiguity_check(
            &[&[&schema1, &schema3], &[&schema2]],
            &[using_columns],
        )?;
        assert_eq!(resolved, Column::new(Some("t1"), "a"));

        // not found in any level
        let not_found = Column::from_name("z")
            .normalize_with_schemas_and_ambiguity_check(
                &[&[&schema1, &schema2], &[&schema3]],
                &[],
            )
            .expect_err("should've failed to find field");
        assert_eq!(
            not_found.strip_backtrace(),
            "Schema error: No field named z. \
             Valid fields are t1.a, t1.b, t2.c, t2.d, t3.a, t3.b, t3.c, t3.d, t3.e."
        );

        // ambiguous column reference
        let ambiguous = Column::from_name("a")
            .normalize_with_schemas_and_ambiguity_check(
                &[&[&schema1, &schema3], &[&schema2]],
                &[],
            )
            .expect_err("should've found ambiguous field");
        assert_eq!(
            ambiguous.strip_backtrace(),
            "Schema error: Ambiguous reference to unqualified field a"
        );

        Ok(())
    }
}