Struct SharedBoundsAccumulator

pub(crate) struct SharedBoundsAccumulator {
    inner: Mutex<SharedBoundsState>,
    barrier: Barrier,
    dynamic_filter: Arc<DynamicFilterPhysicalExpr>,
    on_right: Vec<PhysicalExprRef>,
}

Coordinates dynamic filter bounds collection across multiple partitions

This structure ensures that dynamic filters are built with complete information from all relevant partitions before being applied to probe-side scans. Incomplete filters would incorrectly eliminate valid join results.

§Synchronization Strategy

  1. Each partition computes bounds from its build-side data
  2. Bounds are stored in the shared state behind the mutex
  3. A barrier tracks how many partitions have reported their bounds
  4. When the last partition reports, the per-partition bounds are combined into a single OR'd filter and the dynamic filter is updated exactly once
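The four steps above can be sketched with standard-library primitives. This is a minimal, synchronous sketch, not the actual implementation. It assumes that per-partition bounds are (min, max) pairs over i64 key columns and that the "filter" is just a String. It uses std::sync::Barrier and OS threads in place of the async tokio::sync::Barrier that the real method awaits. The names Accumulator, report, Bounds, and filter_updates are hypothetical. In this sketch, the barrier leader (one arbitrary waiter) performs the single filter update.

```rust
use std::sync::{Arc, Barrier, Mutex};
use std::thread;

/// Hypothetical bounds: one (min, max) pair per join key column.
type Bounds = Vec<(i64, i64)>;

/// Hypothetical, simplified stand-in for SharedBoundsAccumulator.
struct Accumulator {
    /// Bounds reported so far, tagged with the reporting partition id.
    reported: Mutex<Vec<(usize, Bounds)>>,
    /// Every reporting partition waits here until all have arrived.
    barrier: Barrier,
    /// Stand-in for the dynamic filter: the final predicate text.
    filter: Mutex<Option<String>>,
    /// Counts how many times the filter was written (should end at 1).
    filter_updates: Mutex<usize>,
}

impl Accumulator {
    fn new(total_partitions: usize) -> Self {
        Accumulator {
            reported: Mutex::new(Vec::new()),
            barrier: Barrier::new(total_partitions),
            filter: Mutex::new(None),
            filter_updates: Mutex::new(0),
        }
    }

    fn report(&self, partition_id: usize, bounds: Option<Bounds>) {
        // Steps 1-2: store this partition's bounds (if any); the guard drops at the end of the statement.
        if let Some(b) = bounds {
            self.reported.lock().unwrap().push((partition_id, b));
        }
        // Step 3: block until every partition has called report.
        // Step 4: exactly one waiter is the leader and builds the OR'd filter.
        if self.barrier.wait().is_leader() {
            let mut reported = self.reported.lock().unwrap();
            reported.sort_by_key(|(id, _)| *id); // deterministic order for this sketch
            let disjuncts: Vec<String> = reported
                .iter()
                .map(|(_, b)| {
                    let terms: Vec<String> = b
                        .iter()
                        .enumerate()
                        .map(|(i, (lo, hi))| format!("col{i} >= {lo} AND col{i} <= {hi}"))
                        .collect();
                    format!("({})", terms.join(" AND "))
                })
                .collect();
            *self.filter.lock().unwrap() = Some(disjuncts.join(" OR "));
            *self.filter_updates.lock().unwrap() += 1;
        }
    }
}

fn main() {
    let acc = Arc::new(Accumulator::new(4));
    let handles: Vec<_> = (0..4)
        .map(|p| {
            let acc = Arc::clone(&acc);
            thread::spawn(move || acc.report(p, Some(vec![(p as i64 * 10, p as i64 * 10 + 5)])))
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    println!("updates = {}", acc.filter_updates.lock().unwrap());
    println!("{}", acc.filter.lock().unwrap().as_deref().unwrap_or("<none>"));
}
```

A partition that reports None still waits on the barrier, so the barrier count always equals the number of reporting partitions, whether or not they contributed bounds.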

§Partition Counting

The total partition count (the number of participants the barrier waits for) is the number of times collect_build_side is expected to be called:

  • CollectLeft: Number of output partitions (each accesses shared build data)
  • Partitioned: Number of input partitions (each builds independently)

§Thread Safety

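All mutable shared state sits behind a single mutex (inner), which ensures correct coordination between concurrent partition executions. Waiting for the remaining partitions is handled separately by the barrier.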

Fields§

§inner: Mutex<SharedBoundsState>

Shared state protected by a single mutex, which avoids the lock-ordering concerns that several independent locks would introduce

§barrier: Barrier

Barrier that report_partition_bounds waits on until all partitions have reported their bounds

§dynamic_filter: Arc<DynamicFilterPhysicalExpr>

Dynamic filter for pushdown to probe side

§on_right: Vec<PhysicalExprRef>

Right side join expressions needed for creating filter bounds

Implementations§

impl SharedBoundsAccumulator

pub(crate) fn new_from_partition_mode(
    partition_mode: PartitionMode,
    left_child: &dyn ExecutionPlan,
    right_child: &dyn ExecutionPlan,
    dynamic_filter: Arc<DynamicFilterPhysicalExpr>,
    on_right: Vec<PhysicalExprRef>,
) -> Self

Creates a new SharedBoundsAccumulator configured for the given partition mode

This method calculates how many times collect_build_side will be called based on the partition mode’s execution pattern. This count is critical for determining when we have complete information from all partitions to build the dynamic filter.

§Partition Mode Execution Patterns
  • CollectLeft: Build side is collected ONCE from partition 0 and shared via OnceFut across all output partitions. Each output partition calls collect_build_side to access the shared build data. Although this results in multiple invocations, the report_partition_bounds function contains deduplication logic to handle them safely. Expected calls = number of output partitions.

  • Partitioned: Each partition independently builds its own hash table by calling collect_build_side once. Expected calls = number of build partitions.

  • Auto: Placeholder mode resolved during optimization. Uses 1 as a safe default, since the actual mode will be determined and a new bounds_accumulator created before execution.
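The mapping from partition mode to expected call count can be summarized as a single match. This is a sketch only. PartitionMode here is a local stand-in enum, not DataFusion's type, and expected_calls, output_partitions, and build_partitions are hypothetical names. The real constructor derives these counts from the child plans.

```rust
/// Hypothetical mirror of the three partition modes described above.
#[derive(Debug, Clone, Copy)]
enum PartitionMode {
    CollectLeft,
    Partitioned,
    Auto,
}

/// Number of collect_build_side calls the barrier should wait for.
/// The two counts are assumed inputs for this sketch.
fn expected_calls(mode: PartitionMode, output_partitions: usize, build_partitions: usize) -> usize {
    match mode {
        // Every output partition accesses the shared build data once.
        PartitionMode::CollectLeft => output_partitions,
        // Every build partition builds its own hash table once.
        PartitionMode::Partitioned => build_partitions,
        // Placeholder: a new accumulator is created before execution.
        PartitionMode::Auto => 1,
    }
}

fn main() {
    for mode in [PartitionMode::CollectLeft, PartitionMode::Partitioned, PartitionMode::Auto] {
        println!("{:?}: {}", mode, expected_calls(mode, 8, 4));
    }
}
```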

§Why This Matters

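We cannot build a partial filter from only some partitions; it would incorrectly eliminate valid join results. For example, if partition 0's build-side keys span [1, 10] and partition 1's span [50, 60], a filter built from partition 0 alone would discard probe rows with key 55 that should join with partition 1. We must wait until we have complete bounds information from ALL relevant partitions before updating the dynamic filter.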

pub(crate) fn create_filter_from_partition_bounds(
    &self,
    bounds: &[PartitionBounds],
) -> Result<Arc<dyn PhysicalExpr>>

Creates a filter expression from individual partition bounds using OR logic.

This creates a filter where each partition’s bounds form a conjunction (AND) of column range predicates, and all partitions are combined with OR.

For example, with 2 partitions and 2 columns: ((col0 >= p0_min0 AND col0 <= p0_max0 AND col1 >= p0_min1 AND col1 <= p0_max1) OR (col0 >= p1_min0 AND col0 <= p1_max0 AND col1 >= p1_min1 AND col1 <= p1_max1))
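The semantics of that OR-of-ANDs predicate can be checked with a small row evaluator. This is a sketch under stated assumptions. Key columns are i64, each partition's bounds are one (min, max) pair per column, and the predicate is evaluated directly rather than built as a PhysicalExpr. KeyRanges and passes are hypothetical names. The second call shows why partitions are OR'd rather than merged into one bounding box. The third call shows the partial-filter problem from "Why This Matters".

```rust
/// Hypothetical representation of one partition's bounds:
/// one (min, max) pair per join key column.
type KeyRanges = Vec<(i64, i64)>;

/// Evaluates the OR-of-ANDs filter for one probe-side row: the row passes
/// if, for at least one partition, every column value lies within that
/// partition's [min, max] range for the column.
fn passes(bounds: &[KeyRanges], row: &[i64]) -> bool {
    bounds.iter().any(|partition| {
        partition
            .iter()
            .zip(row)
            .all(|(&(min, max), &value)| min <= value && value <= max)
    })
}

fn main() {
    let bounds: Vec<KeyRanges> = vec![
        vec![(1, 10), (100, 200)], // partition 0: col0 in [1, 10], col1 in [100, 200]
        vec![(50, 60), (0, 5)],    // partition 1: col0 in [50, 60], col1 in [0, 5]
    ];
    // Inside partition 1's ranges: passes.
    println!("{}", passes(&bounds, &[55, 3]));
    // Each value fits some partition, but no single partition fits both: rejected.
    println!("{}", passes(&bounds, &[55, 150]));
    // Partial filter (partition 0 only) wrongly rejects a row that partition 1 would join.
    println!("{}", passes(&bounds[..1], &[55, 3]));
}
```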

pub(crate) async fn report_partition_bounds(
    &self,
    left_side_partition_id: usize,
    partition_bounds: Option<Vec<ColumnBounds>>,
) -> Result<()>

Reports bounds from a completed partition and updates the dynamic filter once all partitions are done

This method coordinates dynamic filter updates across all partitions. It stores the bounds from the current partition, waits on the barrier until every partition has reported, and then creates an OR’d filter from the individual partition bounds.

This method is async and uses a tokio::sync::Barrier to wait for all partitions to report their bounds. Once they have, the method resolves for all callers and the dynamic filter is updated exactly once.

§Note

Because barriers are reusable, calling this method more times than the total number of partitions is likely an error: the extra calls start a new barrier generation that never fills, leaving pending futures that never resolve. We rely on correct usage from the caller rather than imposing additional checks here. If this is a concern, consider making the resulting future shared so the ready result can be reused.
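The reuse behaviour behind this note can be observed with std::sync::Barrier, which, like tokio::sync::Barrier, resets after each full generation and elects exactly one leader per generation. In this sketch, completed_generations is a hypothetical helper. It only calls the helper with a waiter count that is a multiple of the barrier size. With one extra waiter, that waiter would block forever, which is the hazard described above.

```rust
use std::sync::{Arc, Barrier};
use std::thread;

/// Counts how many barrier generations complete when `waiters` threads each
/// call wait() once on a barrier sized for `participants`. Each completed
/// generation has exactly one leader. `waiters` must be a multiple of
/// `participants`, otherwise the leftover threads block forever.
fn completed_generations(participants: usize, waiters: usize) -> usize {
    let barrier = Arc::new(Barrier::new(participants));
    let handles: Vec<_> = (0..waiters)
        .map(|_| {
            let b = Arc::clone(&barrier);
            thread::spawn(move || b.wait().is_leader())
        })
        .collect();
    handles
        .into_iter()
        .map(|h| h.join().unwrap())
        .filter(|&is_leader| is_leader)
        .count()
}

fn main() {
    // 4 waiters on a 2-party barrier: it fills, resets, and fills again.
    println!("{}", completed_generations(2, 4));
}
```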

§Arguments
  • left_side_partition_id - The identifier for the left-side partition reporting its bounds
  • partition_bounds - The bounds computed by this partition (if any)
§Returns
  • Result<()> - Ok if successful, Err if filter update failed
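The CollectLeft entry under Partition Mode Execution Patterns says that report_partition_bounds deduplicates repeated reports. One possible deduplication rule is sketched below. It assumes that every CollectLeft output partition reports with the same left_side_partition_id and that only the first report per id is kept. The actual rule may differ. KeyBounds and record_bounds are hypothetical names.

```rust
/// Hypothetical per-column (min, max) bounds for one partition.
type KeyBounds = Vec<(i64, i64)>;

/// Records a partition's bounds unless bounds for the same partition id
/// were already recorded. Returns true if the bounds were stored.
fn record_bounds(
    stored: &mut Vec<(usize, KeyBounds)>,
    left_side_partition_id: usize,
    partition_bounds: Option<KeyBounds>,
) -> bool {
    let Some(bounds) = partition_bounds else {
        return false; // this partition reported no bounds; nothing to record
    };
    if stored.iter().any(|(id, _)| *id == left_side_partition_id) {
        return false; // duplicate report for an id that is already stored
    }
    stored.push((left_side_partition_id, bounds));
    true
}

fn main() {
    let mut stored = Vec::new();
    // Three output partitions report the single shared build side (assumed id 0).
    for _ in 0..3 {
        record_bounds(&mut stored, 0, Some(vec![(1, 99)]));
    }
    println!("{} stored entry", stored.len());
}
```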

Trait Implementations§

impl Debug for SharedBoundsAccumulator

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoEither for T

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.

impl<T> PolicyExt for T
where T: ?Sized,

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow.

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow.

impl<T> Same for T

type Output = T

Should always be Self

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.

impl<T> ErasedDestructor for T
where T: 'static,