
Struct GroupValuesColumn 

pub struct GroupValuesColumn<const STREAMING: bool> {
    schema: SchemaRef,
    map: HashTable<(u64, GroupIndexView)>,
    map_size: usize,
    group_index_lists: Vec<Vec<usize>>,
    emit_group_index_list_buffer: Vec<usize>,
    vectorized_operation_buffers: VectorizedOperationBuffers,
    group_values: Vec<Box<dyn GroupColumn>>,
    hashes_buffer: Vec<u64>,
    random_state: RandomState,
}

A GroupValues implementation that stores multiple columns of group values and supports vectorized operations on them

Fields

schema: SchemaRef

The output schema

map: HashTable<(u64, GroupIndexView)>

Logically maps group values to a group_index in Self::group_values and in each accumulator

It is a hash table based on hashbrown.

Key and value in the hash table:

  • The key is the hash value (u64) of the group value
  • The value represents the group values that share this hash value

The actual group values are not stored in the hash table; instead it stores group indices pointing to values in GroupValues, and a GroupIndexView is used to represent such group indices in the table.
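A minimal sketch of the inlined vs. non-inlined idea, assuming a single u64 with a high-bit flag; the name and exact layout here are illustrative, and the real GroupIndexView in DataFusion may differ:

const NON_INLINED_FLAG: u64 = 1 << 63;

#[derive(Clone, Copy)]
struct GroupIndexViewSketch(u64);

impl GroupIndexViewSketch {
    /// A view holding a single group index directly (no hash collision)
    fn new_inlined(group_index: u64) -> Self {
        Self(group_index)
    }

    /// A view holding an offset into `group_index_lists` (collisions exist)
    fn new_non_inlined(list_offset: u64) -> Self {
        Self(list_offset | NON_INLINED_FLAG)
    }

    fn is_non_inlined(&self) -> bool {
        self.0 & NON_INLINED_FLAG != 0
    }

    fn value(&self) -> u64 {
        self.0 & !NON_INLINED_FLAG
    }
}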

map_size: usize

The size of map in bytes

group_index_lists: Vec<Vec<usize>>

The lists of group indices that share the same hash value

Hash value collisions can occur, so group indices with the same hash value are chained together.

The chain is ordered from newest to oldest: latest group index -> older group index -> even older group index -> ...
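Continuing the sketch above, resolving every group index that shares a hash value might look like the following (again illustrative, not the real implementation):

fn group_indices_for_hash(
    view: &GroupIndexViewSketch,
    group_index_lists: &[Vec<usize>],
) -> Vec<usize> {
    if view.is_non_inlined() {
        // collisions: the view points at a chained list of group indices
        group_index_lists[view.value() as usize].clone()
    } else {
        // common case: exactly one group index, stored inline
        vec![view.value() as usize]
    }
}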

emit_group_index_list_buffer: Vec<usize>

When emitting the first n groups, we need to decrement or erase group indices in map and group_index_lists.

This buffer temporarily stores the remaining group indices of a specific list in group_index_lists.

vectorized_operation_buffers: VectorizedOperationBuffers

Buffers for vectorized_append and vectorized_equal_to

group_values: Vec<Box<dyn GroupColumn>>

The actual group-by values, stored column-wise: each column is stored as a GroupColumn, and values are compared column by column from left to right.

Performance tests showed that this design is faster than the more general-purpose GroupValuesRows. See the ticket for details: https://github.com/apache/datafusion/pull/12269
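As a hedged illustration of the left-to-right comparison over columnar storage; the trait below is a simplified stand-in for the real GroupColumn API, not its actual signature:

trait GroupColumnSketch {
    /// Does the stored group value at `group_row` equal the input value at `input_row`?
    fn equal_to(&self, group_row: usize, input_row: usize) -> bool;
}

fn group_equals_row(
    cols: &[Box<dyn GroupColumnSketch>],
    group_row: usize,
    input_row: usize,
) -> bool {
    // compare column by column from the left; `all` short-circuits on the
    // first mismatching column
    cols.iter().all(|col| col.equal_to(group_row, input_row))
}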

hashes_buffer: Vec<u64>

Reused buffer to store hashes

random_state: RandomState

Random state for creating hashes

Implementations


impl<const STREAMING: bool> GroupValuesColumn<STREAMING>


pub fn try_new(schema: SchemaRef) -> Result<Self>

Create a new instance of GroupValuesColumn if it is supported for the specified schema


fn scalarized_intern( &mut self, cols: &[ArrayRef], groups: &mut Vec<usize>, ) -> Result<()>

Scalarized intern

This is used only for streaming aggregation, because streaming aggregation depends on the order between input rows and their corresponding group indices.

For example, assume the input rows in cols contain 4 new rows (rows not equal to any existing rows in group_values, so new groups must be created for them):

  row1 (hash collision with the existing rows)
  row2
  row3 (hash collision with the existing rows)
  row4
In scalarized_intern, their group indices will be
  row1 --> 0
  row2 --> 1
  row3 --> 2
  row4 --> 3

The group index order agrees with the input order, and streaming aggregation depends on this.

However, in vectorized_intern, their group indices will be
  row1 --> 2
  row2 --> 0
  row3 --> 3
  row4 --> 1

The group index order disagrees with the input order, and this will lead to errors in streaming aggregation.
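A toy model of the row-by-row behavior, using a plain HashMap instead of the real hash table and GroupColumn machinery, shows why scalarized interning preserves input order:

use std::collections::HashMap;

fn scalarized_intern_sketch(rows: &[&str]) -> Vec<usize> {
    let mut map: HashMap<&str, usize> = HashMap::new();
    let mut groups = Vec::with_capacity(rows.len());
    for &row in rows {
        let next = map.len();
        // a row creating a new group always receives the next free index,
        // so new group indices follow the input-row order
        let group_index = *map.entry(row).or_insert(next);
        groups.push(group_index);
    }
    groups
}

// e.g. scalarized_intern_sketch(&["a", "b", "a", "c"]) == vec![0, 1, 0, 2]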


fn vectorized_intern( &mut self, cols: &[ArrayRef], groups: &mut Vec<usize>, ) -> Result<()>

Vectorized intern

This is used in non-streaming aggregation, which does not require any order between the rows in cols and their corresponding groups in group_values.

The vectorized approach can offer higher performance by avoiding row-by-row downcasts of cols and by enabling further optimizations (like SIMD).
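A hedged sketch of the downcast point: with vectorized processing, each column is downcast once per batch rather than once per row (Int64Array is just an example type here, and the function is illustrative):

use arrow::array::{Array, ArrayRef, Int64Array};

fn process_column_vectorized(col: &ArrayRef) -> i64 {
    // one downcast per batch ...
    let col = col
        .as_any()
        .downcast_ref::<Int64Array>()
        .expect("example assumes an Int64 column");
    // ... then a tight per-row loop the compiler can optimize well
    // (and potentially auto-vectorize with SIMD)
    (0..col.len()).map(|i| col.value(i)).sum()
}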


fn collect_vectorized_process_context( &mut self, batch_hashes: &[u64], groups: &mut [usize], )

Collect the vectorized processing context by checking the hash values of cols in map:

  1. Bucket not found
  • Build the new inlined group index view and insert it with its hash value into map
  • Add the row index to vectorized_append_row_indices
  • Set the group index for the row in groups
  2. Bucket found
  • Add the row index to vectorized_equal_to_row_indices
  • Check whether the group index view is inlined or non-inlined: if it is inlined, add it to vectorized_equal_to_group_indices directly; otherwise get all group indices from group_index_lists and add them
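A hedged sketch of this routing step, with a plain HashMap standing in for the real hashbrown table and its chained group index lists; the buffer names mirror the description above but the types are simplified stand-ins:

use std::collections::HashMap;

struct RoutingSketch {
    map: HashMap<u64, Vec<usize>>, // hash -> chained group indices
    next_group_index: usize,
    vectorized_append_row_indices: Vec<usize>,
    vectorized_equal_to_row_indices: Vec<usize>,
    vectorized_equal_to_group_indices: Vec<usize>,
}

impl RoutingSketch {
    fn collect(&mut self, batch_hashes: &[u64], groups: &mut [usize]) {
        for (row, &hash) in batch_hashes.iter().enumerate() {
            if let Some(candidates) = self.map.get(&hash) {
                // bucket found: every chained group index with this hash
                // becomes a candidate for the later equality check
                for &group_index in candidates {
                    self.vectorized_equal_to_row_indices.push(row);
                    self.vectorized_equal_to_group_indices.push(group_index);
                }
            } else {
                // bucket not found: create the group eagerly and record
                // the row for the later vectorized append
                self.map.insert(hash, vec![self.next_group_index]);
                self.vectorized_append_row_indices.push(row);
                groups[row] = self.next_group_index;
                self.next_group_index += 1;
            }
        }
    }
}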

fn vectorized_append(&mut self, cols: &[ArrayRef]) -> Result<()>

Perform vectorized_append for the rows in vectorized_append_row_indices


fn vectorized_equal_to(&mut self, cols: &[ArrayRef], groups: &mut [usize])

Perform vectorized_equal_to

  1. Perform vectorized_equal_to for the rows in vectorized_equal_to_row_indices against the group indices in vectorized_equal_to_group_indices.

  2. Check equal_to_results:

    If a row is found equal to an existing group, set that group index for the row in groups.

    If a row is found not equal, add it to scalarized_indices and perform scalarized_intern for it afterwards. Usually, rows that have the same hash but different values from the existing rows are very few.
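A simplified sketch of step 2, assuming at most one candidate group per row; the real code must also handle rows with several chained candidates:

fn apply_equal_to_results(
    equal_to_results: &[bool],
    row_indices: &[usize],
    group_indices: &[usize],
    groups: &mut [usize],
    scalarized_indices: &mut Vec<usize>,
) {
    for ((&matched, &row), &group_index) in
        equal_to_results.iter().zip(row_indices).zip(group_indices)
    {
        if matched {
            // equal: the input row maps to the existing group
            groups[row] = group_index;
        } else {
            // not equal: defer this row to the scalarized fallback
            scalarized_indices.push(row);
        }
    }
}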


fn scalarized_intern_remaining( &mut self, cols: &[ArrayRef], batch_hashes: &[u64], groups: &mut [usize], ) -> Result<()>

It is possible that some input rows have the same hash values as existing rows but different actual values.

Such rows are found in vectorized_equal_to and put into scalarized_indices. For these input rows, we then perform scalarized_intern as described above.

This design keeps the process simple while remaining efficient enough:

About making the process simple

Some corner cases become really easy to solve, like the following one:

  input row1 (same hash value as existing rows, but different value)
  input row1
  ...
  input row1

After performing vectorized_equal_to, we find multiple input rows that are not equal to the existing rows. But these input rows are repeated, so only one new group should be created for them.

Without falling back to scalarized_intern, it would be really hard to distinguish such repeated rows among the inputs. With the fallback, the problem is easy to solve, and the performance is at least no worse than before.
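A toy example of this repeated-row corner case, showing how row-by-row fallback processing naturally creates just one group (the starting index 10 is arbitrary):

use std::collections::HashMap;

fn fallback_dedup_example() {
    let rows = ["x", "x", "x"]; // repeated rows that all failed equal-to
    let mut seen: HashMap<&str, usize> = HashMap::new();
    let mut next_group_index = 10; // example: next free group index
    let mut groups = vec![usize::MAX; rows.len()];
    for (row, &value) in rows.iter().enumerate() {
        // processed one by one, the second and third "x" find the group
        // the first "x" just created, so only one new group is made
        groups[row] = *seen.entry(value).or_insert_with(|| {
            let g = next_group_index;
            next_group_index += 1;
            g
        });
    }
    assert_eq!(groups, vec![10, 10, 10]);
}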

About performance

Hash collisions should be infrequent, so the fallback will rarely happen. In most situations, scalarized_indices will be found empty after vectorized_equal_to finishes.


fn scalarized_equal_to_remaining( &self, group_index_view: &GroupIndexView, cols: &[ArrayRef], row: usize, groups: &mut [usize], ) -> bool

Trait Implementations

impl<const STREAMING: bool> GroupValues for GroupValuesColumn<STREAMING>

fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()>

Calculates the group id for each input row of cols, assigning new group ids as necessary.
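A hedged sketch of the calling convention: the import paths and the public visibility of the GroupValues trait are assumptions here, but the intern signature is the one shown above:

use arrow::array::ArrayRef;
use datafusion_common::Result;

fn process_batch(
    group_values: &mut dyn GroupValues,
    group_by_cols: &[ArrayRef],
    groups: &mut Vec<usize>,
) -> Result<()> {
    // `groups` is a reusable buffer: after the call, groups[i] holds the
    // group index assigned to input row i of this batch
    group_values.intern(group_by_cols, groups)?;
    Ok(())
}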
fn size(&self) -> usize

Returns the number of bytes of memory used by this GroupValues

fn is_empty(&self) -> bool

Returns true if this GroupValues is empty

fn len(&self) -> usize

The number of values (distinct group values) stored in this GroupValues

fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>

Emits the group values

fn clear_shrink(&mut self, batch: &RecordBatch)

Clear the contents and shrink the capacity to the size of the batch (freeing up memory)

Auto Trait Implementations

impl<const STREAMING: bool> Freeze for GroupValuesColumn<STREAMING>

impl<const STREAMING: bool> !RefUnwindSafe for GroupValuesColumn<STREAMING>

impl<const STREAMING: bool> Send for GroupValuesColumn<STREAMING>

impl<const STREAMING: bool> Sync for GroupValuesColumn<STREAMING>

impl<const STREAMING: bool> Unpin for GroupValuesColumn<STREAMING>

impl<const STREAMING: bool> !UnwindSafe for GroupValuesColumn<STREAMING>

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self). That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoEither for T

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true; otherwise converts self into a Right variant.

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true; otherwise converts self into a Right variant.

impl<T> PolicyExt for T
where T: ?Sized,

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow.

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow.

impl<T> Same for T

type Output = T

Should always be Self

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.

impl<T> ErasedDestructor for T
where T: 'static,