pub(crate) struct SharedBoundsAccumulator {
inner: Mutex<SharedBoundsState>,
barrier: Barrier,
dynamic_filter: Arc<DynamicFilterPhysicalExpr>,
on_right: Vec<PhysicalExprRef>,
}
Coordinates dynamic filter bounds collection across multiple partitions.
This structure ensures that dynamic filters are built with complete information from all relevant partitions before being applied to probe-side scans. Incomplete filters would incorrectly eliminate valid join results.
§Synchronization Strategy
- Each partition computes bounds from its build-side data
- Bounds are stored in the shared vector
- A barrier tracks how many partitions have reported their bounds
- When the last partition reports, bounds are merged and the filter is updated exactly once
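The four steps above can be sketched with standard-library primitives. This is only an illustration under stated assumptions: it uses `std::sync::Barrier` and OS threads rather than the async barrier the real type relies on, and `Bounds` and `collect_bounds` are hypothetical names that do not exist in the crate. "Publishing" here just copies the collected bounds; it stands in for the merge-and-update step.

```rust
use std::sync::{Arc, Barrier, Mutex};
use std::thread;

/// Hypothetical, simplified stand-in for one partition's bounds on one column.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Bounds {
    min: i64,
    max: i64,
}

/// Runs one thread per partition. Each thread reports its bounds, waits on the
/// barrier, and the single barrier "leader" publishes the collected bounds.
/// Returns the published bounds and how many times publishing happened.
fn collect_bounds(partitions: Vec<Vec<i64>>) -> (Vec<Bounds>, usize) {
    let n = partitions.len();
    let reported = Arc::new(Mutex::new(Vec::<Bounds>::new()));
    let published = Arc::new(Mutex::new((Vec::<Bounds>::new(), 0usize)));
    let barrier = Arc::new(Barrier::new(n));

    let handles: Vec<_> = partitions
        .into_iter()
        .map(|values| {
            let reported = Arc::clone(&reported);
            let published = Arc::clone(&published);
            let barrier = Arc::clone(&barrier);
            thread::spawn(move || {
                // Step 1: compute bounds from this partition's build-side data.
                if let (Some(&min), Some(&max)) = (values.iter().min(), values.iter().max()) {
                    // Step 2: store them in the shared vector.
                    reported.lock().unwrap().push(Bounds { min, max });
                }
                // Step 3: wait until every partition has reported
                // (empty partitions still count toward the barrier).
                if barrier.wait().is_leader() {
                    // Step 4: exactly one thread publishes the complete set.
                    let mut all = reported.lock().unwrap().clone();
                    all.sort_by_key(|b| b.min);
                    let mut out = published.lock().unwrap();
                    out.0 = all;
                    out.1 += 1;
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    let out = published.lock().unwrap();
    (out.0.clone(), out.1)
}
```

Calling `collect_bounds(vec![vec![5, 1, 3], vec![20, 10], vec![]])` starts three threads; the empty partition contributes no bounds but still has to arrive at the barrier before anything is published.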
§Partition Counting
The total_partitions count represents how many times collect_build_side will be called:
- CollectLeft: Number of output partitions (each accesses shared build data)
- Partitioned: Number of input partitions (each builds independently)
§Thread Safety
All shared mutable state sits behind a single mutex to ensure correct coordination between concurrent partition executions; the barrier handles waiting for the remaining partitions.
Fields§
- inner: Mutex<SharedBoundsState> - Shared state protected by a single mutex to avoid ordering concerns
- barrier: Barrier
- dynamic_filter: Arc<DynamicFilterPhysicalExpr> - Dynamic filter for pushdown to probe side
- on_right: Vec<PhysicalExprRef> - Right side join expressions needed for creating filter bounds
Implementations§
pub(crate) fn new_from_partition_mode(
partition_mode: PartitionMode,
left_child: &dyn ExecutionPlan,
right_child: &dyn ExecutionPlan,
dynamic_filter: Arc<DynamicFilterPhysicalExpr>,
on_right: Vec<PhysicalExprRef>,
) -> Self
Creates a new SharedBoundsAccumulator configured for the given partition mode
This method calculates how many times collect_build_side will be called based on the
partition mode’s execution pattern. This count is critical for determining when we have
complete information from all partitions to build the dynamic filter.
§Partition Mode Execution Patterns
- CollectLeft: Build side is collected ONCE from partition 0 and shared via OnceFut across all output partitions. Each output partition calls collect_build_side to access the shared build data. Although this results in multiple invocations, the report_partition_bounds function contains deduplication logic to handle them safely. Expected calls = number of output partitions.
- Partitioned: Each partition independently builds its own hash table by calling collect_build_side once. Expected calls = number of build partitions.
- Auto: Placeholder mode resolved during optimization. Uses 1 as a safe default since the actual mode will be determined and a new bounds_accumulator created before execution.
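The counting rule in the list above can be written as a small function. This is a sketch, not the crate's code: `Mode` is a hypothetical stand-in for the partition mode, and `build_partitions` and `output_partitions` are assumed inputs, since how the real constructor derives them from `left_child` and `right_child` is not shown here.

```rust
/// Hypothetical stand-in for the partition mode described above.
#[derive(Debug, Clone, Copy)]
enum Mode {
    CollectLeft,
    Partitioned,
    Auto,
}

/// Expected number of `collect_build_side` calls for a given mode.
/// Both partition counts are assumed to be supplied by the caller.
fn expected_calls(mode: Mode, build_partitions: usize, output_partitions: usize) -> usize {
    match mode {
        // Shared build data: every output partition makes one call.
        Mode::CollectLeft => output_partitions,
        // Independent builds: one call per build partition.
        Mode::Partitioned => build_partitions,
        // Placeholder mode: 1 is the safe default named in the text.
        Mode::Auto => 1,
    }
}
```

With 4 build partitions and 8 output partitions, `expected_calls` gives 8 for `Mode::CollectLeft`, 4 for `Mode::Partitioned`, and 1 for `Mode::Auto`.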
§Why This Matters
We cannot build a partial filter from some partitions - it would incorrectly eliminate valid join results. We must wait until we have complete bounds information from ALL relevant partitions before updating the dynamic filter.
pub(crate) fn create_filter_from_partition_bounds(
&self,
bounds: &[PartitionBounds],
) -> Result<Arc<dyn PhysicalExpr>>
Create a filter expression from individual partition bounds using OR logic.
This creates a filter where each partition’s bounds form a conjunction (AND) of column range predicates, and all partitions are combined with OR.
For example, with 2 partitions and 2 columns:
((col0 >= p0_min0 AND col0 <= p0_max0 AND col1 >= p0_min1 AND col1 <= p0_max1)
OR
(col0 >= p1_min0 AND col0 <= p1_max0 AND col1 >= p1_min1 AND col1 <= p1_max1))
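To make the OR-of-ANDs shape concrete, the sketch below evaluates the same predicate directly on a row of integer join keys instead of building a physical expression. `ColRange` and `row_passes` are hypothetical names introduced for illustration, and the bounds are assumed to be inclusive `i64` ranges.

```rust
/// Hypothetical, simplified inclusive range for one join column.
#[derive(Debug, Clone, Copy)]
struct ColRange {
    min: i64,
    max: i64,
}

/// Returns true if `row` lies inside every column range (AND) of at least
/// one partition (OR). Each inner Vec holds one partition's column ranges.
fn row_passes(row: &[i64], partitions: &[Vec<ColRange>]) -> bool {
    partitions.iter().any(|cols| {
        cols.len() == row.len()
            && cols
                .iter()
                .zip(row)
                .all(|(range, value)| *value >= range.min && *value <= range.max)
    })
}
```

A row must satisfy all column ranges of the same partition; matching col0 against one partition and col1 against another does not pass. Dropping a partition from the input shows why partial filters are unsafe: a row covered only by the missing partition's bounds would be rejected.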
pub(crate) async fn report_partition_bounds(
&self,
left_side_partition_id: usize,
partition_bounds: Option<Vec<ColumnBounds>>,
) -> Result<()>
Reports bounds from a completed partition and updates the dynamic filter once all partitions are done.
This method coordinates the dynamic filter updates across all partitions. It stores the bounds from the current partition, increments the completion counter, and when all partitions have reported, creates an OR’d filter from individual partition bounds.
This method is async and uses a tokio::sync::Barrier to wait for all partitions
to report their bounds. Once that occurs, the method will resolve for all callers and the
dynamic filter will be updated exactly once.
§Note
Because barriers are reusable, calling this method more times than the total number of partitions is likely an error: the extra calls can leave pending futures that never resolve. We rely on correct usage from the caller rather than imposing additional checks here. If this is a concern, consider making the resulting future shared so the ready result can be reused.
§Arguments
- left_side_partition_id - The identifier for the left-side partition reporting its bounds
- partition_bounds - The bounds computed by this partition (if any)
§Returns
- Result<()> - Ok if successful, Err if filter update failed