pub struct GroupsAccumulatorAdapter {
factory: Box<dyn Fn() -> Result<Box<dyn Accumulator>> + Send>,
states: Vec<AccumulatorState>,
allocation_bytes: usize,
}
An adapter that implements [GroupsAccumulator] for any [Accumulator].
While [Accumulator]s are simpler to implement and can support
more general calculations (like retractable window functions),
they are not as fast as a specialized [GroupsAccumulator]. This
interface bridges the gap so the group by operator only operates
in terms of [GroupsAccumulator].
Internally, this adapter creates a new [Accumulator] for each group, which
stores the state for that group. This requires an allocation for each
Accumulator, space for the internal indices, and whatever internal
allocations the Accumulator itself requires.
For example, a MinAccumulator that computes the minimum string value stores
it in a ScalarValue::Utf8. That requires at least two allocations per group
(one for the MinAccumulator and one for the ScalarValue::Utf8).
                   ┌───────────────────────────────┐
                   │MinAccumulator {               │
            ┌─────▶│  min: ScalarValue::Utf8("A")  │───────┐
            │      │}                              │       │
            │      └───────────────────────────────┘       └───────▶ "A"
    ┌─────┐ │      ┌───────────────────────────────┐
    │  0  │─┘      │MinAccumulator {               │
    ├─────┤   ┌───▶│  min: ScalarValue::Utf8("Z")  │───────────────▶ "Z"
    │  1  │───┘    │}                              │
    └─────┘        └───────────────────────────────┘                 ...
      ...                        ...
    ┌─────┐        ┌───────────────────────────────┐
    │ N-2 │        │MinAccumulator {               │
    ├─────┤        │  min: ScalarValue::Utf8("A")  │───────────────▶ "A"
    │ N-1 │─────┐  │}                              │
    └─────┘     │  └───────────────────────────────┘
                │  ┌───────────────────────────────┐       ┌───────▶ "Q"
                │  │MinAccumulator {               │       │
                └─▶│  min: ScalarValue::Utf8("Q")  │───────┘
                   │}                              │
                   └───────────────────────────────┘

  Logical group    Current Min/Max value for that group stored
     number        as a ScalarValue which points to an
                   individually allocated String
Optimizations
The adapter minimizes the number of calls to [Accumulator::update_batch]
by first collecting the input rows for each group into a contiguous array
using [compute::take].
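The effect of this batching can be sketched without Arrow. The helper below is hypothetical and uses plain i64 slices in place of ArrayRef: it buckets row indices by group and then copies each group's rows into one contiguous Vec (the role [compute::take] plays here), so an update can run once per group present in the batch rather than once per row.

```rust
/// Hypothetical helper (not part of DataFusion): for each group that appears
/// in `group_indices`, gather that group's values into one contiguous Vec.
/// Returns (group, values) pairs in ascending group order.
fn gather_rows_by_group(values: &[i64], group_indices: &[usize]) -> Vec<(usize, Vec<i64>)> {
    assert_eq!(values.len(), group_indices.len());
    let num_groups = group_indices.iter().max().map_or(0, |m| m + 1);

    // One bucket of row positions per group, in original row order.
    let mut rows_per_group: Vec<Vec<usize>> = vec![Vec::new(); num_groups];
    for (row, &group) in group_indices.iter().enumerate() {
        rows_per_group[group].push(row);
    }

    rows_per_group
        .into_iter()
        .enumerate()
        // Groups with no rows in this batch need no update call at all.
        .filter(|(_, rows)| !rows.is_empty())
        .map(|(group, rows)| {
            // "take": copy the selected rows into a contiguous buffer.
            let gathered: Vec<i64> = rows.iter().map(|&r| values[r]).collect();
            (group, gathered)
        })
        .collect()
}
```

With group_indices [2, 2, 0, 1, 0], this yields three contiguous runs (groups 0, 1 and 2), so three per-group calls replace five per-row calls.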
Fields
factory: Box<dyn Fn() -> Result<Box<dyn Accumulator>> + Send>
states: Vec<AccumulatorState>
State for each group, stored in group_index order.
allocation_bytes: usize
Current memory usage, in bytes.
Note this is incrementally updated with deltas to avoid the call to size() being a bottleneck. We saw size() being a bottleneck in earlier implementations when there were many distinct groups.
Implementations
impl GroupsAccumulatorAdapter
pub fn new<F>(factory: F) -> Self
Create a new adapter that will create a new [Accumulator]
for each group, using the specified factory function.
fn make_accumulators_if_needed(&mut self, total_num_groups: usize) -> Result<()>
Ensure that self.states holds an accumulator for each of the total_num_groups groups, creating new ones with the factory as needed.
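A minimal sketch of the factory pattern behind new and make_accumulators_if_needed, assuming a simplified, hypothetical Accumulator trait that works on plain i64 slices instead of Arrow arrays and omits Result error handling. Each new group gets its own boxed accumulator from the factory, which is the per-group allocation described above.

```rust
/// Hypothetical, simplified stand-in for the real `Accumulator` trait.
trait Accumulator {
    fn update_batch(&mut self, values: &[i64]);
    fn evaluate(&self) -> Option<i64>;
}

/// Example accumulator: tracks the minimum value seen so far.
struct MinAccumulator {
    min: Option<i64>,
}

impl Accumulator for MinAccumulator {
    fn update_batch(&mut self, values: &[i64]) {
        for &v in values {
            self.min = Some(self.min.map_or(v, |m| m.min(v)));
        }
    }
    fn evaluate(&self) -> Option<i64> {
        self.min
    }
}

/// Hypothetical adapter sketch: one boxed accumulator per group.
struct Adapter {
    factory: Box<dyn Fn() -> Box<dyn Accumulator> + Send>,
    states: Vec<Box<dyn Accumulator>>,
}

impl Adapter {
    fn new<F>(factory: F) -> Self
    where
        F: Fn() -> Box<dyn Accumulator> + Send + 'static,
    {
        Self { factory: Box::new(factory), states: Vec::new() }
    }

    /// Grow `states` so every group index below `total_num_groups` has an
    /// accumulator; existing accumulators are kept as they are.
    fn make_accumulators_if_needed(&mut self, total_num_groups: usize) {
        while self.states.len() < total_num_groups {
            self.states.push((self.factory)());
        }
    }
}
```

Because the factory is called lazily, groups that first appear in later batches get accumulators only when a larger total_num_groups is seen; existing accumulators are never replaced or dropped.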
fn invoke_per_accumulator<F>(
    &mut self,
    values: &[ArrayRef],
    group_indices: &[usize],
    opt_filter: Option<&BooleanArray>,
    total_num_groups: usize,
    f: F,
) -> Result<()>
Invokes f(accumulator, values) for each group that has values in group_indices.
This function first reorders the input and filter so that the values for each group_index are contiguous, and then invokes f once per contiguous range to minimize per-row overhead.
┌─────────┐  ┌─────────┐  ┌ ─ ─ ─ ─ ┐                          ┌─────────┐  ┌ ─ ─ ─ ─ ┐
│ ┌─────┐ │  │ ┌─────┐ │    ┌─────┐                 ┏━━━━━┓    │ ┌─────┐ │    ┌─────┐
│ │  2  │ │  │ │ 200 │ │  │ │  t  │ │               ┃  0  ┃    │ │ 200 │ │  │ │  t  │ │
│ ├─────┤ │  │ ├─────┤ │    ├─────┤                 ┣━━━━━┫    │ ├─────┤ │    ├─────┤
│ │  2  │ │  │ │ 100 │ │  │ │  f  │ │               ┃  0  ┃    │ │ 300 │ │  │ │  t  │ │
│ ├─────┤ │  │ ├─────┤ │    ├─────┤                 ┣━━━━━┫    │ ├─────┤ │    ├─────┤
│ │  0  │ │  │ │ 200 │ │  │ │  t  │ │  ────────▶    ┃  1  ┃    │ │ 200 │ │  │ │NULL │ │
│ ├─────┤ │  │ ├─────┤ │    ├─────┤                 ┣━━━━━┫    │ ├─────┤ │    ├─────┤
│ │  1  │ │  │ │ 200 │ │  │ │NULL │ │               ┃  2  ┃    │ │ 200 │ │  │ │  t  │ │
│ ├─────┤ │  │ ├─────┤ │    ├─────┤                 ┣━━━━━┫    │ ├─────┤ │    ├─────┤
│ │  0  │ │  │ │ 300 │ │  │ │  t  │ │               ┃  2  ┃    │ │ 100 │ │  │ │  f  │ │
│ └─────┘ │  │ └─────┘ │    └─────┘                 ┗━━━━━┛    │ └─────┘ │    └─────┘
└─────────┘  └─────────┘  └ ─ ─ ─ ─ ┘                          └─────────┘  └ ─ ─ ─ ─ ┘
  logical      values      opt_filter               logical      values      opt_filter
   group                                             group
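The reordering in the diagram can be reproduced with a stable sort of row positions by group index, followed by a gather of each column in that order. The helper reorder_by_group below is hypothetical: it uses Vec<i64> for values and Option<bool> (None standing for NULL) for the filter, and it sketches the idea rather than the adapter's actual code.

```rust
/// Hypothetical helper: reorder `values` and the optional filter so rows of the
/// same group are contiguous. Returns (sorted group indices, sorted values,
/// sorted filter, offsets), where offsets[i]..offsets[i + 1] is the i-th run.
fn reorder_by_group(
    group_indices: &[usize],
    values: &[i64],
    filter: Option<&[Option<bool>]>,
) -> (Vec<usize>, Vec<i64>, Option<Vec<Option<bool>>>, Vec<usize>) {
    // Row positions sorted by group; sort_by_key is stable, so rows within a
    // group keep their original relative order.
    let mut order: Vec<usize> = (0..group_indices.len()).collect();
    order.sort_by_key(|&row| group_indices[row]);

    // "take" each column in the new order.
    let groups: Vec<usize> = order.iter().map(|&r| group_indices[r]).collect();
    let sorted_values: Vec<i64> = order.iter().map(|&r| values[r]).collect();
    let sorted_filter: Option<Vec<Option<bool>>> =
        filter.map(|f| order.iter().map(|&r| f[r]).collect());

    // Start a new run wherever the group index changes.
    let mut offsets = Vec::new();
    for i in 0..groups.len() {
        if i == 0 || groups[i] != groups[i - 1] {
            offsets.push(i);
        }
    }
    offsets.push(groups.len());

    (groups, sorted_values, sorted_filter, offsets)
}
```

Applied to the inputs in the diagram, it returns offsets [0, 2, 3, 5]: groups 0, 1 and 2 occupy rows 0..2, 2..3 and 3..5 of the reordered columns.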
fn add_allocation(&mut self, size: usize)
Increment the allocation by size bytes.
See Self::allocation_bytes for rationale.
fn free_allocation(&mut self, size: usize)
Decrease the allocation by size bytes.
See Self::allocation_bytes for rationale.
fn adjust_allocation(&mut self, old_size: usize, new_size: usize)
Adjust the allocation for something that started with old_size and now has new_size, avoiding overflow.
See Self::allocation_bytes for rationale.
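These helpers keep a running total instead of recomputing it from every group. A minimal sketch, assuming a hypothetical MemoryTracker struct that holds only the allocation_bytes counter, and using saturating subtraction as one possible guard against underflow:

```rust
/// Hypothetical sketch of delta-based memory accounting: the total is
/// adjusted whenever one accumulator grows or shrinks, so reading it is O(1).
struct MemoryTracker {
    allocation_bytes: usize,
}

impl MemoryTracker {
    fn add_allocation(&mut self, size: usize) {
        self.allocation_bytes += size;
    }

    fn free_allocation(&mut self, size: usize) {
        // Clamp at zero rather than underflow if an estimate was too large.
        self.allocation_bytes = self.allocation_bytes.saturating_sub(size);
    }

    /// Apply only the difference between the sizes, so no intermediate value
    /// such as `allocation_bytes + new_size` is ever computed.
    fn adjust_allocation(&mut self, old_size: usize, new_size: usize) {
        if new_size > old_size {
            self.add_allocation(new_size - old_size);
        } else {
            self.free_allocation(old_size - new_size);
        }
    }
}
```

Applying the change as a single delta matters near usize::MAX: evaluating allocation_bytes + new_size - old_size left to right could overflow even when the final result fits.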
Trait Implementations
impl GroupsAccumulator for GroupsAccumulatorAdapter
fn update_batch(
    &mut self,
    values: &[ArrayRef],
    group_indices: &[usize],
    opt_filter: Option<&BooleanArray>,
    total_num_groups: usize,
) -> Result<()>
ArrayRef]s.
fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef>
RecordBatch, resetting the internal state.
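evaluate and state both take an EmitTo argument that selects which groups to produce. The sketch below is a simplified, hypothetical stand-in rather than the library's definition; it assumes two modes (emit every group, or only the first n) and shows how emitted groups are drained from a plain Vec of per-group states, which is what resetting the internal state amounts to.

```rust
/// Hypothetical, simplified mirror of the emit mode: all groups, or the first `n`.
enum EmitTo {
    All,
    First(usize),
}

/// Remove and return the per-group states to emit. With `First(n)`, the
/// remaining groups shift down, so group `n` becomes group 0.
/// Panics if `n` exceeds the number of groups, as `Vec::split_off` does.
fn take_needed<T>(emit_to: &EmitTo, states: &mut Vec<T>) -> Vec<T> {
    match emit_to {
        EmitTo::All => std::mem::take(states),
        EmitTo::First(n) => {
            // `rest` receives everything after the first `n` states...
            let mut rest = states.split_off(*n);
            // ...then swap so `states` keeps the remainder and the prefix is returned.
            std::mem::swap(states, &mut rest);
            rest
        }
    }
}
```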
fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>
fn merge_batch(
    &mut self,
    values: &[ArrayRef],
    group_indices: &[usize],
    opt_filter: Option<&BooleanArray>,
    total_num_groups: usize,
) -> Result<()>
Self::state])
into this accumulator’s current state.
fn size(&self) -> usize
fn convert_to_state(
    &self,
    values: &[ArrayRef],
    opt_filter: Option<&BooleanArray>,
) -> Result<Vec<ArrayRef>>
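One way to picture convert_to_state for an adapter built on per-group accumulators: treat each input row as its own group, run a fresh accumulator over that single row (skipping rows the filter rejects), and read back its intermediate state. The sketch assumes a hypothetical sum accumulator whose state is an Option<i64>; it is illustrative only and not taken from the adapter's source.

```rust
/// Hypothetical accumulator whose intermediate state is the running sum,
/// or `None` if no rows have been seen.
#[derive(Default)]
struct SumAccumulator {
    sum: Option<i64>,
}

impl SumAccumulator {
    fn update(&mut self, value: i64) {
        self.sum = Some(self.sum.unwrap_or(0) + value);
    }
    fn state(&self) -> Option<i64> {
        self.sum
    }
}

/// Convert each input row directly into one row of intermediate state.
/// Rows whose filter entry is `Some(false)` or `None` (NULL) contribute no
/// value, so their state is that of an empty accumulator.
fn convert_to_state(values: &[i64], filter: Option<&[Option<bool>]>) -> Vec<Option<i64>> {
    values
        .iter()
        .enumerate()
        .map(|(row, &v)| {
            // A fresh accumulator per row: each row becomes its own "group".
            let mut acc = SumAccumulator::default();
            let keep = filter.map_or(true, |f| f[row] == Some(true));
            if keep {
                acc.update(v);
            }
            acc.state()
        })
        .collect()
}
```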
fn supports_convert_to_state(&self) -> bool
Returns true if [Self::convert_to_state] is implemented to support
intermediate aggregate state conversion.
Auto Trait Implementations
impl Freeze for GroupsAccumulatorAdapter
impl !RefUnwindSafe for GroupsAccumulatorAdapter
impl Send for GroupsAccumulatorAdapter
impl !Sync for GroupsAccumulatorAdapter
impl Unpin for GroupsAccumulatorAdapter
impl !UnwindSafe for GroupsAccumulatorAdapter
Blanket Implementations
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise.