datafusion_physical_expr/expressions/
column.rs1use std::any::Any;
21use std::hash::Hash;
22use std::sync::Arc;
23
24use crate::physical_expr::PhysicalExpr;
25use crate::PhysicalExprExt;
26use arrow::datatypes::FieldRef;
27use arrow::{
28 datatypes::{DataType, Schema, SchemaRef},
29 record_batch::RecordBatch,
30};
31use datafusion_common::tree_node::{Transformed, TransformedResult};
32use datafusion_common::{internal_err, plan_err, Result};
33use datafusion_expr::ColumnarValue;
34
/// Physical expression that refers to a single column of its input by
/// position.
///
/// The `index` is what the expression uses to look the column up in a schema
/// or record batch; the `name` is carried along for display, SQL formatting
/// and error messages.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Column {
    /// Name of the referenced column.
    name: String,
    /// Zero-based position of the referenced column in the input schema.
    index: usize,
}
75
76impl Column {
77 pub fn new(name: &str, index: usize) -> Self {
80 Self {
81 name: name.to_owned(),
82 index,
83 }
84 }
85
86 pub fn new_with_schema(name: &str, schema: &Schema) -> Result<Self> {
89 Ok(Column::new(name, schema.index_of(name)?))
90 }
91
92 pub fn name(&self) -> &str {
94 &self.name
95 }
96
97 pub fn index(&self) -> usize {
99 self.index
100 }
101}
102
103impl std::fmt::Display for Column {
104 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
105 write!(f, "{}@{}", self.name, self.index)
106 }
107}
108
109impl PhysicalExpr for Column {
110 fn as_any(&self) -> &dyn Any {
112 self
113 }
114
115 fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
117 self.bounds_check(input_schema)?;
118 Ok(input_schema.field(self.index).data_type().clone())
119 }
120
121 fn nullable(&self, input_schema: &Schema) -> Result<bool> {
123 self.bounds_check(input_schema)?;
124 Ok(input_schema.field(self.index).is_nullable())
125 }
126
127 fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
129 self.bounds_check(batch.schema().as_ref())?;
130 Ok(ColumnarValue::Array(Arc::clone(batch.column(self.index))))
131 }
132
133 fn return_field(&self, input_schema: &Schema) -> Result<FieldRef> {
134 Ok(input_schema.field(self.index).clone().into())
135 }
136
137 fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
138 vec![]
139 }
140
141 fn with_new_children(
142 self: Arc<Self>,
143 _children: Vec<Arc<dyn PhysicalExpr>>,
144 ) -> Result<Arc<dyn PhysicalExpr>> {
145 Ok(self)
146 }
147
148 fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
149 write!(f, "{}", self.name)
150 }
151}
152
153impl Column {
154 fn bounds_check(&self, input_schema: &Schema) -> Result<()> {
155 if self.index < input_schema.fields.len() {
156 Ok(())
157 } else {
158 internal_err!(
159 "PhysicalExpr Column references column '{}' at index {} (zero-based) but input schema only has {} columns: {:?}",
160 self.name,
161 self.index,
162 input_schema.fields.len(),
163 input_schema.fields().iter().map(|f| f.name()).collect::<Vec<_>>()
164 )
165 }
166 }
167}
168
169pub fn col(name: &str, schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
171 Ok(Arc::new(Column::new_with_schema(name, schema)?))
172}
173
174pub fn with_new_schema(
180 expr: Arc<dyn PhysicalExpr>,
181 schema: &SchemaRef,
182) -> Result<Arc<dyn PhysicalExpr>> {
183 expr.transform_up_with_lambdas_params(|expr, lambdas_params| {
184 match expr.as_any().downcast_ref::<Column>() {
185 Some(col) if !lambdas_params.contains(col.name()) => {
186 let idx = col.index();
187 let Some(field) = schema.fields().get(idx) else {
188 return plan_err!(
189 "New schema has fewer columns than original schema"
190 );
191 };
192 let new_col = Column::new(field.name(), idx);
193
194 Ok(Transformed::yes(Arc::new(new_col) as _))
195 }
196 _ => Ok(Transformed::no(expr)),
197 }
198 })
199 .data()
200}
201
#[cfg(test)]
mod test {
    use super::Column;
    use crate::physical_expr::PhysicalExpr;

    use arrow::array::StringArray;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;

    use std::sync::Arc;

    /// Full message expected for a column `id` at index 9 evaluated against
    /// a one-column schema `["foo"]`.
    const EXPECTED_OUT_OF_BOUNDS: &str = "Internal error: PhysicalExpr Column references column 'id' at index 9 (zero-based) \
         but input schema only has 1 columns: [\"foo\"].\nThis issue was likely caused by a bug \
         in DataFusion's code. Please help us to resolve this by filing a bug report \
         in our issue tracker: https://github.com/apache/datafusion/issues";

    /// Schema with a single nullable Utf8 column named `foo`.
    fn single_column_schema() -> Schema {
        Schema::new(vec![Field::new("foo", DataType::Utf8, true)])
    }

    /// Column expression whose index (9) is out of range for
    /// `single_column_schema`.
    fn out_of_range_column() -> Column {
        Column::new("id", 9)
    }

    /// Asserts that `message` is a prefix of the expected out-of-bounds
    /// error text.
    fn assert_out_of_bounds_message(message: &str) {
        assert!(EXPECTED_OUT_OF_BOUNDS.starts_with(message));
    }

    #[test]
    fn out_of_bounds_data_type() {
        let schema = single_column_schema();
        let message = out_of_range_column()
            .data_type(&schema)
            .expect_err("error")
            .strip_backtrace();
        assert_out_of_bounds_message(&message);
    }

    #[test]
    fn out_of_bounds_nullable() {
        let schema = single_column_schema();
        let message = out_of_range_column()
            .nullable(&schema)
            .expect_err("error")
            .strip_backtrace();
        assert_out_of_bounds_message(&message);
    }

    #[test]
    fn out_of_bounds_evaluate() {
        let schema = single_column_schema();
        let data: StringArray = vec!["data"].into();
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)]).unwrap();
        let message = out_of_range_column()
            .evaluate(&batch)
            .expect_err("error")
            .strip_backtrace();
        assert_out_of_bounds_message(&message);
    }
}