1use super::expressions::{Column, Literal};
24use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
25use super::{
26 DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream,
27 SendableRecordBatchStream, Statistics,
28};
29use crate::execution_plan::CardinalityEffect;
30use crate::filter_pushdown::{
31 ChildPushdownResult, FilterDescription, FilterPushdownPhase,
32 FilterPushdownPropagation,
33};
34use crate::joins::utils::{ColumnIndex, JoinFilter, JoinOn, JoinOnRef};
35use crate::{DisplayFormatType, ExecutionPlan, PhysicalExpr};
36use std::any::Any;
37use std::collections::HashMap;
38use std::pin::Pin;
39use std::sync::Arc;
40use std::task::{Context, Poll};
41
42use arrow::datatypes::SchemaRef;
43use arrow::record_batch::{RecordBatch, RecordBatchOptions};
44use datafusion_common::config::ConfigOptions;
45use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNodeRecursion};
46use datafusion_common::{internal_err, JoinSide, Result};
47use datafusion_execution::TaskContext;
48use datafusion_physical_expr::equivalence::ProjectionMapping;
49use datafusion_physical_expr::utils::collect_columns;
50use datafusion_physical_expr::{PhysicalExprExt, PhysicalExprRef};
51use datafusion_physical_expr_common::physical_expr::fmt_sql;
52use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
53pub use datafusion_physical_expr::projection::{
56 update_expr, ProjectionExpr, ProjectionExprs,
57};
58
59use futures::stream::{Stream, StreamExt};
60use log::trace;
61
/// [`ExecutionPlan`] that evaluates a list of projection expressions against
/// every batch of its input and emits the results as its output columns.
#[derive(Debug, Clone)]
pub struct ProjectionExec {
    /// The projection expressions, each paired with its output name (alias).
    projection: ProjectionExprs,
    /// Output schema, derived in `try_new` from `projection` and the input schema.
    schema: SchemaRef,
    /// The plan whose batches are projected.
    input: Arc<dyn ExecutionPlan>,
    /// Execution metrics; a `BaselineMetrics` is created from it per partition in `execute`.
    metrics: ExecutionPlanMetricsSet,
    /// Plan properties computed once in `try_new` and returned by `properties`.
    cache: PlanProperties,
}
79
80impl ProjectionExec {
81 pub fn try_new<I, E>(expr: I, input: Arc<dyn ExecutionPlan>) -> Result<Self>
131 where
132 I: IntoIterator<Item = E>,
133 E: Into<ProjectionExpr>,
134 {
135 let input_schema = input.schema();
136 let expr_vec = expr.into_iter().map(Into::into).collect::<Vec<_>>();
138 let projection = ProjectionExprs::new(expr_vec);
139
140 let schema = Arc::new(projection.project_schema(&input_schema)?);
141
142 let projection_mapping = projection.projection_mapping(&input_schema)?;
144 let cache =
145 Self::compute_properties(&input, &projection_mapping, Arc::clone(&schema))?;
146 Ok(Self {
147 projection,
148 schema,
149 input,
150 metrics: ExecutionPlanMetricsSet::new(),
151 cache,
152 })
153 }
154
155 pub fn expr(&self) -> &[ProjectionExpr] {
157 self.projection.as_ref()
158 }
159
160 pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
162 &self.input
163 }
164
165 fn compute_properties(
167 input: &Arc<dyn ExecutionPlan>,
168 projection_mapping: &ProjectionMapping,
169 schema: SchemaRef,
170 ) -> Result<PlanProperties> {
171 let input_eq_properties = input.equivalence_properties();
173 let eq_properties = input_eq_properties.project(projection_mapping, schema);
174 let output_partitioning = input
176 .output_partitioning()
177 .project(projection_mapping, input_eq_properties);
178
179 Ok(PlanProperties::new(
180 eq_properties,
181 output_partitioning,
182 input.pipeline_behavior(),
183 input.boundedness(),
184 ))
185 }
186}
187
188impl DisplayAs for ProjectionExec {
189 fn fmt_as(
190 &self,
191 t: DisplayFormatType,
192 f: &mut std::fmt::Formatter,
193 ) -> std::fmt::Result {
194 match t {
195 DisplayFormatType::Default | DisplayFormatType::Verbose => {
196 let expr: Vec<String> = self
197 .projection
198 .as_ref()
199 .iter()
200 .map(|proj_expr| {
201 let e = proj_expr.expr.to_string();
202 if e != proj_expr.alias {
203 format!("{e} as {}", proj_expr.alias)
204 } else {
205 e
206 }
207 })
208 .collect();
209
210 write!(f, "ProjectionExec: expr=[{}]", expr.join(", "))
211 }
212 DisplayFormatType::TreeRender => {
213 for (i, proj_expr) in self.expr().iter().enumerate() {
214 let expr_sql = fmt_sql(proj_expr.expr.as_ref());
215 if proj_expr.expr.to_string() == proj_expr.alias {
216 writeln!(f, "expr{i}={expr_sql}")?;
217 } else {
218 writeln!(f, "{}={expr_sql}", proj_expr.alias)?;
219 }
220 }
221
222 Ok(())
223 }
224 }
225 }
226}
227
228impl ExecutionPlan for ProjectionExec {
229 fn name(&self) -> &'static str {
230 "ProjectionExec"
231 }
232
233 fn as_any(&self) -> &dyn Any {
235 self
236 }
237
238 fn properties(&self) -> &PlanProperties {
239 &self.cache
240 }
241
242 fn maintains_input_order(&self) -> Vec<bool> {
243 vec![true]
245 }
246
247 fn benefits_from_input_partitioning(&self) -> Vec<bool> {
248 let all_simple_exprs = self.projection.iter().all(|proj_expr| {
249 proj_expr.expr.as_any().is::<Column>()
250 || proj_expr.expr.as_any().is::<Literal>()
251 });
252 vec![!all_simple_exprs]
255 }
256
257 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
258 vec![&self.input]
259 }
260
261 fn with_new_children(
262 self: Arc<Self>,
263 mut children: Vec<Arc<dyn ExecutionPlan>>,
264 ) -> Result<Arc<dyn ExecutionPlan>> {
265 ProjectionExec::try_new(self.projection.clone(), children.swap_remove(0))
266 .map(|p| Arc::new(p) as _)
267 }
268
269 fn execute(
270 &self,
271 partition: usize,
272 context: Arc<TaskContext>,
273 ) -> Result<SendableRecordBatchStream> {
274 trace!("Start ProjectionExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
275 Ok(Box::pin(ProjectionStream::new(
276 Arc::clone(&self.schema),
277 self.projection.expr_iter().collect(),
278 self.input.execute(partition, context)?,
279 BaselineMetrics::new(&self.metrics, partition),
280 )))
281 }
282
283 fn metrics(&self) -> Option<MetricsSet> {
284 Some(self.metrics.clone_inner())
285 }
286
287 fn statistics(&self) -> Result<Statistics> {
288 self.partition_statistics(None)
289 }
290
291 fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
292 let input_stats = self.input.partition_statistics(partition)?;
293 self.projection
294 .project_statistics(input_stats, &self.input.schema())
295 }
296
297 fn supports_limit_pushdown(&self) -> bool {
298 true
299 }
300
301 fn cardinality_effect(&self) -> CardinalityEffect {
302 CardinalityEffect::Equal
303 }
304
305 fn try_swapping_with_projection(
306 &self,
307 projection: &ProjectionExec,
308 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
309 let maybe_unified = try_unifying_projections(projection, self)?;
310 if let Some(new_plan) = maybe_unified {
311 remove_unnecessary_projections(new_plan).data().map(Some)
313 } else {
314 Ok(Some(Arc::new(projection.clone())))
315 }
316 }
317
318 fn gather_filters_for_pushdown(
319 &self,
320 _phase: FilterPushdownPhase,
321 parent_filters: Vec<Arc<dyn PhysicalExpr>>,
322 _config: &ConfigOptions,
323 ) -> Result<FilterDescription> {
324 FilterDescription::from_children(parent_filters, &self.children())
328 }
329
330 fn handle_child_pushdown_result(
331 &self,
332 _phase: FilterPushdownPhase,
333 child_pushdown_result: ChildPushdownResult,
334 _config: &ConfigOptions,
335 ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
336 Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
337 }
338}
339
340impl ProjectionStream {
341 fn new(
343 schema: SchemaRef,
344 expr: Vec<Arc<dyn PhysicalExpr>>,
345 input: SendableRecordBatchStream,
346 baseline_metrics: BaselineMetrics,
347 ) -> Self {
348 Self {
349 schema,
350 expr,
351 input,
352 baseline_metrics,
353 }
354 }
355
356 fn batch_project(&self, batch: &RecordBatch) -> Result<RecordBatch> {
357 let _timer = self.baseline_metrics.elapsed_compute().timer();
359 let arrays = self
360 .expr
361 .iter()
362 .map(|expr| {
363 expr.evaluate(batch)
364 .and_then(|v| v.into_array(batch.num_rows()))
365 })
366 .collect::<Result<Vec<_>>>()?;
367
368 if arrays.is_empty() {
369 let options =
370 RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
371 RecordBatch::try_new_with_options(Arc::clone(&self.schema), arrays, &options)
372 .map_err(Into::into)
373 } else {
374 RecordBatch::try_new(Arc::clone(&self.schema), arrays).map_err(Into::into)
375 }
376 }
377}
378
/// Stream that applies a fixed list of physical expressions to every batch
/// produced by an input stream (see `ProjectionStream::batch_project`).
struct ProjectionStream {
    /// Schema attached to every emitted batch.
    schema: SchemaRef,
    /// One expression per output column, in output order.
    expr: Vec<Arc<dyn PhysicalExpr>>,
    /// Stream of input batches the expressions are evaluated against.
    input: SendableRecordBatchStream,
    /// Metrics updated via `elapsed_compute` timing and `record_poll`.
    baseline_metrics: BaselineMetrics,
}
386
387impl Stream for ProjectionStream {
388 type Item = Result<RecordBatch>;
389
390 fn poll_next(
391 mut self: Pin<&mut Self>,
392 cx: &mut Context<'_>,
393 ) -> Poll<Option<Self::Item>> {
394 let poll = self.input.poll_next_unpin(cx).map(|x| match x {
395 Some(Ok(batch)) => Some(self.batch_project(&batch)),
396 other => other,
397 });
398
399 self.baseline_metrics.record_poll(poll)
400 }
401
402 fn size_hint(&self) -> (usize, Option<usize>) {
403 self.input.size_hint()
405 }
406}
407
408impl RecordBatchStream for ProjectionStream {
409 fn schema(&self) -> SchemaRef {
411 Arc::clone(&self.schema)
412 }
413}
414
/// An [`ExecutionPlan`] that can absorb a column projection into itself, so that
/// [`try_embed_projection`] can narrow the plan's output instead of keeping a
/// separate [`ProjectionExec`] on top of it.
pub trait EmbeddedProjection: ExecutionPlan + Sized {
    /// Returns a copy of this plan restricted to the given column indices.
    ///
    /// `try_embed_projection` passes sorted, de-duplicated indices into the
    /// plan's current output schema. NOTE(review): the meaning of `None`
    /// (presumably "no projection") is up to implementors — confirm.
    fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self>;
}
418
419pub fn try_embed_projection<Exec: EmbeddedProjection + 'static>(
422 projection: &ProjectionExec,
423 execution_plan: &Exec,
424) -> Result<Option<Arc<dyn ExecutionPlan>>> {
425 let projection_index = collect_column_indices(projection.expr());
427
428 if projection_index.is_empty() {
429 return Ok(None);
430 };
431
432 if projection_index.len() == projection_index.last().unwrap() + 1
435 && projection_index.len() == execution_plan.schema().fields().len()
436 {
437 return Ok(None);
438 }
439
440 let new_execution_plan =
441 Arc::new(execution_plan.with_projection(Some(projection_index.to_vec()))?);
442
443 let embed_project_exprs = projection_index
445 .iter()
446 .zip(new_execution_plan.schema().fields())
447 .map(|(index, field)| ProjectionExpr {
448 expr: Arc::new(Column::new(field.name(), *index)) as Arc<dyn PhysicalExpr>,
449 alias: field.name().to_owned(),
450 })
451 .collect::<Vec<_>>();
452
453 let mut new_projection_exprs = Vec::with_capacity(projection.expr().len());
454
455 for proj_expr in projection.expr() {
456 let Some(expr) =
458 update_expr(&proj_expr.expr, embed_project_exprs.as_slice(), false)?
459 else {
460 return Ok(None);
461 };
462 new_projection_exprs.push(ProjectionExpr {
463 expr,
464 alias: proj_expr.alias.clone(),
465 });
466 }
467 let new_projection = Arc::new(ProjectionExec::try_new(
469 new_projection_exprs,
470 Arc::clone(&new_execution_plan) as _,
471 )?);
472 if is_projection_removable(&new_projection) {
473 Ok(Some(new_execution_plan))
474 } else {
475 Ok(Some(new_projection))
476 }
477}
478
/// Result of pushing a column-only projection below a join, as produced by
/// [`try_pushdown_through_join`].
pub struct JoinData {
    /// Projection to place on top of the join's left input.
    pub projected_left_child: ProjectionExec,
    /// Projection to place on top of the join's right input.
    pub projected_right_child: ProjectionExec,
    /// The join filter with its column indices rewritten for the projected
    /// inputs, or `None` if the join had no filter.
    pub join_filter: Option<JoinFilter>,
    /// The equi-join keys rewritten to reference the projected inputs.
    pub join_on: JoinOn,
}
485
486pub fn try_pushdown_through_join(
487 projection: &ProjectionExec,
488 join_left: &Arc<dyn ExecutionPlan>,
489 join_right: &Arc<dyn ExecutionPlan>,
490 join_on: JoinOnRef,
491 schema: SchemaRef,
492 filter: Option<&JoinFilter>,
493) -> Result<Option<JoinData>> {
494 let Some(projection_as_columns) = physical_to_column_exprs(projection.expr()) else {
496 return Ok(None);
497 };
498
499 let (far_right_left_col_ind, far_left_right_col_ind) =
500 join_table_borders(join_left.schema().fields().len(), &projection_as_columns);
501
502 if !join_allows_pushdown(
503 &projection_as_columns,
504 &schema,
505 far_right_left_col_ind,
506 far_left_right_col_ind,
507 ) {
508 return Ok(None);
509 }
510
511 let new_filter = if let Some(filter) = filter {
512 match update_join_filter(
513 &projection_as_columns[0..=far_right_left_col_ind as _],
514 &projection_as_columns[far_left_right_col_ind as _..],
515 filter,
516 join_left.schema().fields().len(),
517 ) {
518 Some(updated_filter) => Some(updated_filter),
519 None => return Ok(None),
520 }
521 } else {
522 None
523 };
524
525 let Some(new_on) = update_join_on(
526 &projection_as_columns[0..=far_right_left_col_ind as _],
527 &projection_as_columns[far_left_right_col_ind as _..],
528 join_on,
529 join_left.schema().fields().len(),
530 ) else {
531 return Ok(None);
532 };
533
534 let (new_left, new_right) = new_join_children(
535 &projection_as_columns,
536 far_right_left_col_ind,
537 far_left_right_col_ind,
538 join_left,
539 join_right,
540 )?;
541
542 Ok(Some(JoinData {
543 projected_left_child: new_left,
544 projected_right_child: new_right,
545 join_filter: new_filter,
546 join_on: new_on,
547 }))
548}
549
550pub fn remove_unnecessary_projections(
555 plan: Arc<dyn ExecutionPlan>,
556) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
557 let maybe_modified =
558 if let Some(projection) = plan.as_any().downcast_ref::<ProjectionExec>() {
559 if is_projection_removable(projection) {
562 return Ok(Transformed::yes(Arc::clone(projection.input())));
563 }
564 projection
566 .input()
567 .try_swapping_with_projection(projection)?
568 } else {
569 return Ok(Transformed::no(plan));
570 };
571 Ok(maybe_modified.map_or_else(|| Transformed::no(plan), Transformed::yes))
572}
573
574fn is_projection_removable(projection: &ProjectionExec) -> bool {
579 let exprs = projection.expr();
580 exprs.iter().enumerate().all(|(idx, proj_expr)| {
581 let Some(col) = proj_expr.expr.as_any().downcast_ref::<Column>() else {
582 return false;
583 };
584 col.name() == proj_expr.alias && col.index() == idx
585 }) && exprs.len() == projection.input().schema().fields().len()
586}
587
588pub fn all_alias_free_columns(exprs: &[ProjectionExpr]) -> bool {
591 exprs.iter().all(|proj_expr| {
592 proj_expr
593 .expr
594 .as_any()
595 .downcast_ref::<Column>()
596 .map(|column| column.name() == proj_expr.alias)
597 .unwrap_or(false)
598 })
599}
600
601pub fn new_projections_for_columns(
605 projection: &[ProjectionExpr],
606 source: &[usize],
607) -> Vec<usize> {
608 projection
609 .iter()
610 .filter_map(|proj_expr| {
611 proj_expr
612 .expr
613 .as_any()
614 .downcast_ref::<Column>()
615 .map(|expr| source[expr.index()])
616 })
617 .collect()
618}
619
620pub fn make_with_child(
623 projection: &ProjectionExec,
624 child: &Arc<dyn ExecutionPlan>,
625) -> Result<Arc<dyn ExecutionPlan>> {
626 ProjectionExec::try_new(projection.expr().to_vec(), Arc::clone(child))
627 .map(|e| Arc::new(e) as _)
628}
629
630pub fn all_columns(exprs: &[ProjectionExpr]) -> bool {
632 exprs
633 .iter()
634 .all(|proj_expr| proj_expr.expr.as_any().is::<Column>())
635}
636
637pub fn update_ordering(
640 ordering: LexOrdering,
641 projected_exprs: &[ProjectionExpr],
642) -> Result<Option<LexOrdering>> {
643 let mut updated_exprs = vec![];
644 for mut sort_expr in ordering.into_iter() {
645 let Some(updated_expr) = update_expr(&sort_expr.expr, projected_exprs, false)?
646 else {
647 return Ok(None);
648 };
649 sort_expr.expr = updated_expr;
650 updated_exprs.push(sort_expr);
651 }
652 Ok(LexOrdering::new(updated_exprs))
653}
654
655pub fn update_ordering_requirement(
658 reqs: LexRequirement,
659 projected_exprs: &[ProjectionExpr],
660) -> Result<Option<LexRequirement>> {
661 let mut updated_exprs = vec![];
662 for mut sort_expr in reqs.into_iter() {
663 let Some(updated_expr) = update_expr(&sort_expr.expr, projected_exprs, false)?
664 else {
665 return Ok(None);
666 };
667 sort_expr.expr = updated_expr;
668 updated_exprs.push(sort_expr);
669 }
670 Ok(LexRequirement::new(updated_exprs))
671}
672
673pub fn physical_to_column_exprs(
676 exprs: &[ProjectionExpr],
677) -> Option<Vec<(Column, String)>> {
678 exprs
679 .iter()
680 .map(|proj_expr| {
681 proj_expr
682 .expr
683 .as_any()
684 .downcast_ref::<Column>()
685 .map(|col| (col.clone(), proj_expr.alias.clone()))
686 })
687 .collect()
688}
689
690pub fn new_join_children(
694 projection_as_columns: &[(Column, String)],
695 far_right_left_col_ind: i32,
696 far_left_right_col_ind: i32,
697 left_child: &Arc<dyn ExecutionPlan>,
698 right_child: &Arc<dyn ExecutionPlan>,
699) -> Result<(ProjectionExec, ProjectionExec)> {
700 let new_left = ProjectionExec::try_new(
701 projection_as_columns[0..=far_right_left_col_ind as _]
702 .iter()
703 .map(|(col, alias)| ProjectionExpr {
704 expr: Arc::new(Column::new(col.name(), col.index())) as _,
705 alias: alias.clone(),
706 }),
707 Arc::clone(left_child),
708 )?;
709 let left_size = left_child.schema().fields().len() as i32;
710 let new_right = ProjectionExec::try_new(
711 projection_as_columns[far_left_right_col_ind as _..]
712 .iter()
713 .map(|(col, alias)| {
714 ProjectionExpr {
715 expr: Arc::new(Column::new(
716 col.name(),
717 (col.index() as i32 - left_size) as _,
720 )) as _,
721 alias: alias.clone(),
722 }
723 }),
724 Arc::clone(right_child),
725 )?;
726
727 Ok((new_left, new_right))
728}
729
730pub fn join_allows_pushdown(
736 projection_as_columns: &[(Column, String)],
737 join_schema: &SchemaRef,
738 far_right_left_col_ind: i32,
739 far_left_right_col_ind: i32,
740) -> bool {
741 projection_as_columns.len() < join_schema.fields().len()
743 && (far_right_left_col_ind + 1 == far_left_right_col_ind)
745 && far_right_left_col_ind >= 0
747 && far_left_right_col_ind < projection_as_columns.len() as i32
748}
749
750pub fn join_table_borders(
756 left_table_column_count: usize,
757 projection_as_columns: &[(Column, String)],
758) -> (i32, i32) {
759 let far_right_left_col_ind = projection_as_columns
760 .iter()
761 .enumerate()
762 .take_while(|(_, (projection_column, _))| {
763 projection_column.index() < left_table_column_count
764 })
765 .last()
766 .map(|(index, _)| index as i32)
767 .unwrap_or(-1);
768
769 let far_left_right_col_ind = projection_as_columns
770 .iter()
771 .enumerate()
772 .rev()
773 .take_while(|(_, (projection_column, _))| {
774 projection_column.index() >= left_table_column_count
775 })
776 .last()
777 .map(|(index, _)| index as i32)
778 .unwrap_or(projection_as_columns.len() as i32);
779
780 (far_right_left_col_ind, far_left_right_col_ind)
781}
782
783pub fn update_join_on(
786 proj_left_exprs: &[(Column, String)],
787 proj_right_exprs: &[(Column, String)],
788 hash_join_on: &[(PhysicalExprRef, PhysicalExprRef)],
789 left_field_size: usize,
790) -> Option<Vec<(PhysicalExprRef, PhysicalExprRef)>> {
791 #[allow(clippy::map_identity)]
795 let (left_idx, right_idx): (Vec<_>, Vec<_>) = hash_join_on
796 .iter()
797 .map(|(left, right)| (left, right))
798 .unzip();
799
800 let new_left_columns = new_columns_for_join_on(&left_idx, proj_left_exprs, 0);
801 let new_right_columns =
802 new_columns_for_join_on(&right_idx, proj_right_exprs, left_field_size);
803
804 match (new_left_columns, new_right_columns) {
805 (Some(left), Some(right)) => Some(left.into_iter().zip(right).collect()),
806 _ => None,
807 }
808}
809
810pub fn update_join_filter(
813 projection_left_exprs: &[(Column, String)],
814 projection_right_exprs: &[(Column, String)],
815 join_filter: &JoinFilter,
816 left_field_size: usize,
817) -> Option<JoinFilter> {
818 let mut new_left_indices = new_indices_for_join_filter(
819 join_filter,
820 JoinSide::Left,
821 projection_left_exprs,
822 0,
823 )
824 .into_iter();
825 let mut new_right_indices = new_indices_for_join_filter(
826 join_filter,
827 JoinSide::Right,
828 projection_right_exprs,
829 left_field_size,
830 )
831 .into_iter();
832
833 (new_right_indices.len() + new_left_indices.len()
835 == join_filter.column_indices().len())
836 .then(|| {
837 JoinFilter::new(
838 Arc::clone(join_filter.expression()),
839 join_filter
840 .column_indices()
841 .iter()
842 .map(|col_idx| ColumnIndex {
843 index: if col_idx.side == JoinSide::Left {
844 new_left_indices.next().unwrap()
845 } else {
846 new_right_indices.next().unwrap()
847 },
848 side: col_idx.side,
849 })
850 .collect(),
851 Arc::clone(join_filter.schema()),
852 )
853 })
854}
855
856fn try_unifying_projections(
858 projection: &ProjectionExec,
859 child: &ProjectionExec,
860) -> Result<Option<Arc<dyn ExecutionPlan>>> {
861 let mut projected_exprs = vec![];
862 let mut column_ref_map: HashMap<Column, usize> = HashMap::new();
863
864 projection.expr().iter().for_each(|proj_expr| {
866 proj_expr
867 .expr
868 .apply_with_lambdas_params(|expr, lambdas_params| {
869 Ok({
870 if let Some(column) = expr.as_any().downcast_ref::<Column>() {
871 if !lambdas_params.contains(column.name()) {
872 *column_ref_map.entry(column.clone()).or_default() += 1;
873 }
874 }
875 TreeNodeRecursion::Continue
876 })
877 })
878 .unwrap();
879 });
880 if column_ref_map.iter().any(|(column, count)| {
885 *count > 1 && !is_expr_trivial(&Arc::clone(&child.expr()[column.index()].expr))
886 }) {
887 return Ok(None);
888 }
889 for proj_expr in projection.expr() {
890 let Some(expr) = update_expr(&proj_expr.expr, child.expr(), true)? else {
894 return Ok(None);
895 };
896 projected_exprs.push(ProjectionExpr {
897 expr,
898 alias: proj_expr.alias.clone(),
899 });
900 }
901 ProjectionExec::try_new(projected_exprs, Arc::clone(child.input()))
902 .map(|e| Some(Arc::new(e) as _))
903}
904
905fn collect_column_indices(exprs: &[ProjectionExpr]) -> Vec<usize> {
907 let mut indices = exprs
909 .iter()
910 .flat_map(|proj_expr| collect_columns(&proj_expr.expr))
911 .map(|x| x.index())
912 .collect::<std::collections::HashSet<_>>()
913 .into_iter()
914 .collect::<Vec<_>>();
915 indices.sort();
916 indices
917}
918
919fn new_indices_for_join_filter(
927 join_filter: &JoinFilter,
928 join_side: JoinSide,
929 projection_exprs: &[(Column, String)],
930 column_index_offset: usize,
931) -> Vec<usize> {
932 join_filter
933 .column_indices()
934 .iter()
935 .filter(|col_idx| col_idx.side == join_side)
936 .filter_map(|col_idx| {
937 projection_exprs
938 .iter()
939 .position(|(col, _)| col_idx.index + column_index_offset == col.index())
940 })
941 .collect()
942}
943
944fn new_columns_for_join_on(
952 hash_join_on: &[&PhysicalExprRef],
953 projection_exprs: &[(Column, String)],
954 column_index_offset: usize,
955) -> Option<Vec<PhysicalExprRef>> {
956 let new_columns = hash_join_on
957 .iter()
958 .filter_map(|on| {
959 Arc::clone(*on)
961 .transform_with_lambdas_params(|expr, lambdas_params| {
962 match expr.as_any().downcast_ref::<Column>() {
963 Some(column) if !lambdas_params.contains(column.name()) => {
964 let new_column = projection_exprs
965 .iter()
966 .enumerate()
967 .find(|(_, (proj_column, _))| {
968 column.name() == proj_column.name()
969 && column.index() + column_index_offset
970 == proj_column.index()
971 })
972 .map(|(index, (_, alias))| Column::new(alias, index));
973 if let Some(new_column) = new_column {
974 Ok(Transformed::yes(Arc::new(new_column)))
975 } else {
976 internal_err!(
980 "Column {:?} not found in projection expressions",
981 column
982 )
983 }
984 }
985 _ => Ok(Transformed::no(expr)),
986 }
987 })
988 .data()
989 .ok()
990 })
991 .collect::<Vec<_>>();
992 (new_columns.len() == hash_join_on.len()).then_some(new_columns)
993}
994
995fn is_expr_trivial(expr: &Arc<dyn PhysicalExpr>) -> bool {
998 expr.as_any().downcast_ref::<Column>().is_some()
999 || expr.as_any().downcast_ref::<Literal>().is_some()
1000}
1001
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    use crate::common::collect;
    use crate::test;
    use crate::test::exec::StatisticsExec;

    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::stats::{ColumnStatistics, Precision, Statistics};
    use datafusion_common::ScalarValue;

    use datafusion_expr::Operator;
    use datafusion_physical_expr::expressions::{col, BinaryExpr, Column, Literal};

    /// Column indices are collected from nested expressions and returned
    /// sorted ascending.
    #[test]
    fn test_collect_column_indices() -> Result<()> {
        // b@7 - (1 + a@1)
        let expr = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("b", 7)),
            Operator::Minus,
            Arc::new(BinaryExpr::new(
                Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
                Operator::Plus,
                Arc::new(Column::new("a", 1)),
            )),
        ));
        let column_indices = collect_column_indices(&[ProjectionExpr {
            expr,
            alias: "b-(1+a)".to_string(),
        }]);
        assert_eq!(column_indices, vec![1, 7]);
        Ok(())
    }

    /// `join_table_borders` returns (end of leading left-side run, start of
    /// trailing right-side run) for various left-table widths.
    #[test]
    fn test_join_table_borders() -> Result<()> {
        let projections = vec![
            (Column::new("b", 1), "b".to_owned()),
            (Column::new("c", 2), "c".to_owned()),
            (Column::new("e", 4), "e".to_owned()),
            (Column::new("d", 3), "d".to_owned()),
            (Column::new("c", 2), "c".to_owned()),
            (Column::new("f", 5), "f".to_owned()),
            (Column::new("h", 7), "h".to_owned()),
            (Column::new("g", 6), "g".to_owned()),
        ];
        // Indices 1,2,4,3,2 are left-side; 5,7,6 are right-side.
        let left_table_column_count = 5;
        assert_eq!(
            join_table_borders(left_table_column_count, &projections),
            (4, 5)
        );

        // Every column is left-side: no right-side run, so the second border
        // equals the projection length.
        let left_table_column_count = 8;
        assert_eq!(
            join_table_borders(left_table_column_count, &projections),
            (7, 8)
        );

        // No column is left-side: the first border is -1.
        let left_table_column_count = 1;
        assert_eq!(
            join_table_borders(left_table_column_count, &projections),
            (-1, 0)
        );

        let projections = vec![
            (Column::new("a", 0), "a".to_owned()),
            (Column::new("b", 1), "b".to_owned()),
            (Column::new("d", 3), "d".to_owned()),
            (Column::new("g", 6), "g".to_owned()),
            (Column::new("e", 4), "e".to_owned()),
            (Column::new("f", 5), "f".to_owned()),
            (Column::new("e", 4), "e".to_owned()),
            (Column::new("h", 7), "h".to_owned()),
        ];
        // Left and right columns interleave: the runs do not meet, which
        // `join_allows_pushdown` would reject.
        let left_table_column_count = 5;
        assert_eq!(
            join_table_borders(left_table_column_count, &projections),
            (2, 7)
        );

        let left_table_column_count = 7;
        assert_eq!(
            join_table_borders(left_table_column_count, &projections),
            (6, 7)
        );

        Ok(())
    }

    /// A projection with no expressions still produces as many output batches
    /// as its input.
    #[tokio::test]
    async fn project_no_column() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());

        let exec = test::scan_partitioned(1);
        let expected = collect(exec.execute(0, Arc::clone(&task_ctx))?).await?;

        let projection = ProjectionExec::try_new(vec![] as Vec<ProjectionExpr>, exec)?;
        let stream = projection.execute(0, Arc::clone(&task_ctx))?;
        let output = collect(stream).await?;
        assert_eq!(output.len(), expected.len());

        Ok(())
    }

    /// `ProjectionExec::try_new` accepts `(expr, alias)` tuples through the
    /// `Into<ProjectionExpr>` bound.
    #[tokio::test]
    async fn project_old_syntax() {
        let exec = test::scan_partitioned(1);
        let schema = exec.schema();
        let expr = col("i", &schema).unwrap();
        ProjectionExec::try_new(
            vec![
                // (expr, alias) tuple converted into a ProjectionExpr
                (expr, "c".to_string()),
            ],
            exec,
        )
        // construction is expected to succeed
        .unwrap();
    }

    /// Statistics are projected using the input schema: the output has one
    /// column statistic per projection expression, the exact row count is
    /// carried through, and the total byte size stays exact.
    #[test]
    fn test_projection_statistics_uses_input_schema() {
        let input_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
            Field::new("c", DataType::Int32, false),
            Field::new("d", DataType::Int32, false),
            Field::new("e", DataType::Int32, false),
            Field::new("f", DataType::Int32, false),
        ]);

        let input_statistics = Statistics {
            num_rows: Precision::Exact(10),
            column_statistics: vec![
                ColumnStatistics {
                    min_value: Precision::Exact(ScalarValue::Int32(Some(1))),
                    max_value: Precision::Exact(ScalarValue::Int32(Some(100))),
                    ..Default::default()
                },
                ColumnStatistics {
                    min_value: Precision::Exact(ScalarValue::Int32(Some(5))),
                    max_value: Precision::Exact(ScalarValue::Int32(Some(50))),
                    ..Default::default()
                },
                ColumnStatistics {
                    min_value: Precision::Exact(ScalarValue::Int32(Some(10))),
                    max_value: Precision::Exact(ScalarValue::Int32(Some(40))),
                    ..Default::default()
                },
                ColumnStatistics {
                    min_value: Precision::Exact(ScalarValue::Int32(Some(20))),
                    max_value: Precision::Exact(ScalarValue::Int32(Some(30))),
                    ..Default::default()
                },
                ColumnStatistics {
                    min_value: Precision::Exact(ScalarValue::Int32(Some(21))),
                    max_value: Precision::Exact(ScalarValue::Int32(Some(29))),
                    ..Default::default()
                },
                ColumnStatistics {
                    min_value: Precision::Exact(ScalarValue::Int32(Some(24))),
                    max_value: Precision::Exact(ScalarValue::Int32(Some(26))),
                    ..Default::default()
                },
            ],
            ..Default::default()
        };

        let input = Arc::new(StatisticsExec::new(input_statistics, input_schema));

        // Output has 2 columns but references input columns 2, 4 and 5, so the
        // statistics must be looked up against the 6-column input schema.
        let exprs: Vec<ProjectionExpr> = vec![
            ProjectionExpr {
                expr: Arc::new(Column::new("c", 2)) as Arc<dyn PhysicalExpr>,
                alias: "c_renamed".to_string(),
            },
            ProjectionExpr {
                expr: Arc::new(BinaryExpr::new(
                    Arc::new(Column::new("e", 4)),
                    Operator::Plus,
                    Arc::new(Column::new("f", 5)),
                )) as Arc<dyn PhysicalExpr>,
                alias: "e_plus_f".to_string(),
            },
        ];

        let projection = ProjectionExec::try_new(exprs, input).unwrap();

        let stats = projection.partition_statistics(None).unwrap();

        assert_eq!(stats.num_rows, Precision::Exact(10));
        assert_eq!(
            stats.column_statistics.len(),
            2,
            "Expected 2 columns in projection statistics"
        );
        assert!(stats.total_byte_size.is_exact().unwrap_or(false));
    }
}