1use std::fmt;
19use std::mem::size_of;
20use std::sync::atomic::{AtomicUsize, Ordering};
21use std::sync::{Arc, OnceLock};
22use std::{any::Any, vec};
23
24use crate::execution_plan::{boundedness_from_children, EmissionType};
25use crate::filter_pushdown::{
26 ChildPushdownResult, FilterDescription, FilterPushdownPhase,
27 FilterPushdownPropagation,
28};
29use crate::joins::hash_join::shared_bounds::{ColumnBounds, SharedBoundsAccumulator};
30use crate::joins::hash_join::stream::{
31 BuildSide, BuildSideInitialState, HashJoinStream, HashJoinStreamState,
32};
33use crate::joins::join_hash_map::{JoinHashMapU32, JoinHashMapU64};
34use crate::joins::utils::{
35 asymmetric_join_output_partitioning, reorder_output_after_swap, swap_join_projection,
36 update_hash, OnceAsync, OnceFut,
37};
38use crate::joins::{JoinOn, JoinOnRef, PartitionMode, SharedBitmapBuilder};
39use crate::projection::{
40 try_embed_projection, try_pushdown_through_join, EmbeddedProjection, JoinData,
41 ProjectionExec,
42};
43use crate::spill::get_record_batch_memory_size;
44use crate::ExecutionPlanProperties;
45use crate::{
46 common::can_project,
47 joins::utils::{
48 build_join_schema, check_join_is_valid, estimate_join_statistics,
49 need_produce_result_in_final, symmetric_join_output_partitioning,
50 BuildProbeJoinMetrics, ColumnIndex, JoinFilter, JoinHashMapType,
51 },
52 metrics::{ExecutionPlanMetricsSet, MetricsSet},
53 DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
54 PlanProperties, SendableRecordBatchStream, Statistics,
55};
56
57use arrow::array::{ArrayRef, BooleanBufferBuilder};
58use arrow::compute::concat_batches;
59use arrow::datatypes::SchemaRef;
60use arrow::record_batch::RecordBatch;
61use arrow::util::bit_util;
62use arrow_schema::DataType;
63use datafusion_common::config::ConfigOptions;
64use datafusion_common::utils::memory::estimate_memory_size;
65use datafusion_common::{
66 internal_err, plan_err, project_schema, JoinSide, JoinType, NullEquality, Result,
67};
68use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
69use datafusion_execution::TaskContext;
70use datafusion_expr::Accumulator;
71use datafusion_functions_aggregate_common::min_max::{MaxAccumulator, MinAccumulator};
72use datafusion_physical_expr::equivalence::{
73 join_equivalence_properties, ProjectionMapping,
74};
75use datafusion_physical_expr::expressions::{lit, DynamicFilterPhysicalExpr};
76use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};
77
78use ahash::RandomState;
79use datafusion_physical_expr_common::physical_expr::fmt_sql;
80use futures::TryStreamExt;
81use parking_lot::Mutex;
82
/// Hard-coded seed for the join hash function so hash values are stable
/// across partitions and plan instances (the four seed words spell "JOIN").
const HASH_JOIN_SEED: RandomState =
    RandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64);
86
/// Data built from the left (build) side of the join, shared by all
/// probe-side partitions once the build phase completes.
pub(super) struct JoinLeftData {
    /// The hash table built over the left-side join keys.
    pub(super) hash_map: Box<dyn JoinHashMapType>,
    /// All build-side rows concatenated into a single batch.
    batch: RecordBatch,
    /// The evaluated join-key expressions for `batch`, one array per key.
    values: Vec<ArrayRef>,
    /// Shared bitmap of build-side rows that found a match; consulted when
    /// the join type must emit unmatched build rows
    /// (see `need_produce_result_in_final`).
    visited_indices_bitmap: SharedBitmapBuilder,
    /// Number of probe tasks still running; the last task to report
    /// completion is responsible for any final (unmatched-row) output.
    probe_threads_counter: AtomicUsize,
    /// Memory reservation covering the buffered build side; held for the
    /// lifetime of this struct and released on drop.
    _reservation: MemoryReservation,
    /// Per-key min/max bounds of the build side, computed only when dynamic
    /// filter pushdown is enabled; `None` otherwise (or when the build side
    /// was empty).
    pub(super) bounds: Option<Vec<ColumnBounds>>,
}
108
109impl JoinLeftData {
110 pub(super) fn new(
112 hash_map: Box<dyn JoinHashMapType>,
113 batch: RecordBatch,
114 values: Vec<ArrayRef>,
115 visited_indices_bitmap: SharedBitmapBuilder,
116 probe_threads_counter: AtomicUsize,
117 reservation: MemoryReservation,
118 bounds: Option<Vec<ColumnBounds>>,
119 ) -> Self {
120 Self {
121 hash_map,
122 batch,
123 values,
124 visited_indices_bitmap,
125 probe_threads_counter,
126 _reservation: reservation,
127 bounds,
128 }
129 }
130
131 pub(super) fn hash_map(&self) -> &dyn JoinHashMapType {
133 &*self.hash_map
134 }
135
136 pub(super) fn batch(&self) -> &RecordBatch {
138 &self.batch
139 }
140
141 pub(super) fn values(&self) -> &[ArrayRef] {
143 &self.values
144 }
145
146 pub(super) fn visited_indices_bitmap(&self) -> &SharedBitmapBuilder {
148 &self.visited_indices_bitmap
149 }
150
151 pub(super) fn report_probe_completed(&self) -> bool {
154 self.probe_threads_counter.fetch_sub(1, Ordering::Relaxed) == 1
155 }
156}
157
158#[allow(rustdoc::private_intra_doc_links)]
159pub struct HashJoinExec {
321 pub left: Arc<dyn ExecutionPlan>,
323 pub right: Arc<dyn ExecutionPlan>,
325 pub on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
327 pub filter: Option<JoinFilter>,
329 pub join_type: JoinType,
331 join_schema: SchemaRef,
334 left_fut: Arc<OnceAsync<JoinLeftData>>,
341 random_state: RandomState,
343 pub mode: PartitionMode,
345 metrics: ExecutionPlanMetricsSet,
347 pub projection: Option<Vec<usize>>,
349 column_indices: Vec<ColumnIndex>,
351 pub null_equality: NullEquality,
353 cache: PlanProperties,
355 dynamic_filter: Option<HashJoinExecDynamicFilter>,
359}
360
#[derive(Clone)]
/// State backing dynamic filter pushdown for a hash join: the filter
/// expression handed to the probe side plus the lazily-created accumulator
/// that collects build-side bounds to tighten it.
struct HashJoinExecDynamicFilter {
    /// Dynamic filter over the right-side join keys; starts as `lit(true)`.
    filter: Arc<DynamicFilterPhysicalExpr>,
    /// Accumulator for build-side min/max bounds, created on first `execute`
    /// call (lazily, because it needs the final partition counts).
    bounds_accumulator: OnceLock<Arc<SharedBoundsAccumulator>>,
}
369
370impl fmt::Debug for HashJoinExec {
371 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
372 f.debug_struct("HashJoinExec")
373 .field("left", &self.left)
374 .field("right", &self.right)
375 .field("on", &self.on)
376 .field("filter", &self.filter)
377 .field("join_type", &self.join_type)
378 .field("join_schema", &self.join_schema)
379 .field("left_fut", &self.left_fut)
380 .field("random_state", &self.random_state)
381 .field("mode", &self.mode)
382 .field("metrics", &self.metrics)
383 .field("projection", &self.projection)
384 .field("column_indices", &self.column_indices)
385 .field("null_equality", &self.null_equality)
386 .field("cache", &self.cache)
387 .finish()
389 }
390}
391
impl EmbeddedProjection for HashJoinExec {
    /// Delegates to the inherent `HashJoinExec::with_projection` (inherent
    /// methods take precedence over trait methods, so this is not recursive),
    /// letting generic projection-embedding code rebuild this join.
    fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
        self.with_projection(projection)
    }
}
397
impl HashJoinExec {
    /// Tries to create a new [`HashJoinExec`].
    ///
    /// # Errors
    /// Returns an error if `on` is empty, if the join keys are invalid for
    /// the input schemas, or if `projection` is not valid for the join schema.
    #[allow(clippy::too_many_arguments)]
    pub fn try_new(
        left: Arc<dyn ExecutionPlan>,
        right: Arc<dyn ExecutionPlan>,
        on: JoinOn,
        filter: Option<JoinFilter>,
        join_type: &JoinType,
        projection: Option<Vec<usize>>,
        partition_mode: PartitionMode,
        null_equality: NullEquality,
    ) -> Result<Self> {
        let left_schema = left.schema();
        let right_schema = right.schema();
        // A hash join needs at least one equi-join key pair.
        if on.is_empty() {
            return plan_err!("On constraints in HashJoinExec should be non-empty");
        }

        check_join_is_valid(&left_schema, &right_schema, &on)?;

        let (join_schema, column_indices) =
            build_join_schema(&left_schema, &right_schema, join_type);

        // Fixed seed so hash values are consistent across partitions.
        let random_state = HASH_JOIN_SEED;

        let join_schema = Arc::new(join_schema);

        // Validate the optional embedded projection against the join schema.
        can_project(&join_schema, projection.as_ref())?;

        let cache = Self::compute_properties(
            &left,
            &right,
            Arc::clone(&join_schema),
            *join_type,
            &on,
            partition_mode,
            projection.as_ref(),
        )?;

        Ok(HashJoinExec {
            left,
            right,
            on,
            filter,
            join_type: *join_type,
            join_schema,
            left_fut: Default::default(),
            random_state,
            mode: partition_mode,
            metrics: ExecutionPlanMetricsSet::new(),
            projection,
            column_indices,
            null_equality,
            cache,
            // Installed later (if at all) during dynamic filter pushdown.
            dynamic_filter: None,
        })
    }

    /// Creates the placeholder dynamic filter over the right-side (probe)
    /// join keys; it starts as `lit(true)` and is tightened once build-side
    /// bounds are known.
    fn create_dynamic_filter(on: &JoinOn) -> Arc<DynamicFilterPhysicalExpr> {
        let right_keys: Vec<_> = on.iter().map(|(_, r)| Arc::clone(r)).collect();
        Arc::new(DynamicFilterPhysicalExpr::new(right_keys, lit(true)))
    }

    /// Left (build) side, which is hashed.
    pub fn left(&self) -> &Arc<dyn ExecutionPlan> {
        &self.left
    }

    /// Right (probe) side, which is streamed against the hash table.
    pub fn right(&self) -> &Arc<dyn ExecutionPlan> {
        &self.right
    }

    /// Pairs of (left, right) equi-join key expressions.
    pub fn on(&self) -> &[(PhysicalExprRef, PhysicalExprRef)] {
        &self.on
    }

    /// Optional non-equi join filter, if any.
    pub fn filter(&self) -> Option<&JoinFilter> {
        self.filter.as_ref()
    }

    /// How the join is performed.
    pub fn join_type(&self) -> &JoinType {
        &self.join_type
    }

    /// Schema of the join output *before* any embedded projection; if a
    /// projection is present this is not the operator's final schema.
    pub fn join_schema(&self) -> &SchemaRef {
        &self.join_schema
    }

    /// The partitioning mode of this join.
    pub fn partition_mode(&self) -> &PartitionMode {
        &self.mode
    }

    /// Whether NULL join keys compare equal to each other.
    pub fn null_equality(&self) -> NullEquality {
        self.null_equality
    }

    /// Per-child order-preservation flags: the build (left) side never
    /// preserves order; the probe (right) side does for join types whose
    /// output follows probe-row order.
    fn maintains_input_order(join_type: JoinType) -> Vec<bool> {
        vec![
            false,
            matches!(
                join_type,
                JoinType::Inner
                    | JoinType::Right
                    | JoinType::RightAnti
                    | JoinType::RightSemi
                    | JoinType::RightMark
            ),
        ]
    }

    /// The probe side is always the right child for this operator.
    pub fn probe_side() -> JoinSide {
        JoinSide::Right
    }

    /// Returns true if this join has an embedded output projection.
    pub fn contains_projection(&self) -> bool {
        self.projection.is_some()
    }

    /// Rebuilds this join with `projection` applied on top; if a projection
    /// is already embedded, the two are composed (indices are remapped
    /// through the existing one).
    pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
        // Validate against the *current* output schema, then compose.
        can_project(&self.schema(), projection.as_ref())?;
        let projection = match projection {
            Some(projection) => match &self.projection {
                Some(p) => Some(projection.iter().map(|i| p[*i]).collect()),
                None => Some(projection),
            },
            None => None,
        };
        Self::try_new(
            Arc::clone(&self.left),
            Arc::clone(&self.right),
            self.on.clone(),
            self.filter.clone(),
            &self.join_type,
            projection,
            self.mode,
            self.null_equality,
        )
    }

    /// Computes the cached [`PlanProperties`]: equivalence properties,
    /// output partitioning, emission type, and boundedness.
    fn compute_properties(
        left: &Arc<dyn ExecutionPlan>,
        right: &Arc<dyn ExecutionPlan>,
        schema: SchemaRef,
        join_type: JoinType,
        on: JoinOnRef,
        mode: PartitionMode,
        projection: Option<&Vec<usize>>,
    ) -> Result<PlanProperties> {
        // Calculate equivalence properties from the join keys and order
        // maintenance of the children.
        let mut eq_properties = join_equivalence_properties(
            left.equivalence_properties().clone(),
            right.equivalence_properties().clone(),
            &join_type,
            Arc::clone(&schema),
            &Self::maintains_input_order(join_type),
            Some(Self::probe_side()),
            on,
        )?;

        // Output partitioning depends on the partition mode.
        let mut output_partitioning = match mode {
            PartitionMode::CollectLeft => {
                asymmetric_join_output_partitioning(left, right, &join_type)?
            }
            PartitionMode::Auto => Partitioning::UnknownPartitioning(
                right.output_partitioning().partition_count(),
            ),
            PartitionMode::Partitioned => {
                symmetric_join_output_partitioning(left, right, &join_type)?
            }
        };

        // An unbounded build side means results can only be emitted at the
        // end; otherwise emission follows the probe side, except for join
        // types that must also emit unmatched build rows at the end ("Both").
        let emission_type = if left.boundedness().is_unbounded() {
            EmissionType::Final
        } else if right.pipeline_behavior() == EmissionType::Incremental {
            match join_type {
                JoinType::Inner
                | JoinType::LeftSemi
                | JoinType::RightSemi
                | JoinType::Right
                | JoinType::RightAnti
                | JoinType::RightMark => EmissionType::Incremental,
                JoinType::Left
                | JoinType::LeftAnti
                | JoinType::LeftMark
                | JoinType::Full => EmissionType::Both,
            }
        } else {
            right.pipeline_behavior()
        };

        // If an embedded projection exists, project both the partitioning
        // and the equivalence properties through it.
        if let Some(projection) = projection {
            let projection_mapping =
                ProjectionMapping::from_indices(projection, &schema)?;
            let out_schema = project_schema(&schema, Some(projection))?;
            output_partitioning =
                output_partitioning.project(&projection_mapping, &eq_properties);
            eq_properties = eq_properties.project(&projection_mapping, out_schema);
        }

        Ok(PlanProperties::new(
            eq_properties,
            output_partitioning,
            emission_type,
            boundedness_from_children([left, right]),
        ))
    }

    /// Returns a new plan computing the same join with the inputs swapped
    /// (left becomes the probe side, right the build side), using
    /// `partition_mode` for the swapped join. For join types whose output
    /// columns come from one side only (semi/anti/mark), or when a
    /// projection is embedded, the swapped plan is returned directly;
    /// otherwise a projection is added to restore the original column order.
    pub fn swap_inputs(
        &self,
        partition_mode: PartitionMode,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let left = self.left();
        let right = self.right();
        let new_join = HashJoinExec::try_new(
            Arc::clone(right),
            Arc::clone(left),
            // Swap each (l, r) key pair along with the inputs.
            self.on()
                .iter()
                .map(|(l, r)| (Arc::clone(r), Arc::clone(l)))
                .collect(),
            self.filter().map(JoinFilter::swap),
            &self.join_type().swap(),
            swap_join_projection(
                left.schema().fields().len(),
                right.schema().fields().len(),
                self.projection.as_ref(),
                self.join_type(),
            ),
            partition_mode,
            self.null_equality(),
        )?;
        if matches!(
            self.join_type(),
            JoinType::LeftSemi
                | JoinType::RightSemi
                | JoinType::LeftAnti
                | JoinType::RightAnti
                | JoinType::LeftMark
                | JoinType::RightMark
        ) || self.projection.is_some()
        {
            Ok(Arc::new(new_join))
        } else {
            // Restore the original (left, right) column order for callers.
            reorder_output_after_swap(Arc::new(new_join), &left.schema(), &right.schema())
        }
    }
}
701
impl DisplayAs for HashJoinExec {
    /// Renders this operator for `EXPLAIN` output. The Default/Verbose form
    /// is a single line; the TreeRender form emits one attribute per line
    /// and omits defaults (e.g. `Inner` join type).
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                // Optional segments render as "" when absent so the format
                // string below stays a single template.
                let display_filter = self.filter.as_ref().map_or_else(
                    || "".to_string(),
                    |f| format!(", filter={}", f.expression()),
                );
                let display_projections = if self.contains_projection() {
                    format!(
                        ", projection=[{}]",
                        self.projection
                            .as_ref()
                            .unwrap()
                            .iter()
                            .map(|index| format!(
                                "{}@{}",
                                self.join_schema.fields().get(*index).unwrap().name(),
                                index
                            ))
                            .collect::<Vec<_>>()
                            .join(", ")
                    )
                } else {
                    "".to_string()
                };
                let display_null_equality =
                    if matches!(self.null_equality(), NullEquality::NullEqualsNull) {
                        ", NullsEqual: true"
                    } else {
                        ""
                    };
                let on = self
                    .on
                    .iter()
                    .map(|(c1, c2)| format!("({c1}, {c2})"))
                    .collect::<Vec<String>>()
                    .join(", ");
                write!(
                    f,
                    "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}{}",
                    self.mode,
                    self.join_type,
                    on,
                    display_filter,
                    display_projections,
                    display_null_equality,
                )
            }
            DisplayFormatType::TreeRender => {
                // Join keys rendered as SQL-like equalities, e.g. "(a = b)".
                let on = self
                    .on
                    .iter()
                    .map(|(c1, c2)| {
                        format!("({} = {})", fmt_sql(c1.as_ref()), fmt_sql(c2.as_ref()))
                    })
                    .collect::<Vec<String>>()
                    .join(", ");

                // Only print the join type when it differs from the default.
                if *self.join_type() != JoinType::Inner {
                    writeln!(f, "join_type={:?}", self.join_type)?;
                }

                writeln!(f, "on={on}")?;

                if matches!(self.null_equality(), NullEquality::NullEqualsNull) {
                    writeln!(f, "NullsEqual: true")?;
                }

                if let Some(filter) = self.filter.as_ref() {
                    writeln!(f, "filter={filter}")?;
                }

                Ok(())
            }
        }
    }
}
780
impl ExecutionPlan for HashJoinExec {
    fn name(&self) -> &'static str {
        "HashJoinExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    /// Required input distribution per partition mode: `CollectLeft` needs a
    /// single build partition; `Partitioned` needs both children hash
    /// partitioned on the join keys; `Auto` imposes no requirement.
    fn required_input_distribution(&self) -> Vec<Distribution> {
        match self.mode {
            PartitionMode::CollectLeft => vec![
                Distribution::SinglePartition,
                Distribution::UnspecifiedDistribution,
            ],
            PartitionMode::Partitioned => {
                let (left_expr, right_expr) = self
                    .on
                    .iter()
                    .map(|(l, r)| (Arc::clone(l), Arc::clone(r)))
                    .unzip();
                vec![
                    Distribution::HashPartitioned(left_expr),
                    Distribution::HashPartitioned(right_expr),
                ]
            }
            PartitionMode::Auto => vec![
                Distribution::UnspecifiedDistribution,
                Distribution::UnspecifiedDistribution,
            ],
        }
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        Self::maintains_input_order(self.join_type)
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.left, &self.right]
    }

    /// Rebuilds this node over new children, recomputing the cached plan
    /// properties (the children's schemas/partitioning may have changed).
    /// Note: `left_fut` and `dynamic_filter` are carried over.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(HashJoinExec {
            left: Arc::clone(&children[0]),
            right: Arc::clone(&children[1]),
            on: self.on.clone(),
            filter: self.filter.clone(),
            join_type: self.join_type,
            join_schema: Arc::clone(&self.join_schema),
            left_fut: Arc::clone(&self.left_fut),
            random_state: self.random_state.clone(),
            mode: self.mode,
            metrics: ExecutionPlanMetricsSet::new(),
            projection: self.projection.clone(),
            column_indices: self.column_indices.clone(),
            null_equality: self.null_equality,
            cache: Self::compute_properties(
                &children[0],
                &children[1],
                Arc::clone(&self.join_schema),
                self.join_type,
                &self.on,
                self.mode,
                self.projection.as_ref(),
            )?,
            dynamic_filter: self.dynamic_filter.clone(),
        }))
    }

    /// Returns a copy with all per-execution state cleared: a fresh
    /// `left_fut` (so the build side is re-collected), fresh metrics, and no
    /// dynamic filter. The cached properties are reused unchanged.
    fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(HashJoinExec {
            left: Arc::clone(&self.left),
            right: Arc::clone(&self.right),
            on: self.on.clone(),
            filter: self.filter.clone(),
            join_type: self.join_type,
            join_schema: Arc::clone(&self.join_schema),
            left_fut: Arc::new(OnceAsync::default()),
            random_state: self.random_state.clone(),
            mode: self.mode,
            metrics: ExecutionPlanMetricsSet::new(),
            projection: self.projection.clone(),
            column_indices: self.column_indices.clone(),
            null_equality: self.null_equality,
            cache: self.cache.clone(),
            dynamic_filter: None,
        }))
    }

    /// Starts execution of one output partition: kicks off (or joins) the
    /// build-side collection, then returns a [`HashJoinStream`] that probes
    /// it with the right-side input.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let on_left = self
            .on
            .iter()
            .map(|on| Arc::clone(&on.0))
            .collect::<Vec<_>>();
        let left_partitions = self.left.output_partitioning().partition_count();
        let right_partitions = self.right.output_partitioning().partition_count();

        // Sanity-check the distribution requirements for the chosen mode.
        if self.mode == PartitionMode::Partitioned && left_partitions != right_partitions
        {
            return internal_err!(
                "Invalid HashJoinExec, partition count mismatch {left_partitions}!={right_partitions},\
                 consider using RepartitionExec"
            );
        }

        if self.mode == PartitionMode::CollectLeft && left_partitions != 1 {
            return internal_err!(
                "Invalid HashJoinExec, the output partition count of the left child must be 1 in CollectLeft mode,\
                 consider using CoalescePartitionsExec or the EnforceDistribution rule"
            );
        }

        // Dynamic filter pushdown is active only if a filter was installed
        // by handle_child_pushdown_result.
        let enable_dynamic_filter_pushdown = self.dynamic_filter.is_some();

        let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
        let left_fut = match self.mode {
            // CollectLeft: all output partitions share one build via
            // OnceAsync; only the first caller actually runs collection.
            PartitionMode::CollectLeft => self.left_fut.try_once(|| {
                let left_stream = self.left.execute(0, Arc::clone(&context))?;

                let reservation =
                    MemoryConsumer::new("HashJoinInput").register(context.memory_pool());

                Ok(collect_left_input(
                    self.random_state.clone(),
                    left_stream,
                    on_left.clone(),
                    join_metrics.clone(),
                    reservation,
                    need_produce_result_in_final(self.join_type),
                    self.right().output_partitioning().partition_count(),
                    enable_dynamic_filter_pushdown,
                ))
            })?,
            // Partitioned: each output partition builds from its own left
            // partition, probed by exactly one thread.
            PartitionMode::Partitioned => {
                let left_stream = self.left.execute(partition, Arc::clone(&context))?;

                let reservation =
                    MemoryConsumer::new(format!("HashJoinInput[{partition}]"))
                        .register(context.memory_pool());

                OnceFut::new(collect_left_input(
                    self.random_state.clone(),
                    left_stream,
                    on_left.clone(),
                    join_metrics.clone(),
                    reservation,
                    need_produce_result_in_final(self.join_type),
                    1,
                    enable_dynamic_filter_pushdown,
                ))
            }
            // Auto must be resolved to a concrete mode before execution.
            PartitionMode::Auto => {
                return plan_err!(
                    "Invalid HashJoinExec, unsupported PartitionMode {:?} in execute()",
                    PartitionMode::Auto
                );
            }
        };

        let batch_size = context.session_config().batch_size();

        // Lazily create the shared bounds accumulator on the first execute
        // call; subsequent partitions reuse it via the OnceLock.
        let bounds_accumulator = enable_dynamic_filter_pushdown
            .then(|| {
                self.dynamic_filter.as_ref().map(|df| {
                    let filter = Arc::clone(&df.filter);
                    let on_right = self
                        .on
                        .iter()
                        .map(|(_, right_expr)| Arc::clone(right_expr))
                        .collect::<Vec<_>>();
                    Some(Arc::clone(df.bounds_accumulator.get_or_init(|| {
                        Arc::new(SharedBoundsAccumulator::new_from_partition_mode(
                            self.mode,
                            self.left.as_ref(),
                            self.right.as_ref(),
                            filter,
                            on_right,
                        ))
                    })))
                })
            })
            .flatten()
            .flatten();

        let right_stream = self.right.execute(partition, context)?;

        // Map the embedded projection onto the column indices so the stream
        // only materializes projected columns.
        let column_indices_after_projection = match &self.projection {
            Some(projection) => projection
                .iter()
                .map(|i| self.column_indices[*i].clone())
                .collect(),
            None => self.column_indices.clone(),
        };

        let on_right = self
            .on
            .iter()
            .map(|(_, right_expr)| Arc::clone(right_expr))
            .collect::<Vec<_>>();

        Ok(Box::pin(HashJoinStream::new(
            partition,
            self.schema(),
            on_right,
            self.filter.clone(),
            self.join_type,
            right_stream,
            self.random_state.clone(),
            join_metrics,
            column_indices_after_projection,
            self.null_equality,
            HashJoinStreamState::WaitBuildSide,
            BuildSide::Initial(BuildSideInitialState { left_fut }),
            batch_size,
            vec![],
            self.right.output_ordering().is_some(),
            bounds_accumulator,
            self.mode,
        )))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    fn statistics(&self) -> Result<Statistics> {
        self.partition_statistics(None)
    }

    /// Estimated statistics: only whole-plan statistics are estimated;
    /// per-partition requests return unknown.
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        if partition.is_some() {
            return Ok(Statistics::new_unknown(&self.schema()));
        }
        let stats = estimate_join_statistics(
            self.left.partition_statistics(None)?,
            self.right.partition_statistics(None)?,
            self.on.clone(),
            &self.join_type,
            &self.join_schema,
        )?;
        // Project statistics through the embedded projection, if any.
        Ok(stats.project(self.projection.as_ref()))
    }

    /// Tries to push `projection` down through (or embed it into) this join.
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // A projection is already embedded; don't stack another one here.
        if self.contains_projection() {
            return Ok(None);
        }

        if let Some(JoinData {
            projected_left_child,
            projected_right_child,
            join_filter,
            join_on,
        }) = try_pushdown_through_join(
            projection,
            self.left(),
            self.right(),
            self.on(),
            self.schema(),
            self.filter(),
        )? {
            Ok(Some(Arc::new(HashJoinExec::try_new(
                Arc::new(projected_left_child),
                Arc::new(projected_right_child),
                join_on,
                join_filter,
                self.join_type(),
                None,
                *self.partition_mode(),
                self.null_equality,
            )?)))
        } else {
            // Could not push through: try to embed the projection instead.
            try_embed_projection(projection, self)
        }
    }

    /// Describes which parent filters can be pushed to which child and,
    /// during the Post phase, registers a self-generated dynamic filter on
    /// the probe (right) child. Only inner joins participate.
    fn gather_filters_for_pushdown(
        &self,
        phase: FilterPushdownPhase,
        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
        config: &ConfigOptions,
    ) -> Result<FilterDescription> {
        // Non-inner joins cannot safely accept pushed-down filters here.
        if self.join_type != JoinType::Inner {
            return Ok(FilterDescription::all_unsupported(
                &parent_filters,
                &self.children(),
            ));
        }

        let left_child = crate::filter_pushdown::ChildFilterDescription::from_child(
            &parent_filters,
            self.left(),
        )?;
        let mut right_child = crate::filter_pushdown::ChildFilterDescription::from_child(
            &parent_filters,
            self.right(),
        )?;

        if matches!(phase, FilterPushdownPhase::Post)
            && config.optimizer.enable_join_dynamic_filter_pushdown
        {
            // Attach a placeholder dynamic filter (lit(true) for now) to the
            // probe side; it is tightened at runtime with build-side bounds.
            let dynamic_filter = Self::create_dynamic_filter(&self.on);
            right_child = right_child.with_self_filter(dynamic_filter);
        }

        Ok(FilterDescription::new()
            .with_child(left_child)
            .with_child(right_child))
    }

    /// Processes the children's pushdown results; if the probe child
    /// accepted our dynamic filter, returns an updated node that stores it
    /// so `execute` can wire up the bounds accumulator.
    fn handle_child_pushdown_result(
        &self,
        _phase: FilterPushdownPhase,
        child_pushdown_result: ChildPushdownResult,
        _config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
        if self.join_type != JoinType::Inner {
            return Ok(FilterPushdownPropagation::all_unsupported(
                child_pushdown_result,
            ));
        }

        let mut result = FilterPushdownPropagation::if_any(child_pushdown_result.clone());
        // gather_filters_for_pushdown declared exactly two children.
        assert_eq!(child_pushdown_result.self_filters.len(), 2);
        let right_child_self_filters = &child_pushdown_result.self_filters[1];
        if let Some(filter) = right_child_self_filters.first() {
            let predicate = Arc::clone(&filter.predicate);
            // Only our own DynamicFilterPhysicalExpr is of interest here.
            if let Ok(dynamic_filter) =
                Arc::downcast::<DynamicFilterPhysicalExpr>(predicate)
            {
                let new_node = Arc::new(HashJoinExec {
                    left: Arc::clone(&self.left),
                    right: Arc::clone(&self.right),
                    on: self.on.clone(),
                    filter: self.filter.clone(),
                    join_type: self.join_type,
                    join_schema: Arc::clone(&self.join_schema),
                    left_fut: Arc::clone(&self.left_fut),
                    random_state: self.random_state.clone(),
                    mode: self.mode,
                    metrics: ExecutionPlanMetricsSet::new(),
                    projection: self.projection.clone(),
                    column_indices: self.column_indices.clone(),
                    null_equality: self.null_equality,
                    cache: self.cache.clone(),
                    dynamic_filter: Some(HashJoinExecDynamicFilter {
                        filter: dynamic_filter,
                        bounds_accumulator: OnceLock::new(),
                    }),
                });
                result = result.with_updated_node(new_node as Arc<dyn ExecutionPlan>);
            }
        }
        Ok(result)
    }
}
1207
/// Tracks the min/max of one build-side join-key expression across all
/// collected batches, used to tighten the probe-side dynamic filter.
struct CollectLeftAccumulator {
    /// The join-key expression being tracked.
    expr: Arc<dyn PhysicalExpr>,
    /// Running minimum of the evaluated expression.
    min: MinAccumulator,
    /// Running maximum of the evaluated expression.
    max: MaxAccumulator,
}
1225
impl CollectLeftAccumulator {
    /// Creates an accumulator for `expr`, with min/max accumulators typed
    /// by the expression's data type resolved against `schema`.
    fn try_new(expr: Arc<dyn PhysicalExpr>, schema: &SchemaRef) -> Result<Self> {
        /// Recursively unwraps dictionary types down to the value type, so
        /// min/max accumulate over values rather than dictionary keys.
        fn dictionary_value_type(data_type: &DataType) -> DataType {
            match data_type {
                DataType::Dictionary(_, value_type) => {
                    dictionary_value_type(value_type.as_ref())
                }
                _ => data_type.clone(),
            }
        }

        let data_type = expr
            .data_type(schema)
            .map(|dt| dictionary_value_type(&dt))?;
        Ok(Self {
            expr,
            min: MinAccumulator::try_new(&data_type)?,
            max: MaxAccumulator::try_new(&data_type)?,
        })
    }

    /// Evaluates `expr` on `batch` and folds the result into both the min
    /// and max accumulators.
    fn update_batch(&mut self, batch: &RecordBatch) -> Result<()> {
        let array = self.expr.evaluate(batch)?.into_array(batch.num_rows())?;
        self.min.update_batch(std::slice::from_ref(&array))?;
        self.max.update_batch(std::slice::from_ref(&array))?;
        Ok(())
    }

    /// Consumes the accumulator and produces the final (min, max) bounds.
    fn evaluate(mut self) -> Result<ColumnBounds> {
        Ok(ColumnBounds::new(
            self.min.evaluate()?,
            self.max.evaluate()?,
        ))
    }
}
1287
/// Mutable state threaded through the build-side collection fold in
/// `collect_left_input`.
struct BuildSideState {
    /// Batches collected so far.
    batches: Vec<RecordBatch>,
    /// Running total of collected rows.
    num_rows: usize,
    /// Metrics updated as input is consumed.
    metrics: BuildProbeJoinMetrics,
    /// Memory reservation grown as batches are buffered.
    reservation: MemoryReservation,
    /// One min/max accumulator per join key, present only when bounds are
    /// being computed for dynamic filter pushdown.
    bounds_accumulators: Option<Vec<CollectLeftAccumulator>>,
}
1296
impl BuildSideState {
    /// Creates the initial fold state; when `should_compute_bounds` is set,
    /// one `CollectLeftAccumulator` is created per join-key expression.
    fn try_new(
        metrics: BuildProbeJoinMetrics,
        reservation: MemoryReservation,
        on_left: Vec<Arc<dyn PhysicalExpr>>,
        schema: &SchemaRef,
        should_compute_bounds: bool,
    ) -> Result<Self> {
        Ok(Self {
            batches: Vec::new(),
            num_rows: 0,
            metrics,
            reservation,
            bounds_accumulators: should_compute_bounds
                .then(|| {
                    on_left
                        .iter()
                        .map(|expr| {
                            CollectLeftAccumulator::try_new(Arc::clone(expr), schema)
                        })
                        .collect::<Result<Vec<_>>>()
                })
                .transpose()?,
        })
    }
}
1324
1325#[allow(clippy::too_many_arguments)]
1354async fn collect_left_input(
1355 random_state: RandomState,
1356 left_stream: SendableRecordBatchStream,
1357 on_left: Vec<PhysicalExprRef>,
1358 metrics: BuildProbeJoinMetrics,
1359 reservation: MemoryReservation,
1360 with_visited_indices_bitmap: bool,
1361 probe_threads_count: usize,
1362 should_compute_bounds: bool,
1363) -> Result<JoinLeftData> {
1364 let schema = left_stream.schema();
1365
1366 let initial = BuildSideState::try_new(
1370 metrics,
1371 reservation,
1372 on_left.clone(),
1373 &schema,
1374 should_compute_bounds,
1375 )?;
1376
1377 let state = left_stream
1378 .try_fold(initial, |mut state, batch| async move {
1379 if let Some(ref mut accumulators) = state.bounds_accumulators {
1381 for accumulator in accumulators {
1382 accumulator.update_batch(&batch)?;
1383 }
1384 }
1385
1386 let batch_size = get_record_batch_memory_size(&batch);
1388 state.reservation.try_grow(batch_size)?;
1390 state.metrics.build_mem_used.add(batch_size);
1392 state.metrics.build_input_batches.add(1);
1393 state.metrics.build_input_rows.add(batch.num_rows());
1394 state.num_rows += batch.num_rows();
1396 state.batches.push(batch);
1398 Ok(state)
1399 })
1400 .await?;
1401
1402 let BuildSideState {
1404 batches,
1405 num_rows,
1406 metrics,
1407 mut reservation,
1408 bounds_accumulators,
1409 } = state;
1410
1411 let fixed_size_u32 = size_of::<JoinHashMapU32>();
1414 let fixed_size_u64 = size_of::<JoinHashMapU64>();
1415
1416 let mut hashmap: Box<dyn JoinHashMapType> = if num_rows > u32::MAX as usize {
1419 let estimated_hashtable_size =
1420 estimate_memory_size::<(u64, u64)>(num_rows, fixed_size_u64)?;
1421 reservation.try_grow(estimated_hashtable_size)?;
1422 metrics.build_mem_used.add(estimated_hashtable_size);
1423 Box::new(JoinHashMapU64::with_capacity(num_rows))
1424 } else {
1425 let estimated_hashtable_size =
1426 estimate_memory_size::<(u32, u64)>(num_rows, fixed_size_u32)?;
1427 reservation.try_grow(estimated_hashtable_size)?;
1428 metrics.build_mem_used.add(estimated_hashtable_size);
1429 Box::new(JoinHashMapU32::with_capacity(num_rows))
1430 };
1431
1432 let mut hashes_buffer = Vec::new();
1433 let mut offset = 0;
1434
1435 let batches_iter = batches.iter().rev();
1437 for batch in batches_iter.clone() {
1438 hashes_buffer.clear();
1439 hashes_buffer.resize(batch.num_rows(), 0);
1440 update_hash(
1441 &on_left,
1442 batch,
1443 &mut *hashmap,
1444 offset,
1445 &random_state,
1446 &mut hashes_buffer,
1447 0,
1448 true,
1449 )?;
1450 offset += batch.num_rows();
1451 }
1452 let single_batch = concat_batches(&schema, batches_iter)?;
1454
1455 let visited_indices_bitmap = if with_visited_indices_bitmap {
1457 let bitmap_size = bit_util::ceil(single_batch.num_rows(), 8);
1458 reservation.try_grow(bitmap_size)?;
1459 metrics.build_mem_used.add(bitmap_size);
1460
1461 let mut bitmap_buffer = BooleanBufferBuilder::new(single_batch.num_rows());
1462 bitmap_buffer.append_n(num_rows, false);
1463 bitmap_buffer
1464 } else {
1465 BooleanBufferBuilder::new(0)
1466 };
1467
1468 let left_values = on_left
1469 .iter()
1470 .map(|c| {
1471 c.evaluate(&single_batch)?
1472 .into_array(single_batch.num_rows())
1473 })
1474 .collect::<Result<Vec<_>>>()?;
1475
1476 let bounds = match bounds_accumulators {
1478 Some(accumulators) if num_rows > 0 => {
1479 let bounds = accumulators
1480 .into_iter()
1481 .map(CollectLeftAccumulator::evaluate)
1482 .collect::<Result<Vec<_>>>()?;
1483 Some(bounds)
1484 }
1485 _ => None,
1486 };
1487
1488 let data = JoinLeftData::new(
1489 hashmap,
1490 single_batch,
1491 left_values.clone(),
1492 Mutex::new(visited_indices_bitmap),
1493 AtomicUsize::new(probe_threads_count),
1494 reservation,
1495 bounds,
1496 );
1497
1498 Ok(data)
1499}
1500
1501#[cfg(test)]
1502mod tests {
1503 use super::*;
1504 use crate::coalesce_partitions::CoalescePartitionsExec;
1505 use crate::joins::hash_join::stream::lookup_join_hashmap;
1506 use crate::test::{assert_join_metrics, TestMemoryExec};
1507 use crate::{
1508 common, expressions::Column, repartition::RepartitionExec, test::build_table_i32,
1509 test::exec::MockExec,
1510 };
1511
1512 use arrow::array::{Date32Array, Int32Array, StructArray, UInt32Array, UInt64Array};
1513 use arrow::buffer::NullBuffer;
1514 use arrow::datatypes::{DataType, Field};
1515 use arrow_schema::Schema;
1516 use datafusion_common::hash_utils::create_hashes;
1517 use datafusion_common::test_util::{batches_to_sort_string, batches_to_string};
1518 use datafusion_common::{
1519 assert_batches_eq, assert_batches_sorted_eq, assert_contains, exec_err,
1520 ScalarValue,
1521 };
1522 use datafusion_execution::config::SessionConfig;
1523 use datafusion_execution::runtime_env::RuntimeEnvBuilder;
1524 use datafusion_expr::Operator;
1525 use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
1526 use datafusion_physical_expr::PhysicalExpr;
1527 use hashbrown::HashTable;
1528 use insta::{allow_duplicates, assert_snapshot};
1529 use rstest::*;
1530 use rstest_reuse::*;
1531
1532 fn div_ceil(a: usize, b: usize) -> usize {
1533 a.div_ceil(b)
1534 }
1535
1536 #[template]
1537 #[rstest]
1538 fn batch_sizes(#[values(8192, 10, 5, 2, 1)] batch_size: usize) {}
1539
1540 fn prepare_task_ctx(batch_size: usize) -> Arc<TaskContext> {
1541 let session_config = SessionConfig::default().with_batch_size(batch_size);
1542 Arc::new(TaskContext::default().with_session_config(session_config))
1543 }
1544
1545 fn build_table(
1546 a: (&str, &Vec<i32>),
1547 b: (&str, &Vec<i32>),
1548 c: (&str, &Vec<i32>),
1549 ) -> Arc<dyn ExecutionPlan> {
1550 let batch = build_table_i32(a, b, c);
1551 let schema = batch.schema();
1552 TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap()
1553 }
1554
1555 fn join(
1556 left: Arc<dyn ExecutionPlan>,
1557 right: Arc<dyn ExecutionPlan>,
1558 on: JoinOn,
1559 join_type: &JoinType,
1560 null_equality: NullEquality,
1561 ) -> Result<HashJoinExec> {
1562 HashJoinExec::try_new(
1563 left,
1564 right,
1565 on,
1566 None,
1567 join_type,
1568 None,
1569 PartitionMode::CollectLeft,
1570 null_equality,
1571 )
1572 }
1573
1574 fn join_with_filter(
1575 left: Arc<dyn ExecutionPlan>,
1576 right: Arc<dyn ExecutionPlan>,
1577 on: JoinOn,
1578 filter: JoinFilter,
1579 join_type: &JoinType,
1580 null_equality: NullEquality,
1581 ) -> Result<HashJoinExec> {
1582 HashJoinExec::try_new(
1583 left,
1584 right,
1585 on,
1586 Some(filter),
1587 join_type,
1588 None,
1589 PartitionMode::CollectLeft,
1590 null_equality,
1591 )
1592 }
1593
1594 async fn join_collect(
1595 left: Arc<dyn ExecutionPlan>,
1596 right: Arc<dyn ExecutionPlan>,
1597 on: JoinOn,
1598 join_type: &JoinType,
1599 null_equality: NullEquality,
1600 context: Arc<TaskContext>,
1601 ) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
1602 let join = join(left, right, on, join_type, null_equality)?;
1603 let columns_header = columns(&join.schema());
1604
1605 let stream = join.execute(0, context)?;
1606 let batches = common::collect(stream).await?;
1607 let metrics = join.metrics().unwrap();
1608
1609 Ok((columns_header, batches, metrics))
1610 }
1611
1612 async fn partitioned_join_collect(
1613 left: Arc<dyn ExecutionPlan>,
1614 right: Arc<dyn ExecutionPlan>,
1615 on: JoinOn,
1616 join_type: &JoinType,
1617 null_equality: NullEquality,
1618 context: Arc<TaskContext>,
1619 ) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
1620 join_collect_with_partition_mode(
1621 left,
1622 right,
1623 on,
1624 join_type,
1625 PartitionMode::Partitioned,
1626 null_equality,
1627 context,
1628 )
1629 .await
1630 }
1631
1632 async fn join_collect_with_partition_mode(
1633 left: Arc<dyn ExecutionPlan>,
1634 right: Arc<dyn ExecutionPlan>,
1635 on: JoinOn,
1636 join_type: &JoinType,
1637 partition_mode: PartitionMode,
1638 null_equality: NullEquality,
1639 context: Arc<TaskContext>,
1640 ) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
1641 let partition_count = 4;
1642
1643 let (left_expr, right_expr) = on
1644 .iter()
1645 .map(|(l, r)| (Arc::clone(l), Arc::clone(r)))
1646 .unzip();
1647
1648 let left_repartitioned: Arc<dyn ExecutionPlan> = match partition_mode {
1649 PartitionMode::CollectLeft => Arc::new(CoalescePartitionsExec::new(left)),
1650 PartitionMode::Partitioned => Arc::new(RepartitionExec::try_new(
1651 left,
1652 Partitioning::Hash(left_expr, partition_count),
1653 )?),
1654 PartitionMode::Auto => {
1655 return internal_err!("Unexpected PartitionMode::Auto in join tests")
1656 }
1657 };
1658
1659 let right_repartitioned: Arc<dyn ExecutionPlan> = match partition_mode {
1660 PartitionMode::CollectLeft => {
1661 let partition_column_name = right.schema().field(0).name().clone();
1662 let partition_expr = vec![Arc::new(Column::new_with_schema(
1663 &partition_column_name,
1664 &right.schema(),
1665 )?) as _];
1666 Arc::new(RepartitionExec::try_new(
1667 right,
1668 Partitioning::Hash(partition_expr, partition_count),
1669 )?) as _
1670 }
1671 PartitionMode::Partitioned => Arc::new(RepartitionExec::try_new(
1672 right,
1673 Partitioning::Hash(right_expr, partition_count),
1674 )?),
1675 PartitionMode::Auto => {
1676 return internal_err!("Unexpected PartitionMode::Auto in join tests")
1677 }
1678 };
1679
1680 let join = HashJoinExec::try_new(
1681 left_repartitioned,
1682 right_repartitioned,
1683 on,
1684 None,
1685 join_type,
1686 None,
1687 partition_mode,
1688 null_equality,
1689 )?;
1690
1691 let columns = columns(&join.schema());
1692
1693 let mut batches = vec![];
1694 for i in 0..partition_count {
1695 let stream = join.execute(i, Arc::clone(&context))?;
1696 let more_batches = common::collect(stream).await?;
1697 batches.extend(
1698 more_batches
1699 .into_iter()
1700 .filter(|b| b.num_rows() > 0)
1701 .collect::<Vec<_>>(),
1702 );
1703 }
1704 let metrics = join.metrics().unwrap();
1705
1706 Ok((columns, batches, metrics))
1707 }
1708
    // Inner equi-join on one key (`b1`) with a CollectLeft build side; rows
    // without a match on either side are dropped. Repeated for every template
    // batch size.
    #[apply(batch_sizes)]
    #[tokio::test]
    async fn join_inner_one(batch_size: usize) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 5]), ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a2", &vec![10, 20, 30]),
            ("b1", &vec![4, 5, 6]),
            ("c2", &vec![70, 80, 90]),
        );

        // Join key: left.b1 == right.b1 (same column name on both sides).
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = join_collect(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);

        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r#"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b1 | c2 |
            +----+----+----+----+----+----+
            | 1 | 4 | 7 | 10 | 4 | 70 |
            | 2 | 5 | 8 | 20 | 5 | 80 |
            | 3 | 5 | 9 | 20 | 5 | 80 |
            +----+----+----+----+----+----+
            "#);
        }

        // 3 matching rows flow through the join.
        assert_join_metrics!(metrics, 3);

        Ok(())
    }

    // Same data and join as `join_inner_one`, but executed across 4 hash
    // partitions (PartitionMode::Partitioned); output is sorted before the
    // snapshot comparison since partition order is not deterministic.
    #[apply(batch_sizes)]
    #[tokio::test]
    async fn partitioned_join_inner_one(batch_size: usize) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 5]), ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a2", &vec![10, 20, 30]),
            ("b1", &vec![4, 5, 6]),
            ("c2", &vec![70, 80, 90]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = partitioned_join_collect(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r#"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b1 | c2 |
            +----+----+----+----+----+----+
            | 1 | 4 | 7 | 10 | 4 | 70 |
            | 2 | 5 | 8 | 20 | 5 | 80 |
            | 3 | 5 | 9 | 20 | 5 | 80 |
            +----+----+----+----+----+----+
            "#);
        }

        assert_join_metrics!(metrics, 3);

        Ok(())
    }
1806
    // Inner join where the key columns have different names on each side
    // (`b1` vs `b2`), so both appear in the output header.
    #[tokio::test]
    async fn join_inner_one_no_shared_column_names() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 5]), ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a2", &vec![10, 20, 30]),
            ("b2", &vec![4, 5, 6]),
            ("c2", &vec![70, 80, 90]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);

        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r#"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b2 | c2 |
            +----+----+----+----+----+----+
            | 1 | 4 | 7 | 10 | 4 | 70 |
            | 2 | 5 | 8 | 20 | 5 | 80 |
            | 3 | 5 | 9 | 20 | 5 | 80 |
            +----+----+----+----+----+----+
            "#);
        }

        assert_join_metrics!(metrics, 3);

        Ok(())
    }

    // Inner join with key values in non-sorted order on both sides; the
    // unsorted snapshot pins the operator's actual emission order.
    #[tokio::test]
    async fn join_inner_one_randomly_ordered() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let left = build_table(
            ("a1", &vec![0, 3, 2, 1]),
            ("b1", &vec![4, 5, 5, 4]),
            ("c1", &vec![6, 9, 8, 7]),
        );
        let right = build_table(
            ("a2", &vec![20, 30, 10]),
            ("b2", &vec![5, 6, 4]),
            ("c2", &vec![80, 90, 70]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);

        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r#"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b2 | c2 |
            +----+----+----+----+----+----+
            | 3 | 5 | 9 | 20 | 5 | 80 |
            | 2 | 5 | 8 | 20 | 5 | 80 |
            | 0 | 4 | 6 | 10 | 4 | 70 |
            | 1 | 4 | 7 | 10 | 4 | 70 |
            +----+----+----+----+----+----+
            "#);
        }

        assert_join_metrics!(metrics, 4);

        Ok(())
    }
1903
    // Inner join on a composite key (a1, b2); also checks how the 3 matching
    // rows are split into output batches for each batch size.
    #[apply(batch_sizes)]
    #[tokio::test]
    async fn join_inner_two(batch_size: usize) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size);
        let left = build_table(
            ("a1", &vec![1, 2, 2]),
            ("b2", &vec![1, 2, 2]),
            ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b2", &vec![1, 2, 2]),
            ("c2", &vec![70, 80, 90]),
        );
        // Two equi-join keys: (a1, b2) must match on both sides.
        let on = vec![
            (
                Arc::new(Column::new_with_schema("a1", &left.schema())?) as _,
                Arc::new(Column::new_with_schema("a1", &right.schema())?) as _,
            ),
            (
                Arc::new(Column::new_with_schema("b2", &left.schema())?) as _,
                Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
            ),
        ];

        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b2", "c1", "a1", "b2", "c2"]);

        // Expected number of output batches: 3 matched rows normally; with the
        // `force_hash_collisions` feature the count is sized by all 9 row
        // pairs instead (every hash collides, so every pair is a candidate —
        // NOTE(review): inferred from the cfg flag name, confirm against the
        // feature's docs). The `+ 1` at batch_size == 1 appears to account for
        // an extra trailing batch — TODO confirm.
        let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
            let mut expected_batch_count = div_ceil(3, batch_size);
            if batch_size == 1 {
                expected_batch_count += 1;
            }
            expected_batch_count
        } else {
            div_ceil(9, batch_size)
        };

        assert_eq!(batches.len(), expected_batch_count);

        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r#"
            +----+----+----+----+----+----+
            | a1 | b2 | c1 | a1 | b2 | c2 |
            +----+----+----+----+----+----+
            | 1 | 1 | 7 | 1 | 1 | 70 |
            | 2 | 2 | 8 | 2 | 2 | 80 |
            | 2 | 2 | 9 | 2 | 2 | 80 |
            +----+----+----+----+----+----+
            "#);
        }

        assert_join_metrics!(metrics, 3);

        Ok(())
    }
1974
    // Inner join on a composite key where the build (left) side arrives as
    // two partitions that are coalesced before the join.
    #[apply(batch_sizes)]
    #[tokio::test]
    async fn join_inner_one_two_parts_left(batch_size: usize) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size);
        let batch1 = build_table_i32(
            ("a1", &vec![1, 2]),
            ("b2", &vec![1, 2]),
            ("c1", &vec![7, 8]),
        );
        let batch2 =
            build_table_i32(("a1", &vec![2]), ("b2", &vec![2]), ("c1", &vec![9]));
        let schema = batch1.schema();
        // Left input has two partitions; coalesce them into one for CollectLeft.
        let left =
            TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
                .unwrap();
        let left = Arc::new(CoalescePartitionsExec::new(left));

        let right = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b2", &vec![1, 2, 2]),
            ("c2", &vec![70, 80, 90]),
        );
        let on = vec![
            (
                Arc::new(Column::new_with_schema("a1", &left.schema())?) as _,
                Arc::new(Column::new_with_schema("a1", &right.schema())?) as _,
            ),
            (
                Arc::new(Column::new_with_schema("b2", &left.schema())?) as _,
                Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
            ),
        ];

        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b2", "c1", "a1", "b2", "c2"]);

        // Same batch-count accounting as `join_inner_two`: 3 matched rows
        // normally, 9 candidate pairs under `force_hash_collisions`; the
        // `+ 1` at batch_size == 1 appears to cover a trailing batch — TODO
        // confirm.
        let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
            let mut expected_batch_count = div_ceil(3, batch_size);
            if batch_size == 1 {
                expected_batch_count += 1;
            }
            expected_batch_count
        } else {
            div_ceil(9, batch_size)
        };

        assert_eq!(batches.len(), expected_batch_count);

        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r#"
            +----+----+----+----+----+----+
            | a1 | b2 | c1 | a1 | b2 | c2 |
            +----+----+----+----+----+----+
            | 1 | 1 | 7 | 1 | 1 | 70 |
            | 2 | 2 | 8 | 2 | 2 | 80 |
            | 2 | 2 | 9 | 2 | 2 | 80 |
            +----+----+----+----+----+----+
            "#);
        }

        assert_join_metrics!(metrics, 3);

        Ok(())
    }
2054
    // Two-partition build side whose rows are not sorted on the key; verifies
    // the coalesced build side still joins correctly.
    #[tokio::test]
    async fn join_inner_one_two_parts_left_randomly_ordered() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let batch1 = build_table_i32(
            ("a1", &vec![0, 3]),
            ("b1", &vec![4, 5]),
            ("c1", &vec![6, 9]),
        );
        let batch2 = build_table_i32(
            ("a1", &vec![2, 1]),
            ("b1", &vec![5, 4]),
            ("c1", &vec![8, 7]),
        );
        let schema = batch1.schema();

        // Coalesce the two left partitions into one for the CollectLeft build.
        let left =
            TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
                .unwrap();
        let left = Arc::new(CoalescePartitionsExec::new(left));
        let right = build_table(
            ("a2", &vec![20, 30, 10]),
            ("b2", &vec![5, 6, 4]),
            ("c2", &vec![80, 90, 70]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);

        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r#"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b2 | c2 |
            +----+----+----+----+----+----+
            | 3 | 5 | 9 | 20 | 5 | 80 |
            | 2 | 5 | 8 | 20 | 5 | 80 |
            | 0 | 4 | 6 | 10 | 4 | 70 |
            | 1 | 4 | 7 | 10 | 4 | 70 |
            +----+----+----+----+----+----+
            "#);
        }

        assert_join_metrics!(metrics, 4);

        Ok(())
    }
2114
2115 #[apply(batch_sizes)]
2117 #[tokio::test]
2118 async fn join_inner_one_two_parts_right(batch_size: usize) -> Result<()> {
2119 let task_ctx = prepare_task_ctx(batch_size);
2120 let left = build_table(
2121 ("a1", &vec![1, 2, 3]),
2122 ("b1", &vec![4, 5, 5]), ("c1", &vec![7, 8, 9]),
2124 );
2125
2126 let batch1 = build_table_i32(
2127 ("a2", &vec![10, 20]),
2128 ("b1", &vec![4, 6]),
2129 ("c2", &vec![70, 80]),
2130 );
2131 let batch2 =
2132 build_table_i32(("a2", &vec![30]), ("b1", &vec![5]), ("c2", &vec![90]));
2133 let schema = batch1.schema();
2134 let right =
2135 TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
2136 .unwrap();
2137
2138 let on = vec![(
2139 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2140 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
2141 )];
2142
2143 let join = join(
2144 left,
2145 right,
2146 on,
2147 &JoinType::Inner,
2148 NullEquality::NullEqualsNothing,
2149 )?;
2150
2151 let columns = columns(&join.schema());
2152 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
2153
2154 let stream = join.execute(0, Arc::clone(&task_ctx))?;
2156 let batches = common::collect(stream).await?;
2157
2158 let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
2159 let mut expected_batch_count = div_ceil(1, batch_size);
2162 if batch_size == 1 {
2163 expected_batch_count += 1;
2164 }
2165 expected_batch_count
2166 } else {
2167 div_ceil(6, batch_size)
2170 };
2171 assert_eq!(batches.len(), expected_batch_count);
2172
2173 allow_duplicates! {
2175 assert_snapshot!(batches_to_string(&batches), @r#"
2176 +----+----+----+----+----+----+
2177 | a1 | b1 | c1 | a2 | b1 | c2 |
2178 +----+----+----+----+----+----+
2179 | 1 | 4 | 7 | 10 | 4 | 70 |
2180 +----+----+----+----+----+----+
2181 "#);
2182 }
2183
2184 let stream = join.execute(1, Arc::clone(&task_ctx))?;
2186 let batches = common::collect(stream).await?;
2187
2188 let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) {
2189 div_ceil(2, batch_size)
2191 } else {
2192 div_ceil(3, batch_size)
2195 };
2196 assert_eq!(batches.len(), expected_batch_count);
2197
2198 allow_duplicates! {
2200 assert_snapshot!(batches_to_string(&batches), @r#"
2201 +----+----+----+----+----+----+
2202 | a1 | b1 | c1 | a2 | b1 | c2 |
2203 +----+----+----+----+----+----+
2204 | 2 | 5 | 8 | 30 | 5 | 90 |
2205 | 3 | 5 | 9 | 30 | 5 | 90 |
2206 +----+----+----+----+----+----+
2207 "#);
2208 }
2209
2210 Ok(())
2211 }
2212
2213 fn build_table_two_batches(
2214 a: (&str, &Vec<i32>),
2215 b: (&str, &Vec<i32>),
2216 c: (&str, &Vec<i32>),
2217 ) -> Arc<dyn ExecutionPlan> {
2218 let batch = build_table_i32(a, b, c);
2219 let schema = batch.schema();
2220 TestMemoryExec::try_new_exec(&[vec![batch.clone(), batch]], schema, None).unwrap()
2221 }
2222
    // LEFT join where the probe side delivers the same batch twice: matched
    // rows are emitted once per probe batch, while the unmatched build row
    // (b1 == 7) is null-padded exactly once.
    #[apply(batch_sizes)]
    #[tokio::test]
    async fn join_left_multi_batch(batch_size: usize) {
        let task_ctx = prepare_task_ctx(batch_size);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
        );
        let right = build_table_two_batches(
            ("a2", &vec![10, 20, 30]),
            ("b1", &vec![4, 5, 6]),
            ("c2", &vec![70, 80, 90]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
        )];

        let join = join(
            left,
            right,
            on,
            &JoinType::Left,
            NullEquality::NullEqualsNothing,
        )
        .unwrap();

        let columns = columns(&join.schema());
        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);

        let stream = join.execute(0, task_ctx).unwrap();
        let batches = common::collect(stream).await.unwrap();

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r#"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b1 | c2 |
            +----+----+----+----+----+----+
            | 1 | 4 | 7 | 10 | 4 | 70 |
            | 1 | 4 | 7 | 10 | 4 | 70 |
            | 2 | 5 | 8 | 20 | 5 | 80 |
            | 2 | 5 | 8 | 20 | 5 | 80 |
            | 3 | 7 | 9 | | | |
            +----+----+----+----+----+----+
            "#);
        }
    }
2271
    // FULL join over a duplicated probe batch: unmatched probe rows (b2 == 6)
    // appear once per probe batch, the unmatched build row (b1 == 7) once.
    #[apply(batch_sizes)]
    #[tokio::test]
    async fn join_full_multi_batch(batch_size: usize) {
        let task_ctx = prepare_task_ctx(batch_size);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
        );
        let right = build_table_two_batches(
            ("a2", &vec![10, 20, 30]),
            ("b2", &vec![4, 5, 6]),
            ("c2", &vec![70, 80, 90]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
        )];

        let join = join(
            left,
            right,
            on,
            &JoinType::Full,
            NullEquality::NullEqualsNothing,
        )
        .unwrap();

        let columns = columns(&join.schema());
        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);

        let stream = join.execute(0, task_ctx).unwrap();
        let batches = common::collect(stream).await.unwrap();

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r#"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b2 | c2 |
            +----+----+----+----+----+----+
            | | | | 30 | 6 | 90 |
            | | | | 30 | 6 | 90 |
            | 1 | 4 | 7 | 10 | 4 | 70 |
            | 1 | 4 | 7 | 10 | 4 | 70 |
            | 2 | 5 | 8 | 20 | 5 | 80 |
            | 2 | 5 | 8 | 20 | 5 | 80 |
            | 3 | 7 | 9 | | | |
            +----+----+----+----+----+----+
            "#);
        }
    }
2323
    // LEFT join against an empty probe side: every build row is emitted with
    // nulls for the right columns.
    #[apply(batch_sizes)]
    #[tokio::test]
    async fn join_left_empty_right(batch_size: usize) {
        let task_ctx = prepare_task_ctx(batch_size);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 7]),
            ("c1", &vec![7, 8, 9]),
        );
        let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
        )];
        let schema = right.schema();
        let right = TestMemoryExec::try_new_exec(&[vec![right]], schema, None).unwrap();
        let join = join(
            left,
            right,
            on,
            &JoinType::Left,
            NullEquality::NullEqualsNothing,
        )
        .unwrap();

        let columns = columns(&join.schema());
        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);

        let stream = join.execute(0, task_ctx).unwrap();
        let batches = common::collect(stream).await.unwrap();

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r#"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b1 | c2 |
            +----+----+----+----+----+----+
            | 1 | 4 | 7 | | | |
            | 2 | 5 | 8 | | | |
            | 3 | 7 | 9 | | | |
            +----+----+----+----+----+----+
            "#);
        }
    }

    // FULL join against an empty probe side: identical output to the LEFT
    // case, since there are no right-only rows to add.
    #[apply(batch_sizes)]
    #[tokio::test]
    async fn join_full_empty_right(batch_size: usize) {
        let task_ctx = prepare_task_ctx(batch_size);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 7]),
            ("c1", &vec![7, 8, 9]),
        );
        let right = build_table_i32(("a2", &vec![]), ("b2", &vec![]), ("c2", &vec![]));
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
        )];
        let schema = right.schema();
        let right = TestMemoryExec::try_new_exec(&[vec![right]], schema, None).unwrap();
        let join = join(
            left,
            right,
            on,
            &JoinType::Full,
            NullEquality::NullEqualsNothing,
        )
        .unwrap();

        let columns = columns(&join.schema());
        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);

        let stream = join.execute(0, task_ctx).unwrap();
        let batches = common::collect(stream).await.unwrap();

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r#"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b2 | c2 |
            +----+----+----+----+----+----+
            | 1 | 4 | 7 | | | |
            | 2 | 5 | 8 | | | |
            | 3 | 7 | 9 | | | |
            +----+----+----+----+----+----+
            "#);
        }
    }
2411
    // LEFT join, CollectLeft: build row b1 == 7 has no probe match and is
    // null-padded.
    #[apply(batch_sizes)]
    #[tokio::test]
    async fn join_left_one(batch_size: usize) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a2", &vec![10, 20, 30]),
            ("b1", &vec![4, 5, 6]),
            ("c2", &vec![70, 80, 90]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = join_collect(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            &JoinType::Left,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r#"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b1 | c2 |
            +----+----+----+----+----+----+
            | 1 | 4 | 7 | 10 | 4 | 70 |
            | 2 | 5 | 8 | 20 | 5 | 80 |
            | 3 | 7 | 9 | | | |
            +----+----+----+----+----+----+
            "#);
        }

        assert_join_metrics!(metrics, 3);

        Ok(())
    }

    // Same LEFT join, but executed across 4 hash partitions; the unmatched
    // build row must still be emitted exactly once.
    #[apply(batch_sizes)]
    #[tokio::test]
    async fn partitioned_join_left_one(batch_size: usize) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a2", &vec![10, 20, 30]),
            ("b1", &vec![4, 5, 6]),
            ("c2", &vec![70, 80, 90]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = partitioned_join_collect(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            &JoinType::Left,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r#"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b1 | c2 |
            +----+----+----+----+----+----+
            | 1 | 4 | 7 | 10 | 4 | 70 |
            | 2 | 5 | 8 | 20 | 5 | 80 |
            | 3 | 7 | 9 | | | |
            +----+----+----+----+----+----+
            "#);
        }

        assert_join_metrics!(metrics, 3);

        Ok(())
    }
2507
2508 fn build_semi_anti_left_table() -> Arc<dyn ExecutionPlan> {
2509 build_table(
2512 ("a1", &vec![1, 3, 5, 7, 9, 11, 13]),
2513 ("b1", &vec![1, 3, 5, 7, 8, 8, 10]),
2514 ("c1", &vec![10, 30, 50, 70, 90, 110, 130]),
2515 )
2516 }
2517
2518 fn build_semi_anti_right_table() -> Arc<dyn ExecutionPlan> {
2519 build_table(
2522 ("a2", &vec![8, 12, 6, 2, 10, 4]),
2523 ("b2", &vec![8, 10, 6, 2, 10, 4]),
2524 ("c2", &vec![20, 40, 60, 80, 100, 120]),
2525 )
2526 }
2527
    // LEFT SEMI join: emits build rows whose `b1` exists in the probe side,
    // each at most once even when the probe key is duplicated.
    #[apply(batch_sizes)]
    #[tokio::test]
    async fn join_left_semi(batch_size: usize) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size);
        let left = build_semi_anti_left_table();
        let right = build_semi_anti_right_table();
        // left.b1 (7 rows) joined against right.b2 (6 rows).
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
        )];

        let join = join(
            left,
            right,
            on,
            &JoinType::LeftSemi,
            NullEquality::NullEqualsNothing,
        )?;

        // Semi join output carries only build-side columns.
        let columns = columns(&join.schema());
        assert_eq!(columns, vec!["a1", "b1", "c1"]);

        let stream = join.execute(0, task_ctx)?;
        let batches = common::collect(stream).await?;

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r#"
            +----+----+-----+
            | a1 | b1 | c1 |
            +----+----+-----+
            | 11 | 8 | 110 |
            | 13 | 10 | 130 |
            | 9 | 8 | 90 |
            +----+----+-----+
            "#);
        }

        Ok(())
    }
2569
    // LEFT SEMI join with a residual filter on a probe-side column: first a
    // filter that removes no matches, then one (x > 10) that narrows the
    // result to a single build row.
    #[apply(batch_sizes)]
    #[tokio::test]
    async fn join_left_semi_with_filter(batch_size: usize) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size);
        let left = build_semi_anti_left_table();
        let right = build_semi_anti_right_table();

        // left.b1 (7 rows) joined against right.b2 (6 rows).
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
        )];

        // The filter's intermediate schema exposes right column 0 (a2) as "x".
        let column_indices = vec![ColumnIndex {
            index: 0,
            side: JoinSide::Right,
        }];
        let intermediate_schema =
            Schema::new(vec![Field::new("x", DataType::Int32, true)]);

        // x != 10 — does not eliminate any semi-join match below.
        let filter_expression = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("x", 0)),
            Operator::NotEq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
        )) as Arc<dyn PhysicalExpr>;

        let filter = JoinFilter::new(
            filter_expression,
            column_indices.clone(),
            Arc::new(intermediate_schema.clone()),
        );

        let join = join_with_filter(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            filter,
            &JoinType::LeftSemi,
            NullEquality::NullEqualsNothing,
        )?;

        let columns_header = columns(&join.schema());
        assert_eq!(columns_header.clone(), vec!["a1", "b1", "c1"]);

        let stream = join.execute(0, Arc::clone(&task_ctx))?;
        let batches = common::collect(stream).await?;

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r"
            +----+----+-----+
            | a1 | b1 | c1 |
            +----+----+-----+
            | 11 | 8 | 110 |
            | 13 | 10 | 130 |
            | 9 | 8 | 90 |
            +----+----+-----+
            ");
        }

        // x > 10 — only the build row matching probe a2 == 12 survives.
        let filter_expression = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("x", 0)),
            Operator::Gt,
            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
        )) as Arc<dyn PhysicalExpr>;
        let filter = JoinFilter::new(
            filter_expression,
            column_indices,
            Arc::new(intermediate_schema),
        );

        let join = join_with_filter(
            left,
            right,
            on,
            filter,
            &JoinType::LeftSemi,
            NullEquality::NullEqualsNothing,
        )?;

        let columns_header = columns(&join.schema());
        assert_eq!(columns_header, vec!["a1", "b1", "c1"]);

        let stream = join.execute(0, task_ctx)?;
        let batches = common::collect(stream).await?;

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r#"
            +----+----+-----+
            | a1 | b1 | c1 |
            +----+----+-----+
            | 13 | 10 | 130 |
            +----+----+-----+
            "#);
        }

        Ok(())
    }
2668
    // RIGHT SEMI join: emits probe rows whose `b2` exists on the build side;
    // output carries only probe-side columns.
    #[apply(batch_sizes)]
    #[tokio::test]
    async fn join_right_semi(batch_size: usize) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size);
        let left = build_semi_anti_left_table();
        let right = build_semi_anti_right_table();

        // left.b1 (7 rows) joined against right.b2 (6 rows).
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
        )];

        let join = join(
            left,
            right,
            on,
            &JoinType::RightSemi,
            NullEquality::NullEqualsNothing,
        )?;

        let columns = columns(&join.schema());
        assert_eq!(columns, vec!["a2", "b2", "c2"]);

        let stream = join.execute(0, task_ctx)?;
        let batches = common::collect(stream).await?;

        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r#"
            +----+----+-----+
            | a2 | b2 | c2 |
            +----+----+-----+
            | 8 | 8 | 20 |
            | 12 | 10 | 40 |
            | 10 | 10 | 100 |
            +----+----+-----+
            "#);
        }

        Ok(())
    }
2711
    // RIGHT SEMI join with a residual filter on a build-side column: first a
    // filter that removes no matches, then one (x > 11) that drops the probe
    // row matched only through build a1 == 8.
    #[apply(batch_sizes)]
    #[tokio::test]
    async fn join_right_semi_with_filter(batch_size: usize) -> Result<()> {
        let task_ctx = prepare_task_ctx(batch_size);
        let left = build_semi_anti_left_table();
        let right = build_semi_anti_right_table();

        // left.b1 (7 rows) joined against right.b2 (6 rows).
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
        )];

        // The filter's intermediate schema exposes left column 0 (a1) as "x".
        let column_indices = vec![ColumnIndex {
            index: 0,
            side: JoinSide::Left,
        }];
        let intermediate_schema =
            Schema::new(vec![Field::new("x", DataType::Int32, true)]);

        // x != 9 — does not eliminate any semi-join match below.
        let filter_expression = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("x", 0)),
            Operator::NotEq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(9)))),
        )) as Arc<dyn PhysicalExpr>;

        let filter = JoinFilter::new(
            filter_expression,
            column_indices.clone(),
            Arc::new(intermediate_schema.clone()),
        );

        let join = join_with_filter(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            filter,
            &JoinType::RightSemi,
            NullEquality::NullEqualsNothing,
        )?;

        let columns = columns(&join.schema());
        assert_eq!(columns, vec!["a2", "b2", "c2"]);

        let stream = join.execute(0, Arc::clone(&task_ctx))?;
        let batches = common::collect(stream).await?;

        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r#"
            +----+----+-----+
            | a2 | b2 | c2 |
            +----+----+-----+
            | 8 | 8 | 20 |
            | 12 | 10 | 40 |
            | 10 | 10 | 100 |
            +----+----+-----+
            "#);
        }

        // x > 11 — only probe rows matched via build a1 == 13 remain.
        let filter_expression = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("x", 0)),
            Operator::Gt,
            Arc::new(Literal::new(ScalarValue::Int32(Some(11)))),
        )) as Arc<dyn PhysicalExpr>;

        let filter = JoinFilter::new(
            filter_expression,
            column_indices,
            Arc::new(intermediate_schema.clone()),
        );

        let join = join_with_filter(
            left,
            right,
            on,
            filter,
            &JoinType::RightSemi,
            NullEquality::NullEqualsNothing,
        )?;
        let stream = join.execute(0, task_ctx)?;
        let batches = common::collect(stream).await?;

        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r#"
            +----+----+-----+
            | a2 | b2 | c2 |
            +----+----+-----+
            | 12 | 10 | 40 |
            | 10 | 10 | 100 |
            +----+----+-----+
            "#);
        }

        Ok(())
    }
2810
2811 #[apply(batch_sizes)]
2812 #[tokio::test]
2813 async fn join_left_anti(batch_size: usize) -> Result<()> {
2814 let task_ctx = prepare_task_ctx(batch_size);
2815 let left = build_semi_anti_left_table();
2816 let right = build_semi_anti_right_table();
2817 let on = vec![(
2819 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2820 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2821 )];
2822
2823 let join = join(
2824 left,
2825 right,
2826 on,
2827 &JoinType::LeftAnti,
2828 NullEquality::NullEqualsNothing,
2829 )?;
2830
2831 let columns = columns(&join.schema());
2832 assert_eq!(columns, vec!["a1", "b1", "c1"]);
2833
2834 let stream = join.execute(0, task_ctx)?;
2835 let batches = common::collect(stream).await?;
2836
2837 allow_duplicates! {
2838 assert_snapshot!(batches_to_sort_string(&batches), @r#"
2839 +----+----+----+
2840 | a1 | b1 | c1 |
2841 +----+----+----+
2842 | 1 | 1 | 10 |
2843 | 3 | 3 | 30 |
2844 | 5 | 5 | 50 |
2845 | 7 | 7 | 70 |
2846 +----+----+----+
2847 "#);
2848 }
2849 Ok(())
2850 }
2851
    #[apply(batch_sizes)]
    #[tokio::test]
    async fn join_left_anti_with_filter(batch_size: usize) -> Result<()> {
        // LeftAnti join (b1 == b2) with a residual filter. The same
        // predicate (x != 8) is applied in both runs, and both expect the
        // identical result set; the second run consumes the inputs.
        let task_ctx = prepare_task_ctx(batch_size);
        let left = build_semi_anti_left_table();
        let right = build_semi_anti_right_table();
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
        )];

        // Intermediate column "x" maps to column 0 of the RIGHT side
        // (presumably a2 — confirm against build_semi_anti_right_table).
        let column_indices = vec![ColumnIndex {
            index: 0,
            side: JoinSide::Right,
        }];
        let intermediate_schema =
            Schema::new(vec![Field::new("x", DataType::Int32, true)]);
        // Residual predicate: x != 8.
        let filter_expression = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("x", 0)),
            Operator::NotEq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
        )) as Arc<dyn PhysicalExpr>;

        let filter = JoinFilter::new(
            filter_expression,
            column_indices.clone(),
            Arc::new(intermediate_schema.clone()),
        );

        let join = join_with_filter(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            filter,
            &JoinType::LeftAnti,
            NullEquality::NullEqualsNothing,
        )?;

        // LeftAnti output keeps only left-side columns.
        let columns_header = columns(&join.schema());
        assert_eq!(columns_header, vec!["a1", "b1", "c1"]);

        let stream = join.execute(0, Arc::clone(&task_ctx))?;
        let batches = common::collect(stream).await?;

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r#"
            +----+----+-----+
            | a1 | b1 | c1  |
            +----+----+-----+
            | 1  | 1  | 10  |
            | 11 | 8  | 110 |
            | 3  | 3  | 30  |
            | 5  | 5  | 50  |
            | 7  | 7  | 70  |
            | 9  | 8  | 90  |
            +----+----+-----+
            "#);
        }

        // Same predicate again (x != 8); this run moves `left`/`right`/`on`.
        let filter_expression = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("x", 0)),
            Operator::NotEq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
        )) as Arc<dyn PhysicalExpr>;

        let filter = JoinFilter::new(
            filter_expression,
            column_indices,
            Arc::new(intermediate_schema),
        );

        let join = join_with_filter(
            left,
            right,
            on,
            filter,
            &JoinType::LeftAnti,
            NullEquality::NullEqualsNothing,
        )?;

        let columns_header = columns(&join.schema());
        assert_eq!(columns_header, vec!["a1", "b1", "c1"]);

        let stream = join.execute(0, task_ctx)?;
        let batches = common::collect(stream).await?;

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r#"
            +----+----+-----+
            | a1 | b1 | c1  |
            +----+----+-----+
            | 1  | 1  | 10  |
            | 11 | 8  | 110 |
            | 3  | 3  | 30  |
            | 5  | 5  | 50  |
            | 7  | 7  | 70  |
            | 9  | 8  | 90  |
            +----+----+-----+
            "#);
        }

        Ok(())
    }
2957
2958 #[apply(batch_sizes)]
2959 #[tokio::test]
2960 async fn join_right_anti(batch_size: usize) -> Result<()> {
2961 let task_ctx = prepare_task_ctx(batch_size);
2962 let left = build_semi_anti_left_table();
2963 let right = build_semi_anti_right_table();
2964 let on = vec![(
2965 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
2966 Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
2967 )];
2968
2969 let join = join(
2970 left,
2971 right,
2972 on,
2973 &JoinType::RightAnti,
2974 NullEquality::NullEqualsNothing,
2975 )?;
2976
2977 let columns = columns(&join.schema());
2978 assert_eq!(columns, vec!["a2", "b2", "c2"]);
2979
2980 let stream = join.execute(0, task_ctx)?;
2981 let batches = common::collect(stream).await?;
2982
2983 allow_duplicates! {
2985 assert_snapshot!(batches_to_string(&batches), @r#"
2986 +----+----+-----+
2987 | a2 | b2 | c2 |
2988 +----+----+-----+
2989 | 6 | 6 | 60 |
2990 | 2 | 2 | 80 |
2991 | 4 | 4 | 120 |
2992 +----+----+-----+
2993 "#);
2994 }
2995 Ok(())
2996 }
2997
    #[apply(batch_sizes)]
    #[tokio::test]
    async fn join_right_anti_with_filter(batch_size: usize) -> Result<()> {
        // RightAnti join (b1 == b2) with residual filters. The first run
        // binds the filter to a LEFT column, the second to a RIGHT column.
        let task_ctx = prepare_task_ctx(batch_size);
        let left = build_semi_anti_left_table();
        let right = build_semi_anti_right_table();
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
        )];

        // Intermediate column "x" maps to column 0 of the LEFT side
        // (presumably a1 — confirm against build_semi_anti_left_table).
        let column_indices = vec![ColumnIndex {
            index: 0,
            side: JoinSide::Left,
        }];
        let intermediate_schema =
            Schema::new(vec![Field::new("x", DataType::Int32, true)]);

        // Residual predicate: x != 13.
        let filter_expression = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("x", 0)),
            Operator::NotEq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(13)))),
        )) as Arc<dyn PhysicalExpr>;

        let filter = JoinFilter::new(
            filter_expression,
            column_indices,
            Arc::new(intermediate_schema.clone()),
        );

        let join = join_with_filter(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            filter,
            &JoinType::RightAnti,
            NullEquality::NullEqualsNothing,
        )?;

        // RightAnti output keeps only right-side columns.
        let columns_header = columns(&join.schema());
        assert_eq!(columns_header, vec!["a2", "b2", "c2"]);

        let stream = join.execute(0, Arc::clone(&task_ctx))?;
        let batches = common::collect(stream).await?;

        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r#"
            +----+----+-----+
            | a2 | b2 | c2  |
            +----+----+-----+
            | 12 | 10 | 40  |
            | 6  | 6  | 60  |
            | 2  | 2  | 80  |
            | 10 | 10 | 100 |
            | 4  | 4  | 120 |
            +----+----+-----+
            "#);
        }

        // Second run: intermediate "x" now maps to column 1 of the RIGHT
        // side (b2), with predicate x != 8.
        let column_indices = vec![ColumnIndex {
            index: 1,
            side: JoinSide::Right,
        }];
        let filter_expression = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("x", 0)),
            Operator::NotEq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
        )) as Arc<dyn PhysicalExpr>;

        let filter = JoinFilter::new(
            filter_expression,
            column_indices,
            Arc::new(intermediate_schema),
        );

        let join = join_with_filter(
            left,
            right,
            on,
            filter,
            &JoinType::RightAnti,
            NullEquality::NullEqualsNothing,
        )?;

        let columns_header = columns(&join.schema());
        assert_eq!(columns_header, vec!["a2", "b2", "c2"]);

        let stream = join.execute(0, task_ctx)?;
        let batches = common::collect(stream).await?;

        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r#"
            +----+----+-----+
            | a2 | b2 | c2  |
            +----+----+-----+
            | 8  | 8  | 20  |
            | 6  | 6  | 60  |
            | 2  | 2  | 80  |
            | 4  | 4  | 120 |
            +----+----+-----+
            "#);
        }

        Ok(())
    }
3107
3108 #[apply(batch_sizes)]
3109 #[tokio::test]
3110 async fn join_right_one(batch_size: usize) -> Result<()> {
3111 let task_ctx = prepare_task_ctx(batch_size);
3112 let left = build_table(
3113 ("a1", &vec![1, 2, 3]),
3114 ("b1", &vec![4, 5, 7]),
3115 ("c1", &vec![7, 8, 9]),
3116 );
3117 let right = build_table(
3118 ("a2", &vec![10, 20, 30]),
3119 ("b1", &vec![4, 5, 6]), ("c2", &vec![70, 80, 90]),
3121 );
3122 let on = vec![(
3123 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3124 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3125 )];
3126
3127 let (columns, batches, metrics) = join_collect(
3128 left,
3129 right,
3130 on,
3131 &JoinType::Right,
3132 NullEquality::NullEqualsNothing,
3133 task_ctx,
3134 )
3135 .await?;
3136
3137 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
3138
3139 allow_duplicates! {
3140 assert_snapshot!(batches_to_sort_string(&batches), @r#"
3141 +----+----+----+----+----+----+
3142 | a1 | b1 | c1 | a2 | b1 | c2 |
3143 +----+----+----+----+----+----+
3144 | | | | 30 | 6 | 90 |
3145 | 1 | 4 | 7 | 10 | 4 | 70 |
3146 | 2 | 5 | 8 | 20 | 5 | 80 |
3147 +----+----+----+----+----+----+
3148 "#);
3149 }
3150
3151 assert_join_metrics!(metrics, 3);
3152
3153 Ok(())
3154 }
3155
    #[apply(batch_sizes)]
    #[tokio::test]
    async fn partitioned_join_right_one(batch_size: usize) -> Result<()> {
        // Same scenario as join_right_one, but executed through the
        // partitioned (hash-repartitioned) join path; the result must match.
        let task_ctx = prepare_task_ctx(batch_size);
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 7]),
            ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a2", &vec![10, 20, 30]),
            ("b1", &vec![4, 5, 6]), ("c2", &vec![70, 80, 90]),
        );
        // Both sides name the join key "b1".
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = partitioned_join_collect(
            left,
            right,
            on,
            &JoinType::Right,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r#"
            +----+----+----+----+----+----+
            | a1 | b1 | c1 | a2 | b1 | c2 |
            +----+----+----+----+----+----+
            |    |    |    | 30 | 6  | 90 |
            | 1  | 4  | 7  | 10 | 4  | 70 |
            | 2  | 5  | 8  | 20 | 5  | 80 |
            +----+----+----+----+----+----+
            "#);
        }

        assert_join_metrics!(metrics, 3);

        Ok(())
    }
3203
3204 #[apply(batch_sizes)]
3205 #[tokio::test]
3206 async fn join_full_one(batch_size: usize) -> Result<()> {
3207 let task_ctx = prepare_task_ctx(batch_size);
3208 let left = build_table(
3209 ("a1", &vec![1, 2, 3]),
3210 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
3212 );
3213 let right = build_table(
3214 ("a2", &vec![10, 20, 30]),
3215 ("b2", &vec![4, 5, 6]),
3216 ("c2", &vec![70, 80, 90]),
3217 );
3218 let on = vec![(
3219 Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
3220 Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
3221 )];
3222
3223 let join = join(
3224 left,
3225 right,
3226 on,
3227 &JoinType::Full,
3228 NullEquality::NullEqualsNothing,
3229 )?;
3230
3231 let columns = columns(&join.schema());
3232 assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
3233
3234 let stream = join.execute(0, task_ctx)?;
3235 let batches = common::collect(stream).await?;
3236
3237 allow_duplicates! {
3238 assert_snapshot!(batches_to_sort_string(&batches), @r#"
3239 +----+----+----+----+----+----+
3240 | a1 | b1 | c1 | a2 | b2 | c2 |
3241 +----+----+----+----+----+----+
3242 | | | | 30 | 6 | 90 |
3243 | 1 | 4 | 7 | 10 | 4 | 70 |
3244 | 2 | 5 | 8 | 20 | 5 | 80 |
3245 | 3 | 7 | 9 | | | |
3246 +----+----+----+----+----+----+
3247 "#);
3248 }
3249
3250 Ok(())
3251 }
3252
3253 #[apply(batch_sizes)]
3254 #[tokio::test]
3255 async fn join_left_mark(batch_size: usize) -> Result<()> {
3256 let task_ctx = prepare_task_ctx(batch_size);
3257 let left = build_table(
3258 ("a1", &vec![1, 2, 3]),
3259 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
3261 );
3262 let right = build_table(
3263 ("a2", &vec![10, 20, 30]),
3264 ("b1", &vec![4, 5, 6]),
3265 ("c2", &vec![70, 80, 90]),
3266 );
3267 let on = vec![(
3268 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3269 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3270 )];
3271
3272 let (columns, batches, metrics) = join_collect(
3273 Arc::clone(&left),
3274 Arc::clone(&right),
3275 on.clone(),
3276 &JoinType::LeftMark,
3277 NullEquality::NullEqualsNothing,
3278 task_ctx,
3279 )
3280 .await?;
3281
3282 assert_eq!(columns, vec!["a1", "b1", "c1", "mark"]);
3283
3284 allow_duplicates! {
3285 assert_snapshot!(batches_to_sort_string(&batches), @r#"
3286 +----+----+----+-------+
3287 | a1 | b1 | c1 | mark |
3288 +----+----+----+-------+
3289 | 1 | 4 | 7 | true |
3290 | 2 | 5 | 8 | true |
3291 | 3 | 7 | 9 | false |
3292 +----+----+----+-------+
3293 "#);
3294 }
3295
3296 assert_join_metrics!(metrics, 3);
3297
3298 Ok(())
3299 }
3300
3301 #[apply(batch_sizes)]
3302 #[tokio::test]
3303 async fn partitioned_join_left_mark(batch_size: usize) -> Result<()> {
3304 let task_ctx = prepare_task_ctx(batch_size);
3305 let left = build_table(
3306 ("a1", &vec![1, 2, 3]),
3307 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
3309 );
3310 let right = build_table(
3311 ("a2", &vec![10, 20, 30, 40]),
3312 ("b1", &vec![4, 4, 5, 6]),
3313 ("c2", &vec![60, 70, 80, 90]),
3314 );
3315 let on = vec![(
3316 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3317 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3318 )];
3319
3320 let (columns, batches, metrics) = partitioned_join_collect(
3321 Arc::clone(&left),
3322 Arc::clone(&right),
3323 on.clone(),
3324 &JoinType::LeftMark,
3325 NullEquality::NullEqualsNothing,
3326 task_ctx,
3327 )
3328 .await?;
3329
3330 assert_eq!(columns, vec!["a1", "b1", "c1", "mark"]);
3331
3332 allow_duplicates! {
3333 assert_snapshot!(batches_to_sort_string(&batches), @r#"
3334 +----+----+----+-------+
3335 | a1 | b1 | c1 | mark |
3336 +----+----+----+-------+
3337 | 1 | 4 | 7 | true |
3338 | 2 | 5 | 8 | true |
3339 | 3 | 7 | 9 | false |
3340 +----+----+----+-------+
3341 "#);
3342 }
3343
3344 assert_join_metrics!(metrics, 3);
3345
3346 Ok(())
3347 }
3348
3349 #[apply(batch_sizes)]
3350 #[tokio::test]
3351 async fn join_right_mark(batch_size: usize) -> Result<()> {
3352 let task_ctx = prepare_task_ctx(batch_size);
3353 let left = build_table(
3354 ("a1", &vec![1, 2, 3]),
3355 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
3357 );
3358 let right = build_table(
3359 ("a2", &vec![10, 20, 30]),
3360 ("b1", &vec![4, 5, 6]), ("c2", &vec![70, 80, 90]),
3362 );
3363 let on = vec![(
3364 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3365 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3366 )];
3367
3368 let (columns, batches, metrics) = join_collect(
3369 Arc::clone(&left),
3370 Arc::clone(&right),
3371 on.clone(),
3372 &JoinType::RightMark,
3373 NullEquality::NullEqualsNothing,
3374 task_ctx,
3375 )
3376 .await?;
3377
3378 assert_eq!(columns, vec!["a2", "b1", "c2", "mark"]);
3379
3380 let expected = [
3381 "+----+----+----+-------+",
3382 "| a2 | b1 | c2 | mark |",
3383 "+----+----+----+-------+",
3384 "| 10 | 4 | 70 | true |",
3385 "| 20 | 5 | 80 | true |",
3386 "| 30 | 6 | 90 | false |",
3387 "+----+----+----+-------+",
3388 ];
3389 assert_batches_sorted_eq!(expected, &batches);
3390
3391 assert_join_metrics!(metrics, 3);
3392
3393 Ok(())
3394 }
3395
3396 #[apply(batch_sizes)]
3397 #[tokio::test]
3398 async fn partitioned_join_right_mark(batch_size: usize) -> Result<()> {
3399 let task_ctx = prepare_task_ctx(batch_size);
3400 let left = build_table(
3401 ("a1", &vec![1, 2, 3]),
3402 ("b1", &vec![4, 5, 7]), ("c1", &vec![7, 8, 9]),
3404 );
3405 let right = build_table(
3406 ("a2", &vec![10, 20, 30, 40]),
3407 ("b1", &vec![4, 4, 5, 6]), ("c2", &vec![60, 70, 80, 90]),
3409 );
3410 let on = vec![(
3411 Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3412 Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3413 )];
3414
3415 let (columns, batches, metrics) = partitioned_join_collect(
3416 Arc::clone(&left),
3417 Arc::clone(&right),
3418 on.clone(),
3419 &JoinType::RightMark,
3420 NullEquality::NullEqualsNothing,
3421 task_ctx,
3422 )
3423 .await?;
3424
3425 assert_eq!(columns, vec!["a2", "b1", "c2", "mark"]);
3426
3427 let expected = [
3428 "+----+----+----+-------+",
3429 "| a2 | b1 | c2 | mark |",
3430 "+----+----+----+-------+",
3431 "| 10 | 4 | 60 | true |",
3432 "| 20 | 4 | 70 | true |",
3433 "| 30 | 5 | 80 | true |",
3434 "| 40 | 6 | 90 | false |",
3435 "+----+----+----+-------+",
3436 ];
3437 assert_batches_sorted_eq!(expected, &batches);
3438
3439 assert_join_metrics!(metrics, 4);
3440
3441 Ok(())
3442 }
3443
    #[test]
    fn join_with_hash_collisions_64() -> Result<()> {
        // Exercises lookup_join_hashmap against a hand-built JoinHashMapU64
        // whose raw table deliberately holds multiple entries per hash
        // value, simulating hash collisions.
        let mut hashmap_left = HashTable::with_capacity(4);
        let left = build_table_i32(
            ("a", &vec![10, 20]),
            ("x", &vec![100, 200]),
            ("y", &vec![200, 300]),
        );

        // Fixed seeds keep the hash values deterministic across runs.
        let random_state = RandomState::with_seeds(0, 0, 0, 0);
        let hashes_buff = &mut vec![0; left.num_rows()];
        let hashes = create_hashes(
            &[Arc::clone(&left.columns()[0])],
            &random_state,
            hashes_buff,
        )?;

        // Insert two entries under each row's hash. The payload is
        // (hash, offset); offsets appear to be 1-based with 0 acting as
        // end-of-chain in `next` — NOTE(review): confirm against the
        // JoinHashMap layout documentation.
        hashmap_left.insert_unique(hashes[0], (hashes[0], 1), |(h, _)| *h);
        hashmap_left.insert_unique(hashes[0], (hashes[0], 2), |(h, _)| *h);

        hashmap_left.insert_unique(hashes[1], (hashes[1], 1), |(h, _)| *h);
        hashmap_left.insert_unique(hashes[1], (hashes[1], 2), |(h, _)| *h);

        // Collision chain array for the two build-side rows.
        let next = vec![2, 0];

        let right = build_table_i32(
            ("a", &vec![10, 20]),
            ("b", &vec![0, 0]),
            ("c", &vec![30, 40]),
        );

        // Probe key is the "a" column on both sides.
        let key_column: PhysicalExprRef = Arc::new(Column::new("a", 0)) as _;

        let join_hash_map = JoinHashMapU64::new(hashmap_left, next);

        let left_keys_values = key_column.evaluate(&left)?.into_array(left.num_rows())?;
        let right_keys_values =
            key_column.evaluate(&right)?.into_array(right.num_rows())?;
        let mut hashes_buffer = vec![0; right.num_rows()];
        create_hashes(
            &[Arc::clone(&right_keys_values)],
            &random_state,
            &mut hashes_buffer,
        )?;

        // Despite the duplicated hash entries, the equality check on key
        // values must pair row 0 with row 0 and row 1 with row 1.
        let (l, r, _) = lookup_join_hashmap(
            &join_hash_map,
            &[left_keys_values],
            &[right_keys_values],
            NullEquality::NullEqualsNothing,
            &hashes_buffer,
            8192,
            (0, None),
        )?;

        let left_ids: UInt64Array = vec![0, 1].into();

        let right_ids: UInt32Array = vec![0, 1].into();

        assert_eq!(left_ids, l);

        assert_eq!(right_ids, r);

        Ok(())
    }
3514
    #[test]
    fn join_with_hash_collisions_u32() -> Result<()> {
        // Same collision scenario as join_with_hash_collisions_64, but
        // using the compact JoinHashMapU32 (u32 chain offsets).
        let mut hashmap_left = HashTable::with_capacity(4);
        let left = build_table_i32(
            ("a", &vec![10, 20]),
            ("x", &vec![100, 200]),
            ("y", &vec![200, 300]),
        );

        // Fixed seeds keep the hash values deterministic across runs.
        let random_state = RandomState::with_seeds(0, 0, 0, 0);
        let hashes_buff = &mut vec![0; left.num_rows()];
        let hashes = create_hashes(
            &[Arc::clone(&left.columns()[0])],
            &random_state,
            hashes_buff,
        )?;

        // Two entries per hash simulate collisions; offsets are u32 here.
        hashmap_left.insert_unique(hashes[0], (hashes[0], 1u32), |(h, _)| *h);
        hashmap_left.insert_unique(hashes[0], (hashes[0], 2u32), |(h, _)| *h);
        hashmap_left.insert_unique(hashes[1], (hashes[1], 1u32), |(h, _)| *h);
        hashmap_left.insert_unique(hashes[1], (hashes[1], 2u32), |(h, _)| *h);

        // Collision chain array for the two build-side rows.
        let next: Vec<u32> = vec![2, 0];

        let right = build_table_i32(
            ("a", &vec![10, 20]),
            ("b", &vec![0, 0]),
            ("c", &vec![30, 40]),
        );

        // Probe key is the "a" column on both sides.
        let key_column: PhysicalExprRef = Arc::new(Column::new("a", 0)) as _;

        let join_hash_map = JoinHashMapU32::new(hashmap_left, next);

        let left_keys_values = key_column.evaluate(&left)?.into_array(left.num_rows())?;
        let right_keys_values =
            key_column.evaluate(&right)?.into_array(right.num_rows())?;
        let mut hashes_buffer = vec![0; right.num_rows()];
        create_hashes(
            &[Arc::clone(&right_keys_values)],
            &random_state,
            &mut hashes_buffer,
        )?;

        // Key-value equality must still pair the rows one-to-one.
        let (l, r, _) = lookup_join_hashmap(
            &join_hash_map,
            &[left_keys_values],
            &[right_keys_values],
            NullEquality::NullEqualsNothing,
            &hashes_buffer,
            8192,
            (0, None),
        )?;

        let left_ids: UInt64Array = vec![0, 1].into();
        let right_ids: UInt32Array = vec![0, 1].into();

        assert_eq!(left_ids, l);
        assert_eq!(right_ids, r);

        Ok(())
    }
3578
3579 #[tokio::test]
3580 async fn join_with_duplicated_column_names() -> Result<()> {
3581 let task_ctx = Arc::new(TaskContext::default());
3582 let left = build_table(
3583 ("a", &vec![1, 2, 3]),
3584 ("b", &vec![4, 5, 7]),
3585 ("c", &vec![7, 8, 9]),
3586 );
3587 let right = build_table(
3588 ("a", &vec![10, 20, 30]),
3589 ("b", &vec![1, 2, 7]),
3590 ("c", &vec![70, 80, 90]),
3591 );
3592 let on = vec![(
3593 Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
3595 Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
3596 )];
3597
3598 let join = join(
3599 left,
3600 right,
3601 on,
3602 &JoinType::Inner,
3603 NullEquality::NullEqualsNothing,
3604 )?;
3605
3606 let columns = columns(&join.schema());
3607 assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
3608
3609 let stream = join.execute(0, task_ctx)?;
3610 let batches = common::collect(stream).await?;
3611
3612 allow_duplicates! {
3613 assert_snapshot!(batches_to_sort_string(&batches), @r#"
3614 +---+---+---+----+---+----+
3615 | a | b | c | a | b | c |
3616 +---+---+---+----+---+----+
3617 | 1 | 4 | 7 | 10 | 1 | 70 |
3618 | 2 | 5 | 8 | 20 | 2 | 80 |
3619 +---+---+---+----+---+----+
3620 "#);
3621 }
3622
3623 Ok(())
3624 }
3625
3626 fn prepare_join_filter() -> JoinFilter {
3627 let column_indices = vec![
3628 ColumnIndex {
3629 index: 2,
3630 side: JoinSide::Left,
3631 },
3632 ColumnIndex {
3633 index: 2,
3634 side: JoinSide::Right,
3635 },
3636 ];
3637 let intermediate_schema = Schema::new(vec![
3638 Field::new("c", DataType::Int32, true),
3639 Field::new("c", DataType::Int32, true),
3640 ]);
3641 let filter_expression = Arc::new(BinaryExpr::new(
3642 Arc::new(Column::new("c", 0)),
3643 Operator::Gt,
3644 Arc::new(Column::new("c", 1)),
3645 )) as Arc<dyn PhysicalExpr>;
3646
3647 JoinFilter::new(
3648 filter_expression,
3649 column_indices,
3650 Arc::new(intermediate_schema),
3651 )
3652 }
3653
3654 #[apply(batch_sizes)]
3655 #[tokio::test]
3656 async fn join_inner_with_filter(batch_size: usize) -> Result<()> {
3657 let task_ctx = prepare_task_ctx(batch_size);
3658 let left = build_table(
3659 ("a", &vec![0, 1, 2, 2]),
3660 ("b", &vec![4, 5, 7, 8]),
3661 ("c", &vec![7, 8, 9, 1]),
3662 );
3663 let right = build_table(
3664 ("a", &vec![10, 20, 30, 40]),
3665 ("b", &vec![2, 2, 3, 4]),
3666 ("c", &vec![7, 5, 6, 4]),
3667 );
3668 let on = vec![(
3669 Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
3670 Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
3671 )];
3672 let filter = prepare_join_filter();
3673
3674 let join = join_with_filter(
3675 left,
3676 right,
3677 on,
3678 filter,
3679 &JoinType::Inner,
3680 NullEquality::NullEqualsNothing,
3681 )?;
3682
3683 let columns = columns(&join.schema());
3684 assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
3685
3686 let stream = join.execute(0, task_ctx)?;
3687 let batches = common::collect(stream).await?;
3688
3689 allow_duplicates! {
3690 assert_snapshot!(batches_to_sort_string(&batches), @r#"
3691 +---+---+---+----+---+---+
3692 | a | b | c | a | b | c |
3693 +---+---+---+----+---+---+
3694 | 2 | 7 | 9 | 10 | 2 | 7 |
3695 | 2 | 7 | 9 | 20 | 2 | 5 |
3696 +---+---+---+----+---+---+
3697 "#);
3698 }
3699
3700 Ok(())
3701 }
3702
3703 #[apply(batch_sizes)]
3704 #[tokio::test]
3705 async fn join_left_with_filter(batch_size: usize) -> Result<()> {
3706 let task_ctx = prepare_task_ctx(batch_size);
3707 let left = build_table(
3708 ("a", &vec![0, 1, 2, 2]),
3709 ("b", &vec![4, 5, 7, 8]),
3710 ("c", &vec![7, 8, 9, 1]),
3711 );
3712 let right = build_table(
3713 ("a", &vec![10, 20, 30, 40]),
3714 ("b", &vec![2, 2, 3, 4]),
3715 ("c", &vec![7, 5, 6, 4]),
3716 );
3717 let on = vec![(
3718 Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
3719 Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
3720 )];
3721 let filter = prepare_join_filter();
3722
3723 let join = join_with_filter(
3724 left,
3725 right,
3726 on,
3727 filter,
3728 &JoinType::Left,
3729 NullEquality::NullEqualsNothing,
3730 )?;
3731
3732 let columns = columns(&join.schema());
3733 assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
3734
3735 let stream = join.execute(0, task_ctx)?;
3736 let batches = common::collect(stream).await?;
3737
3738 allow_duplicates! {
3739 assert_snapshot!(batches_to_sort_string(&batches), @r#"
3740 +---+---+---+----+---+---+
3741 | a | b | c | a | b | c |
3742 +---+---+---+----+---+---+
3743 | 0 | 4 | 7 | | | |
3744 | 1 | 5 | 8 | | | |
3745 | 2 | 7 | 9 | 10 | 2 | 7 |
3746 | 2 | 7 | 9 | 20 | 2 | 5 |
3747 | 2 | 8 | 1 | | | |
3748 +---+---+---+----+---+---+
3749 "#);
3750 }
3751
3752 Ok(())
3753 }
3754
3755 #[apply(batch_sizes)]
3756 #[tokio::test]
3757 async fn join_right_with_filter(batch_size: usize) -> Result<()> {
3758 let task_ctx = prepare_task_ctx(batch_size);
3759 let left = build_table(
3760 ("a", &vec![0, 1, 2, 2]),
3761 ("b", &vec![4, 5, 7, 8]),
3762 ("c", &vec![7, 8, 9, 1]),
3763 );
3764 let right = build_table(
3765 ("a", &vec![10, 20, 30, 40]),
3766 ("b", &vec![2, 2, 3, 4]),
3767 ("c", &vec![7, 5, 6, 4]),
3768 );
3769 let on = vec![(
3770 Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
3771 Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
3772 )];
3773 let filter = prepare_join_filter();
3774
3775 let join = join_with_filter(
3776 left,
3777 right,
3778 on,
3779 filter,
3780 &JoinType::Right,
3781 NullEquality::NullEqualsNothing,
3782 )?;
3783
3784 let columns = columns(&join.schema());
3785 assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
3786
3787 let stream = join.execute(0, task_ctx)?;
3788 let batches = common::collect(stream).await?;
3789
3790 allow_duplicates! {
3791 assert_snapshot!(batches_to_sort_string(&batches), @r#"
3792 +---+---+---+----+---+---+
3793 | a | b | c | a | b | c |
3794 +---+---+---+----+---+---+
3795 | | | | 30 | 3 | 6 |
3796 | | | | 40 | 4 | 4 |
3797 | 2 | 7 | 9 | 10 | 2 | 7 |
3798 | 2 | 7 | 9 | 20 | 2 | 5 |
3799 +---+---+---+----+---+---+
3800 "#);
3801 }
3802
3803 Ok(())
3804 }
3805
3806 #[apply(batch_sizes)]
3807 #[tokio::test]
3808 async fn join_full_with_filter(batch_size: usize) -> Result<()> {
3809 let task_ctx = prepare_task_ctx(batch_size);
3810 let left = build_table(
3811 ("a", &vec![0, 1, 2, 2]),
3812 ("b", &vec![4, 5, 7, 8]),
3813 ("c", &vec![7, 8, 9, 1]),
3814 );
3815 let right = build_table(
3816 ("a", &vec![10, 20, 30, 40]),
3817 ("b", &vec![2, 2, 3, 4]),
3818 ("c", &vec![7, 5, 6, 4]),
3819 );
3820 let on = vec![(
3821 Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _,
3822 Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
3823 )];
3824 let filter = prepare_join_filter();
3825
3826 let join = join_with_filter(
3827 left,
3828 right,
3829 on,
3830 filter,
3831 &JoinType::Full,
3832 NullEquality::NullEqualsNothing,
3833 )?;
3834
3835 let columns = columns(&join.schema());
3836 assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
3837
3838 let stream = join.execute(0, task_ctx)?;
3839 let batches = common::collect(stream).await?;
3840
3841 let expected = [
3842 "+---+---+---+----+---+---+",
3843 "| a | b | c | a | b | c |",
3844 "+---+---+---+----+---+---+",
3845 "| | | | 30 | 3 | 6 |",
3846 "| | | | 40 | 4 | 4 |",
3847 "| 2 | 7 | 9 | 10 | 2 | 7 |",
3848 "| 2 | 7 | 9 | 20 | 2 | 5 |",
3849 "| 0 | 4 | 7 | | | |",
3850 "| 1 | 5 | 8 | | | |",
3851 "| 2 | 8 | 1 | | | |",
3852 "+---+---+---+----+---+---+",
3853 ];
3854 assert_batches_sorted_eq!(expected, &batches);
3855
3856 Ok(())
3874 }
3875
3876 #[tokio::test]
3878 async fn test_collect_left_multiple_partitions_join() -> Result<()> {
3879 let task_ctx = Arc::new(TaskContext::default());
3880 let left = build_table(
3881 ("a1", &vec![1, 2, 3]),
3882 ("b1", &vec![4, 5, 7]),
3883 ("c1", &vec![7, 8, 9]),
3884 );
3885 let right = build_table(
3886 ("a2", &vec![10, 20, 30]),
3887 ("b2", &vec![4, 5, 6]),
3888 ("c2", &vec![70, 80, 90]),
3889 );
3890 let on = vec![(
3891 Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
3892 Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
3893 )];
3894
3895 let expected_inner = vec![
3896 "+----+----+----+----+----+----+",
3897 "| a1 | b1 | c1 | a2 | b2 | c2 |",
3898 "+----+----+----+----+----+----+",
3899 "| 1 | 4 | 7 | 10 | 4 | 70 |",
3900 "| 2 | 5 | 8 | 20 | 5 | 80 |",
3901 "+----+----+----+----+----+----+",
3902 ];
3903 let expected_left = vec![
3904 "+----+----+----+----+----+----+",
3905 "| a1 | b1 | c1 | a2 | b2 | c2 |",
3906 "+----+----+----+----+----+----+",
3907 "| 1 | 4 | 7 | 10 | 4 | 70 |",
3908 "| 2 | 5 | 8 | 20 | 5 | 80 |",
3909 "| 3 | 7 | 9 | | | |",
3910 "+----+----+----+----+----+----+",
3911 ];
3912 let expected_right = vec![
3913 "+----+----+----+----+----+----+",
3914 "| a1 | b1 | c1 | a2 | b2 | c2 |",
3915 "+----+----+----+----+----+----+",
3916 "| | | | 30 | 6 | 90 |",
3917 "| 1 | 4 | 7 | 10 | 4 | 70 |",
3918 "| 2 | 5 | 8 | 20 | 5 | 80 |",
3919 "+----+----+----+----+----+----+",
3920 ];
3921 let expected_full = vec![
3922 "+----+----+----+----+----+----+",
3923 "| a1 | b1 | c1 | a2 | b2 | c2 |",
3924 "+----+----+----+----+----+----+",
3925 "| | | | 30 | 6 | 90 |",
3926 "| 1 | 4 | 7 | 10 | 4 | 70 |",
3927 "| 2 | 5 | 8 | 20 | 5 | 80 |",
3928 "| 3 | 7 | 9 | | | |",
3929 "+----+----+----+----+----+----+",
3930 ];
3931 let expected_left_semi = vec![
3932 "+----+----+----+",
3933 "| a1 | b1 | c1 |",
3934 "+----+----+----+",
3935 "| 1 | 4 | 7 |",
3936 "| 2 | 5 | 8 |",
3937 "+----+----+----+",
3938 ];
3939 let expected_left_anti = vec![
3940 "+----+----+----+",
3941 "| a1 | b1 | c1 |",
3942 "+----+----+----+",
3943 "| 3 | 7 | 9 |",
3944 "+----+----+----+",
3945 ];
3946 let expected_right_semi = vec![
3947 "+----+----+----+",
3948 "| a2 | b2 | c2 |",
3949 "+----+----+----+",
3950 "| 10 | 4 | 70 |",
3951 "| 20 | 5 | 80 |",
3952 "+----+----+----+",
3953 ];
3954 let expected_right_anti = vec![
3955 "+----+----+----+",
3956 "| a2 | b2 | c2 |",
3957 "+----+----+----+",
3958 "| 30 | 6 | 90 |",
3959 "+----+----+----+",
3960 ];
3961 let expected_left_mark = vec![
3962 "+----+----+----+-------+",
3963 "| a1 | b1 | c1 | mark |",
3964 "+----+----+----+-------+",
3965 "| 1 | 4 | 7 | true |",
3966 "| 2 | 5 | 8 | true |",
3967 "| 3 | 7 | 9 | false |",
3968 "+----+----+----+-------+",
3969 ];
3970 let expected_right_mark = vec![
3971 "+----+----+----+-------+",
3972 "| a2 | b2 | c2 | mark |",
3973 "+----+----+----+-------+",
3974 "| 10 | 4 | 70 | true |",
3975 "| 20 | 5 | 80 | true |",
3976 "| 30 | 6 | 90 | false |",
3977 "+----+----+----+-------+",
3978 ];
3979
3980 let test_cases = vec![
3981 (JoinType::Inner, expected_inner),
3982 (JoinType::Left, expected_left),
3983 (JoinType::Right, expected_right),
3984 (JoinType::Full, expected_full),
3985 (JoinType::LeftSemi, expected_left_semi),
3986 (JoinType::LeftAnti, expected_left_anti),
3987 (JoinType::RightSemi, expected_right_semi),
3988 (JoinType::RightAnti, expected_right_anti),
3989 (JoinType::LeftMark, expected_left_mark),
3990 (JoinType::RightMark, expected_right_mark),
3991 ];
3992
3993 for (join_type, expected) in test_cases {
3994 let (_, batches, metrics) = join_collect_with_partition_mode(
3995 Arc::clone(&left),
3996 Arc::clone(&right),
3997 on.clone(),
3998 &join_type,
3999 PartitionMode::CollectLeft,
4000 NullEquality::NullEqualsNothing,
4001 Arc::clone(&task_ctx),
4002 )
4003 .await?;
4004 assert_batches_sorted_eq!(expected, &batches);
4005 assert_join_metrics!(metrics, expected.len() - 4);
4006 }
4007
4008 Ok(())
4009 }
4010
    #[tokio::test]
    async fn join_date32() -> Result<()> {
        // Inner equi-join keyed on a Date32 column: verifies hashing and key
        // equality work for date-typed join keys, not just integers.
        let schema = Arc::new(Schema::new(vec![
            Field::new("date", DataType::Date32, false),
            Field::new("n", DataType::Int32, false),
        ]));

        // Left side: three consecutive days (Date32 values are days since the
        // Unix epoch), tagged n = 1..=3.
        let dates: ArrayRef = Arc::new(Date32Array::from(vec![19107, 19108, 19109]));
        let n: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![dates, n])?;
        let left =
            TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)
                .unwrap();
        // Right side: 19108 appears twice, so the single matching left row
        // fans out into two output rows.
        let dates: ArrayRef = Arc::new(Date32Array::from(vec![19108, 19108, 19109]));
        let n: ArrayRef = Arc::new(Int32Array::from(vec![4, 5, 6]));
        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![dates, n])?;
        let right = TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap();
        let on = vec![(
            Arc::new(Column::new_with_schema("date", &left.schema()).unwrap()) as _,
            Arc::new(Column::new_with_schema("date", &right.schema()).unwrap()) as _,
        )];

        let join = join(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
        )?;

        let task_ctx = Arc::new(TaskContext::default());
        let stream = join.execute(0, task_ctx)?;
        let batches = common::collect(stream).await?;

        // 19107 has no match on the right and is dropped; dates render in
        // their display form (e.g. 19108 -> 2022-04-26).
        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches), @r#"
            +------------+---+------------+---+
            | date | n | date | n |
            +------------+---+------------+---+
            | 2022-04-26 | 2 | 2022-04-26 | 4 |
            | 2022-04-26 | 2 | 2022-04-26 | 5 |
            | 2022-04-27 | 3 | 2022-04-27 | 6 |
            +------------+---+------------+---+
            "#);
        }

        Ok(())
    }
4059
4060 #[tokio::test]
4061 async fn join_with_error_right() {
4062 let left = build_table(
4063 ("a1", &vec![1, 2, 3]),
4064 ("b1", &vec![4, 5, 7]),
4065 ("c1", &vec![7, 8, 9]),
4066 );
4067
4068 let err = exec_err!("bad data error");
4071 let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
4072
4073 let on = vec![(
4074 Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
4075 Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
4076 )];
4077 let schema = right.schema();
4078 let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
4079 let right_input = Arc::new(MockExec::new(vec![Ok(right), err], schema));
4080
4081 let join_types = vec![
4082 JoinType::Inner,
4083 JoinType::Left,
4084 JoinType::Right,
4085 JoinType::Full,
4086 JoinType::LeftSemi,
4087 JoinType::LeftAnti,
4088 JoinType::RightSemi,
4089 JoinType::RightAnti,
4090 ];
4091
4092 for join_type in join_types {
4093 let join = join(
4094 Arc::clone(&left),
4095 Arc::clone(&right_input) as Arc<dyn ExecutionPlan>,
4096 on.clone(),
4097 &join_type,
4098 NullEquality::NullEqualsNothing,
4099 )
4100 .unwrap();
4101 let task_ctx = Arc::new(TaskContext::default());
4102
4103 let stream = join.execute(0, task_ctx).unwrap();
4104
4105 let result_string = common::collect(stream).await.unwrap_err().to_string();
4107 assert!(
4108 result_string.contains("bad data error"),
4109 "actual: {result_string}"
4110 );
4111 }
4112 }
4113
    #[tokio::test]
    async fn join_split_batch() {
        // Every row on both sides shares the same key (1), so matched output
        // is the full 4 x 5 = 20-row cross product. This test sweeps the
        // session batch size from 20 down to 1 and checks, per join type, both
        // the number of emitted batches and the overall result content.
        let left = build_table(
            ("a1", &vec![1, 2, 3, 4]),
            ("b1", &vec![1, 1, 1, 1]),
            ("c1", &vec![0, 0, 0, 0]),
        );
        let right = build_table(
            ("a2", &vec![10, 20, 30, 40, 50]),
            ("b2", &vec![1, 1, 1, 1, 1]),
            ("c2", &vec![0, 0, 0, 0, 0]),
        );
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
        )];

        let join_types = vec![
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::Full,
            JoinType::RightSemi,
            JoinType::RightAnti,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
        ];
        // 4 left rows x 5 right rows, all on the same key.
        let expected_resultset_records = 20;
        let common_result = [
            "+----+----+----+----+----+----+",
            "| a1 | b1 | c1 | a2 | b2 | c2 |",
            "+----+----+----+----+----+----+",
            "| 1 | 1 | 0 | 10 | 1 | 0 |",
            "| 2 | 1 | 0 | 10 | 1 | 0 |",
            "| 3 | 1 | 0 | 10 | 1 | 0 |",
            "| 4 | 1 | 0 | 10 | 1 | 0 |",
            "| 1 | 1 | 0 | 20 | 1 | 0 |",
            "| 2 | 1 | 0 | 20 | 1 | 0 |",
            "| 3 | 1 | 0 | 20 | 1 | 0 |",
            "| 4 | 1 | 0 | 20 | 1 | 0 |",
            "| 1 | 1 | 0 | 30 | 1 | 0 |",
            "| 2 | 1 | 0 | 30 | 1 | 0 |",
            "| 3 | 1 | 0 | 30 | 1 | 0 |",
            "| 4 | 1 | 0 | 30 | 1 | 0 |",
            "| 1 | 1 | 0 | 40 | 1 | 0 |",
            "| 2 | 1 | 0 | 40 | 1 | 0 |",
            "| 3 | 1 | 0 | 40 | 1 | 0 |",
            "| 4 | 1 | 0 | 40 | 1 | 0 |",
            "| 1 | 1 | 0 | 50 | 1 | 0 |",
            "| 2 | 1 | 0 | 50 | 1 | 0 |",
            "| 3 | 1 | 0 | 50 | 1 | 0 |",
            "| 4 | 1 | 0 | 50 | 1 | 0 |",
            "+----+----+----+----+----+----+",
        ];
        // LeftSemi: every left row matches, each kept once.
        let left_batch = [
            "+----+----+----+",
            "| a1 | b1 | c1 |",
            "+----+----+----+",
            "| 1 | 1 | 0 |",
            "| 2 | 1 | 0 |",
            "| 3 | 1 | 0 |",
            "| 4 | 1 | 0 |",
            "+----+----+----+",
        ];
        // RightSemi: every right row matches, each kept once.
        let right_batch = [
            "+----+----+----+",
            "| a2 | b2 | c2 |",
            "+----+----+----+",
            "| 10 | 1 | 0 |",
            "| 20 | 1 | 0 |",
            "| 30 | 1 | 0 |",
            "| 40 | 1 | 0 |",
            "| 50 | 1 | 0 |",
            "+----+----+----+",
        ];
        // RightAnti: every right row matches, so nothing survives.
        let right_empty = [
            "+----+----+----+",
            "| a2 | b2 | c2 |",
            "+----+----+----+",
            "+----+----+----+",
        ];
        // LeftAnti: every left row matches, so nothing survives.
        let left_empty = [
            "+----+----+----+",
            "| a1 | b1 | c1 |",
            "+----+----+----+",
            "+----+----+----+",
        ];

        for join_type in join_types {
            // Sweep batch sizes 20, 19, ..., 1.
            for batch_size in (1..21).rev() {
                let task_ctx = prepare_task_ctx(batch_size);

                let join = join(
                    Arc::clone(&left),
                    Arc::clone(&right),
                    on.clone(),
                    &join_type,
                    NullEquality::NullEqualsNothing,
                )
                .unwrap();

                let stream = join.execute(0, task_ctx).unwrap();
                let batches = common::collect(stream).await.unwrap();

                // Inner/Right/RightSemi/RightAnti split the 20 result records
                // into ceil(20 / batch_size) batches; the remaining types emit
                // one extra batch — presumably produced by the final
                // unmatched-build-rows phase (TODO confirm against
                // HashJoinStream's state machine).
                let expected_batch_count = match join_type {
                    JoinType::Inner
                    | JoinType::Right
                    | JoinType::RightSemi
                    | JoinType::RightAnti => {
                        div_ceil(expected_resultset_records, batch_size)
                    }
                    _ => div_ceil(expected_resultset_records, batch_size) + 1,
                };
                assert_eq!(
                    batches.len(),
                    expected_batch_count,
                    "expected {expected_batch_count} output batches for {join_type} join with batch_size = {batch_size}"
                );

                // Semi/anti joins project only one side; all other types
                // produce the full cross product.
                let expected = match join_type {
                    JoinType::RightSemi => right_batch.to_vec(),
                    JoinType::RightAnti => right_empty.to_vec(),
                    JoinType::LeftSemi => left_batch.to_vec(),
                    JoinType::LeftAnti => left_empty.to_vec(),
                    _ => common_result.to_vec(),
                };
                assert_batches_eq!(expected, &batches);
            }
        }
    }
4249
4250 #[tokio::test]
4251 async fn single_partition_join_overallocation() -> Result<()> {
4252 let left = build_table(
4253 ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
4254 ("b1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
4255 ("c1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
4256 );
4257 let right = build_table(
4258 ("a2", &vec![10, 11]),
4259 ("b2", &vec![12, 13]),
4260 ("c2", &vec![14, 15]),
4261 );
4262 let on = vec![(
4263 Arc::new(Column::new_with_schema("a1", &left.schema()).unwrap()) as _,
4264 Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
4265 )];
4266
4267 let join_types = vec![
4268 JoinType::Inner,
4269 JoinType::Left,
4270 JoinType::Right,
4271 JoinType::Full,
4272 JoinType::LeftSemi,
4273 JoinType::LeftAnti,
4274 JoinType::RightSemi,
4275 JoinType::RightAnti,
4276 JoinType::LeftMark,
4277 JoinType::RightMark,
4278 ];
4279
4280 for join_type in join_types {
4281 let runtime = RuntimeEnvBuilder::new()
4282 .with_memory_limit(100, 1.0)
4283 .build_arc()?;
4284 let task_ctx = TaskContext::default().with_runtime(runtime);
4285 let task_ctx = Arc::new(task_ctx);
4286
4287 let join = join(
4288 Arc::clone(&left),
4289 Arc::clone(&right),
4290 on.clone(),
4291 &join_type,
4292 NullEquality::NullEqualsNothing,
4293 )?;
4294
4295 let stream = join.execute(0, task_ctx)?;
4296 let err = common::collect(stream).await.unwrap_err();
4297
4298 assert_contains!(
4300 err.to_string(),
4301 "Resources exhausted: Additional allocation failed for HashJoinInput with top memory consumers (across reservations) as:\n HashJoinInput"
4302 );
4303
4304 assert_contains!(
4305 err.to_string(),
4306 "Failed to allocate additional 120.0 B for HashJoinInput"
4307 );
4308 }
4309
4310 Ok(())
4311 }
4312
    #[tokio::test]
    async fn partitioned_join_overallocation() -> Result<()> {
        // Partitioned-mode counterpart of the single-partition overallocation
        // test: with PartitionMode::Partitioned each partition holds its own
        // memory reservation, so the error must name the per-partition
        // consumer ("HashJoinInput[1]").
        let left_batch = build_table_i32(
            ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
            ("b1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
            ("c1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
        );
        // Two identical partitions per side.
        let left = TestMemoryExec::try_new_exec(
            &[vec![left_batch.clone()], vec![left_batch.clone()]],
            left_batch.schema(),
            None,
        )
        .unwrap();
        let right_batch = build_table_i32(
            ("a2", &vec![10, 11]),
            ("b2", &vec![12, 13]),
            ("c2", &vec![14, 15]),
        );
        let right = TestMemoryExec::try_new_exec(
            &[vec![right_batch.clone()], vec![right_batch.clone()]],
            right_batch.schema(),
            None,
        )
        .unwrap();
        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left_batch.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right_batch.schema())?) as _,
        )];

        let join_types = vec![
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::Full,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
            JoinType::RightSemi,
            JoinType::RightAnti,
        ];

        for join_type in join_types {
            // Deliberately tiny (100 byte) memory limit so building the hash
            // table must fail.
            let runtime = RuntimeEnvBuilder::new()
                .with_memory_limit(100, 1.0)
                .build_arc()?;
            let session_config = SessionConfig::default().with_batch_size(50);
            let task_ctx = TaskContext::default()
                .with_session_config(session_config)
                .with_runtime(runtime);
            let task_ctx = Arc::new(task_ctx);

            let join = HashJoinExec::try_new(
                Arc::clone(&left) as Arc<dyn ExecutionPlan>,
                Arc::clone(&right) as Arc<dyn ExecutionPlan>,
                on.clone(),
                None,
                &join_type,
                None,
                PartitionMode::Partitioned,
                NullEquality::NullEqualsNothing,
            )?;

            // Execute partition 1 so the error message carries its index.
            let stream = join.execute(1, task_ctx)?;
            let err = common::collect(stream).await.unwrap_err();

            // The failing consumer is the partition-scoped HashJoinInput[1].
            assert_contains!(
                err.to_string(),
                "Resources exhausted: Additional allocation failed for HashJoinInput[1] with top memory consumers (across reservations) as:\n HashJoinInput[1]"

            );

            assert_contains!(
                err.to_string(),
                "Failed to allocate additional 120.0 B for HashJoinInput[1]"
            );
        }

        Ok(())
    }
4394
4395 fn build_table_struct(
4396 struct_name: &str,
4397 field_name_and_values: (&str, &Vec<Option<i32>>),
4398 nulls: Option<NullBuffer>,
4399 ) -> Arc<dyn ExecutionPlan> {
4400 let (field_name, values) = field_name_and_values;
4401 let inner_fields = vec![Field::new(field_name, DataType::Int32, true)];
4402 let schema = Schema::new(vec![Field::new(
4403 struct_name,
4404 DataType::Struct(inner_fields.clone().into()),
4405 nulls.is_some(),
4406 )]);
4407
4408 let batch = RecordBatch::try_new(
4409 Arc::new(schema),
4410 vec![Arc::new(StructArray::new(
4411 inner_fields.into(),
4412 vec![Arc::new(Int32Array::from(values.clone()))],
4413 nulls,
4414 ))],
4415 )
4416 .unwrap();
4417 let schema_ref = batch.schema();
4418 TestMemoryExec::try_new_exec(&[vec![batch]], schema_ref, None).unwrap()
4419 }
4420
    #[tokio::test]
    async fn join_on_struct() -> Result<()> {
        // Inner join keyed on a struct column: struct keys hash and compare
        // as whole values, so {a: 1} and {a: 2} match across sides while the
        // unmatched {a: 3} / {a: 4} rows are dropped. Note that the {a: }
        // rows (a non-null struct holding a null child) still join even under
        // NullEqualsNothing, because the struct values themselves are
        // not null.
        let task_ctx = Arc::new(TaskContext::default());
        let left =
            build_table_struct("n1", ("a", &vec![None, Some(1), Some(2), Some(3)]), None);
        let right =
            build_table_struct("n2", ("a", &vec![None, Some(1), Some(2), Some(4)]), None);
        let on = vec![(
            Arc::new(Column::new_with_schema("n1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("n2", &right.schema())?) as _,
        )];

        let (columns, batches, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["n1", "n2"]);

        allow_duplicates! {
            assert_snapshot!(batches_to_string(&batches), @r#"
            +--------+--------+
            | n1 | n2 |
            +--------+--------+
            | {a: } | {a: } |
            | {a: 1} | {a: 1} |
            | {a: 2} | {a: 2} |
            +--------+--------+
            "#);
        }

        // Three matching struct keys -> three output rows.
        assert_join_metrics!(metrics, 3);

        Ok(())
    }
4461
    #[tokio::test]
    async fn join_on_struct_with_nulls() -> Result<()> {
        // Both sides contain a single struct value that is null at the top
        // level (via NullBuffer::new_null). Whether those rows join depends
        // solely on the NullEquality setting.
        let task_ctx = Arc::new(TaskContext::default());
        let left =
            build_table_struct("n1", ("a", &vec![None]), Some(NullBuffer::new_null(1)));
        let right =
            build_table_struct("n2", ("a", &vec![None]), Some(NullBuffer::new_null(1)));
        let on = vec![(
            Arc::new(Column::new_with_schema("n1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("n2", &right.schema())?) as _,
        )];

        // NullEqualsNull: the two null structs compare equal -> one row.
        let (_, batches_null_eq, metrics) = join_collect(
            Arc::clone(&left),
            Arc::clone(&right),
            on.clone(),
            &JoinType::Inner,
            NullEquality::NullEqualsNull,
            Arc::clone(&task_ctx),
        )
        .await?;

        allow_duplicates! {
            assert_snapshot!(batches_to_sort_string(&batches_null_eq), @r#"
            +----+----+
            | n1 | n2 |
            +----+----+
            | | |
            +----+----+
            "#);
        }

        assert_join_metrics!(metrics, 1);

        // NullEqualsNothing: null keys never match -> empty result.
        let (_, batches_null_neq, metrics) = join_collect(
            left,
            right,
            on,
            &JoinType::Inner,
            NullEquality::NullEqualsNothing,
            task_ctx,
        )
        .await?;

        assert_join_metrics!(metrics, 0);

        let expected_null_neq =
            ["+----+----+", "| n1 | n2 |", "+----+----+", "+----+----+"];
        assert_batches_eq!(expected_null_neq, &batches_null_neq);

        Ok(())
    }
4514
4515 fn columns(schema: &Schema) -> Vec<String> {
4517 schema.fields().iter().map(|f| f.name().clone()).collect()
4518 }
4519}