GroupsAccumulatorAdapter

Struct GroupsAccumulatorAdapter 

pub struct GroupsAccumulatorAdapter {
    factory: Box<dyn Fn() -> Result<Box<dyn Accumulator>> + Send>,
    states: Vec<AccumulatorState>,
    allocation_bytes: usize,
}

An adapter that implements [GroupsAccumulator] for any [Accumulator].

While [Accumulator]s are simpler to implement and can support more general calculations (like retractable window functions), they are not as fast as a specialized GroupsAccumulator. This adapter bridges the gap so that the group by operator only needs to operate in terms of [GroupsAccumulator], even for aggregates that only implement [Accumulator].

Internally, this adapter creates a new [Accumulator] for each group, which stores the state for that group. This requires an allocation for each Accumulator and for its internal indices, in addition to whatever allocations the Accumulator itself makes.

For example, a MinAccumulator that computes the minimum string value stores it in a ScalarValue::Utf8. That requires at least two allocations per group: one for the MinAccumulator and one for the String held by the ScalarValue::Utf8.

                      ┌─────────────────────────────────┐
                      │MinAccumulator {                 │
               ┌─────▶│ min: ScalarValue::Utf8("A")     │───────┐
               │      │}                                │       │
               │      └─────────────────────────────────┘       └───────▶   "A"
   ┌─────┐     │      ┌─────────────────────────────────┐
   │  0  │─────┘      │MinAccumulator {                 │
   ├─────┤     ┌─────▶│ min: ScalarValue::Utf8("Z")     │───────────────▶   "Z"
   │  1  │─────┘      │}                                │
   └─────┘            └─────────────────────────────────┘                   ...
     ...                 ...
   ┌─────┐            ┌────────────────────────────────┐
   │ N-2 │            │MinAccumulator {                │
   ├─────┤            │  min: ScalarValue::Utf8("A")   │────────────────▶   "A"
   │ N-1 │─────┐      │}                               │
   └─────┘     │      └────────────────────────────────┘
               │      ┌────────────────────────────────┐        ┌───────▶   "Q"
               │      │MinAccumulator {                │        │
               └─────▶│  min: ScalarValue::Utf8("Q")   │────────┘
                      │}                               │
                      └────────────────────────────────┘


 Logical group         Current Min/Max value for that group stored
    number             as a ScalarValue which points to an
                       individually allocated String

Optimizations

The adapter minimizes the number of calls to [Accumulator::update_batch] by first collecting the input rows for each group into a contiguous array using [compute::take].
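Arrow's compute::take builds a new array by gathering the elements at the given indices. The std-only Rust sketch below imitates that with plain slices: take here is a hypothetical stand-in, not Arrow's API, and indices_sorted_by_group is an illustrative helper. Sorting the row positions by group (stably) and gathering once makes each group's rows contiguous, so each group can be handed to a single update_batch-style call instead of one call per row.

```rust
/// Std-only stand-in for a "take" kernel: builds a new vector whose i-th
/// element is `values[indices[i]]`. Panics if an index is out of bounds.
fn take<T: Clone>(values: &[T], indices: &[usize]) -> Vec<T> {
    indices.iter().map(|&i| values[i].clone()).collect()
}

/// Row positions ordered by group index. `sort_by_key` is stable, so rows
/// keep their original relative order within each group.
fn indices_sorted_by_group(group_indices: &[usize]) -> Vec<usize> {
    let mut indices: Vec<usize> = (0..group_indices.len()).collect();
    indices.sort_by_key(|&row| group_indices[row]);
    indices
}
```

For groups [2, 2, 0, 1, 0] and values [200, 100, 200, 200, 300], the sorted row order is [2, 4, 3, 0, 1] and the gathered values are [200, 300, 200, 200, 100].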

Fields

factory: Box<dyn Fn() -> Result<Box<dyn Accumulator>> + Send>

Factory function used to create a new [Accumulator] for each group.

states: Vec<AccumulatorState>

State for each group, stored in group_index order.

allocation_bytes: usize

Current memory usage, in bytes.

Note that this is updated incrementally with deltas so that calling size() does not become a bottleneck. We saw size() become a bottleneck in earlier implementations when there were many distinct groups.
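A minimal std-only Rust sketch of the delta-tracking idea, using a hypothetical SizeTracker type: the running total is adjusted whenever an item's size changes, so size() is O(1) instead of a sum over every group.

```rust
/// Hypothetical memory tracker contrasting two ways of reporting total size.
struct SizeTracker {
    /// Size of each tracked item, in bytes.
    item_sizes: Vec<usize>,
    /// Running total, updated with deltas whenever an item changes.
    allocation_bytes: usize,
}

impl SizeTracker {
    fn new() -> Self {
        Self { item_sizes: Vec::new(), allocation_bytes: 0 }
    }

    /// O(number of items): the cost of recomputing the size from scratch.
    fn size_by_recomputing(&self) -> usize {
        self.item_sizes.iter().sum()
    }

    /// O(1): return the incrementally maintained total.
    fn size(&self) -> usize {
        self.allocation_bytes
    }

    /// Track a new item and add its size to the running total.
    fn push(&mut self, bytes: usize) {
        self.item_sizes.push(bytes);
        self.allocation_bytes += bytes;
    }

    /// Record that item `idx` changed size, applying only the delta.
    fn resize_item(&mut self, idx: usize, new_bytes: usize) {
        let old_bytes = self.item_sizes[idx];
        self.item_sizes[idx] = new_bytes;
        // The total always includes `old_bytes`, so this cannot underflow.
        self.allocation_bytes -= old_bytes;
        self.allocation_bytes += new_bytes;
    }
}
```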

Implementations

impl GroupsAccumulatorAdapter

pub fn new<F>(factory: F) -> Self
where F: Fn() -> Result<Box<dyn Accumulator>> + Send + 'static,

Creates a new adapter that will create a new [Accumulator] for each group, using the specified factory function.

fn make_accumulators_if_needed(&mut self, total_num_groups: usize) -> Result<()>

Ensures that self.states has an entry for each of the total_num_groups groups.
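The following std-only Rust sketch shows one way a stored factory closure can grow the per-group state on demand. The simplified Accumulator trait, CountAccumulator, Adapter, and the use of String as the error type are all hypothetical; the real adapter stores a Box<dyn Fn() -> Result<Box<dyn Accumulator>> + Send> with DataFusion's own Result type and keeps an AccumulatorState per group rather than a bare accumulator.

```rust
/// Hypothetical, simplified accumulator trait.
trait Accumulator {
    fn update(&mut self, value: i64);
    fn evaluate(&self) -> i64;
}

/// Counts the rows it has seen.
#[derive(Default)]
struct CountAccumulator {
    count: i64,
}

impl Accumulator for CountAccumulator {
    fn update(&mut self, _value: i64) {
        self.count += 1;
    }
    fn evaluate(&self) -> i64 {
        self.count
    }
}

/// Boxed factory closure, shaped like the `factory` field (error type simplified).
type Factory = Box<dyn Fn() -> Result<Box<dyn Accumulator>, String> + Send>;

struct Adapter {
    factory: Factory,
    /// One accumulator per group, in group index order.
    states: Vec<Box<dyn Accumulator>>,
}

impl Adapter {
    fn new<F>(factory: F) -> Self
    where
        F: Fn() -> Result<Box<dyn Accumulator>, String> + Send + 'static,
    {
        Self { factory: Box::new(factory), states: Vec::new() }
    }

    /// Call the factory for every group index not seen before, so that every
    /// index below `total_num_groups` has an accumulator. Never shrinks.
    fn make_accumulators_if_needed(&mut self, total_num_groups: usize) -> Result<(), String> {
        while self.states.len() < total_num_groups {
            let accumulator = (self.factory)()?;
            self.states.push(accumulator);
        }
        Ok(())
    }
}
```

Groups that already exist keep their state; only the missing indices get fresh accumulators from the factory.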

fn invoke_per_accumulator<F>( &mut self, values: &[ArrayRef], group_indices: &[usize], opt_filter: Option<&BooleanArray>, total_num_groups: usize, f: F, ) -> Result<()>
where F: Fn(&mut dyn Accumulator, &[ArrayRef]) -> Result<()>,

Invokes f(accumulator, values) for each group that has values in group_indices.

This function first reorders the input and filter so that the values for each group_index are contiguous, and then invokes f on each contiguous range, to minimize per-row overhead.

┌─────────┐   ┌─────────┐   ┌ ─ ─ ─ ─ ┐                       ┌─────────┐   ┌ ─ ─ ─ ─ ┐
│ ┌─────┐ │   │ ┌─────┐ │     ┌─────┐              ┏━━━━━┓    │ ┌─────┐ │     ┌─────┐
│ │  2  │ │   │ │ 200 │ │   │ │  t  │ │            ┃  0  ┃    │ │ 200 │ │   │ │  t  │ │
│ ├─────┤ │   │ ├─────┤ │     ├─────┤              ┣━━━━━┫    │ ├─────┤ │     ├─────┤
│ │  2  │ │   │ │ 100 │ │   │ │  f  │ │            ┃  0  ┃    │ │ 300 │ │   │ │  t  │ │
│ ├─────┤ │   │ ├─────┤ │     ├─────┤              ┣━━━━━┫    │ ├─────┤ │     ├─────┤
│ │  0  │ │   │ │ 200 │ │   │ │  t  │ │            ┃  1  ┃    │ │ 200 │ │   │ │NULL │ │
│ ├─────┤ │   │ ├─────┤ │     ├─────┤   ────────▶  ┣━━━━━┫    │ ├─────┤ │     ├─────┤
│ │  1  │ │   │ │ 200 │ │   │ │NULL │ │            ┃  2  ┃    │ │ 200 │ │   │ │  t  │ │
│ ├─────┤ │   │ ├─────┤ │     ├─────┤              ┣━━━━━┫    │ ├─────┤ │     ├─────┤
│ │  0  │ │   │ │ 300 │ │   │ │  t  │ │            ┃  2  ┃    │ │ 100 │ │   │ │  f  │ │
│ └─────┘ │   │ └─────┘ │     └─────┘              ┗━━━━━┛    │ └─────┘ │     └─────┘
└─────────┘   └─────────┘   └ ─ ─ ─ ─ ┘                       └─────────┘   └ ─ ─ ─ ─ ┘

logical group   values      opt_filter           logical group  values       opt_filter
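Using the same rows as the diagram, here is a std-only Rust sketch of the reorder-then-invoke approach. It is a simplification under stated assumptions: values are plain i64s rather than Arrow arrays, a NULL filter entry is represented as None and treated like false (dropping the row), and the hypothetical invoke_per_group calls f with a group index and that group's selected values instead of with an Accumulator.

```rust
/// Sketch: invoke `f` once per group with that group's contiguous, filtered
/// values. `opt_filter[i]` is `None` for a NULL filter entry.
fn invoke_per_group<F>(
    values: &[i64],
    group_indices: &[usize],
    opt_filter: Option<&[Option<bool>]>,
    mut f: F,
) where
    F: FnMut(usize, &[i64]),
{
    // Stable sort of row positions by group index, so the rows of one group
    // are adjacent and keep their original relative order.
    let mut order: Vec<usize> = (0..group_indices.len()).collect();
    order.sort_by_key(|&row| group_indices[row]);

    let mut start = 0;
    while start < order.len() {
        let group = group_indices[order[start]];
        // Find the end of this group's contiguous run.
        let mut end = start;
        while end < order.len() && group_indices[order[end]] == group {
            end += 1;
        }
        // Gather the run's values, keeping only rows whose filter is true
        // (NULL and false both drop the row in this sketch).
        let selected: Vec<i64> = order[start..end]
            .iter()
            .filter(|&&row| opt_filter.map_or(true, |flt| flt[row] == Some(true)))
            .map(|&row| values[row])
            .collect();
        f(group, &selected);
        start = end;
    }
}
```

A group whose rows are all filtered out (group 1 in the diagram) is still visited here, with an empty slice; this documentation does not say whether the real helper behaves the same way.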

fn add_allocation(&mut self, size: usize)

Increments the allocation by size.

See Self::allocation_bytes for rationale.

fn free_allocation(&mut self, size: usize)

Decreases the allocation by size.

See Self::allocation_bytes for rationale.

fn adjust_allocation(&mut self, old_size: usize, new_size: usize)

Adjusts the allocation for something that started with old_size and now has new_size, avoiding overflow.

See Self::allocation_bytes for rationale.
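A std-only Rust sketch of the three helpers on a hypothetical AllocationTracker. The point of adjust_allocation is that new_size - old_size would underflow (panicking in debug builds) when an item shrinks, so the sketch branches on which size is larger and applies a non-negative delta. Saturating in free_allocation is this sketch's own choice, not something the documentation specifies.

```rust
/// Hypothetical holder for the running byte count.
struct AllocationTracker {
    allocation_bytes: usize,
}

impl AllocationTracker {
    fn add_allocation(&mut self, size: usize) {
        self.allocation_bytes += size;
    }

    fn free_allocation(&mut self, size: usize) {
        // Sketch's choice: saturate at zero instead of panicking on underflow.
        self.allocation_bytes = self.allocation_bytes.saturating_sub(size);
    }

    /// Apply the change from `old_size` to `new_size` as a non-negative delta,
    /// because `new_size - old_size` would underflow when the item shrinks.
    fn adjust_allocation(&mut self, old_size: usize, new_size: usize) {
        if new_size > old_size {
            self.add_allocation(new_size - old_size);
        } else {
            self.free_allocation(old_size - new_size);
        }
    }
}
```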

Trait Implementations

impl GroupsAccumulator for GroupsAccumulatorAdapter

fn update_batch( &mut self, values: &[ArrayRef], group_indices: &[usize], opt_filter: Option<&BooleanArray>, total_num_groups: usize, ) -> Result<()>

Updates the accumulator’s state from its arguments, encoded as a vector of [ArrayRef]s.

fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef>

Returns the final aggregate value for each group as a single array ([ArrayRef]), resetting the internal state.
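The emit_to argument selects which groups are emitted. Below is a std-only Rust sketch using a local EmitTo enum with All and First(n) variants, assumed to mirror the shape of DataFusion's EmitTo; take_needed is written here for illustration. Emitted states are removed, which is what resets the emitted part of the internal state.

```rust
/// Local stand-in for the `emit_to` argument: emit every group, or only the first `n`.
#[derive(Clone, Copy, Debug)]
enum EmitTo {
    All,
    First(usize),
}

/// Remove and return the per-group states selected by `emit_to`. The remaining
/// states shift down, so the element at index `n` moves to index 0.
/// Panics if `First(n)` asks for more states than exist.
fn take_needed<T>(states: &mut Vec<T>, emit_to: EmitTo) -> Vec<T> {
    match emit_to {
        EmitTo::All => std::mem::take(states),
        EmitTo::First(n) => {
            // `split_off(n)` leaves the first `n` elements in `states` and
            // returns the rest; swap them so `states` keeps the rest.
            let rest = states.split_off(n);
            std::mem::replace(states, rest)
        }
    }
}
```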

fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>

Returns the intermediate aggregate state for this accumulator, used for multi-phase grouping, resetting its internal state.

fn merge_batch( &mut self, values: &[ArrayRef], group_indices: &[usize], opt_filter: Option<&BooleanArray>, total_num_groups: usize, ) -> Result<()>

Merges intermediate state (the output from [Self::state]) into this accumulator’s current state.

fn size(&self) -> usize

Amount of memory used to store the state of this accumulator, in bytes.

fn convert_to_state( &self, values: &[ArrayRef], opt_filter: Option<&BooleanArray>, ) -> Result<Vec<ArrayRef>>

Converts an input batch directly to the intermediate aggregate state.

fn supports_convert_to_state(&self) -> bool

Returns true if [Self::convert_to_state] is implemented to support intermediate aggregate state conversion.
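One way to picture convert_to_state: treat every input row as its own group and produce that group's intermediate state directly. The std-only Rust sketch below does this for a hypothetical SUM whose state is the partial sum, with None standing in for a NULL state and a NULL filter entry treated as false; it illustrates the idea only and is not the adapter's implementation.

```rust
/// Hypothetical SUM: each row becomes its own one-row group whose state is its
/// partial sum, or `None` if the row is NULL or excluded by the filter.
fn sum_convert_to_state(
    values: &[Option<i64>],
    opt_filter: Option<&[Option<bool>]>,
) -> Vec<Option<i64>> {
    values
        .iter()
        .enumerate()
        .map(|(row, &value)| {
            // A row contributes only if there is no filter or the filter is true.
            let selected = opt_filter.map_or(true, |flt| flt[row] == Some(true));
            if selected { value } else { None }
        })
        .collect()
}
```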

Auto Trait Implementations

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided [Span], returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoEither for T

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.

impl<T> PolicyExt for T
where T: ?Sized,

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns [Action::Follow] only if self and other return Action::Follow.

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns [Action::Follow] if either self or other returns Action::Follow.

impl<T> Same for T

type Output = T

Should always be Self

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a [WithDispatch] wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a [WithDispatch] wrapper.

impl<T> ErasedDestructor for T
where T: 'static,