pub struct GroupValuesColumn<const STREAMING: bool> {
schema: SchemaRef,
map: HashTable<(u64, GroupIndexView)>,
map_size: usize,
group_index_lists: Vec<Vec<usize>>,
emit_group_index_list_buffer: Vec<usize>,
vectorized_operation_buffers: VectorizedOperationBuffers,
group_values: Vec<Box<dyn GroupColumn>>,
hashes_buffer: Vec<u64>,
random_state: RandomState,
}
A GroupValues that stores multiple columns of group values
and supports vectorized operations on them
Fields
schema: SchemaRef
The output schema
map: HashTable<(u64, GroupIndexView)>
Logically maps group values to a group_index in Self::group_values and in each accumulator.
It is a hashtable based on hashbrown.
Key and value in the hashtable:
- The key is the hash value (u64) of the group value
- The value is the group values with the same hash value
We don't really store the actual group values in the hashtable;
instead we store the group indices pointing to values in GroupValues,
and we use GroupIndexView to represent such group indices in the table.
map_size: usize
The size of map in bytes
group_index_lists: Vec<Vec<usize>>
The lists of group indices with the same hash value.
Hash value collisions are possible, and we chain the group indices with the same hash value.
The chained indices look like:
latest group index -> older group index -> even older group index -> ...
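To make the map/chaining split concrete, here is a minimal, illustrative single-column sketch. All names are hypothetical: std's HashMap stands in for hashbrown's HashTable, a Vec<String> stands in for the column-wise GroupColumn storage, and the chain of same-hash group indices is modeled as a Vec per hash entry rather than a GroupIndexView.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Illustrative single-column model: the table maps a hash to the group
/// indices chained under it, while the actual values live outside the table.
struct GroupTable {
    map: HashMap<u64, Vec<usize>>, // hash -> chained group indices
    group_values: Vec<String>,     // group index -> actual group value
}

impl GroupTable {
    fn new() -> Self {
        Self { map: HashMap::new(), group_values: Vec::new() }
    }

    fn hash(value: &str) -> u64 {
        let mut h = DefaultHasher::new();
        value.hash(&mut h);
        h.finish()
    }

    /// Return the group index for `value`, creating a new group if needed.
    fn intern(&mut self, value: &str) -> usize {
        let hash = Self::hash(value);
        let chain = self.map.entry(hash).or_default();
        // Walk the group indices sharing this hash and compare the stored
        // values; the table itself never stores the group values.
        for &idx in chain.iter() {
            if self.group_values[idx] == value {
                return idx;
            }
        }
        // Not found: create a new group and chain its index under this hash.
        let idx = self.group_values.len();
        self.group_values.push(value.to_string());
        chain.push(idx);
        idx
    }
}

fn main() {
    let mut table = GroupTable::new();
    let groups: Vec<usize> =
        ["a", "b", "a", "c"].iter().map(|v| table.intern(v)).collect();
    println!("{groups:?}"); // → [0, 1, 0, 2]
}
```

The real implementation avoids one Vec per hash by inlining the common single-index case into the GroupIndexView and only spilling to group_index_lists on collision.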
emit_group_index_list_buffer: Vec<usize>
When emitting the first n groups, we need to decrease/erase group indices in map and group_index_lists.
This buffer is used to temporarily store the remaining group indices of a specific list in group_index_lists.
vectorized_operation_buffers: VectorizedOperationBuffers
Buffers for vectorized_append and vectorized_equal_to
group_values: Vec<Box<dyn GroupColumn>>
The actual group by values, stored column-wise. Comparisons proceed from left to right; each column is stored as a GroupColumn.
Performance tests showed that this design is faster than the more general purpose GroupValuesRows. See the ticket for details:
https://github.com/apache/datafusion/pull/12269
hashes_buffer: Vec<u64>
Reused buffer to store hashes
random_state: RandomState
Random state for creating hashes
Implementations
impl<const STREAMING: bool> GroupValuesColumn<STREAMING>
pub fn try_new(schema: SchemaRef) -> Result<Self>
Create a new instance of GroupValuesColumn if supported for the specified schema
fn scalarized_intern(
    &mut self,
    cols: &[ArrayRef],
    groups: &mut Vec<usize>,
) -> Result<()>
Scalarized intern
This is used only for streaming aggregation, because streaming aggregation
depends on the order between input rows and their corresponding group indices.
For example, assume cols contains 4 new rows (not equal to existing rows in
group_values, so new groups need to be created for them):
row1 (hash collision with existing rows)
row2
row3 (hash collision with existing rows)
row4
In scalarized_intern, their group indices will be:
row1 --> 0
row2 --> 1
row3 --> 2
row4 --> 3
The group index order agrees with the input order, and streaming aggregation
depends on this.
However, in vectorized_intern, their group indices may be:
row1 --> 2
row2 --> 0
row3 --> 3
row4 --> 1
The group index order disagrees with the input order, which would lead to
errors in streaming aggregation.
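The ordering guarantee of the scalarized path can be sketched as follows. This is a hypothetical simplification (a HashMap over single string values instead of hashed multi-column rows), but it shows why processing rows one by one assigns the i-th newly seen group the index i, in input order:

```rust
use std::collections::HashMap;

/// Illustrative sketch: interning rows one by one assigns new group
/// indices strictly in first-occurrence order.
fn scalarized_intern_sketch(rows: &[&str]) -> Vec<usize> {
    let mut seen: HashMap<String, usize> = HashMap::new();
    let mut groups = Vec::with_capacity(rows.len());
    for row in rows {
        // The next new group index is the number of groups created so far.
        let next = seen.len();
        let idx = *seen.entry(row.to_string()).or_insert(next);
        groups.push(idx);
    }
    groups
}

fn main() {
    // Four new rows get group indices 0..=3 in input order.
    let groups = scalarized_intern_sketch(&["row1", "row2", "row3", "row4"]);
    println!("{groups:?}"); // → [0, 1, 2, 3]
}
```

A vectorized pass that appends all hash-miss rows first and resolves hash-hit rows later cannot give this per-row ordering, which is why the streaming path must stay scalar.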
fn vectorized_intern(
    &mut self,
    cols: &[ArrayRef],
    groups: &mut Vec<usize>,
) -> Result<()>
Vectorized intern
This is used in non-streaming aggregation, which does not require any order
between rows in cols and the corresponding groups in group_values.
The vectorized approach can offer higher performance by avoiding row-by-row
downcasts of cols and by enabling further optimizations (like SIMD).
fn collect_vectorized_process_context(
    &mut self,
    batch_hashes: &[u64],
    groups: &mut [usize],
)
Collect the vectorized processing context by checking the hash values of cols in map:
- If the bucket is not found:
  - Build the new inlined group index view and insert it with its hash value into map
  - Add the row index to vectorized_append_row_indices
  - Set the group index for the row in groups
- If the bucket is found:
  - Add the row index to vectorized_equal_to_row_indices
  - Check whether the group index view is inlined or non_inlined: if it is inlined, add it to vectorized_equal_to_group_indices directly; otherwise get all group indices from group_index_lists and add them.
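The bucket-found/not-found split above can be sketched as a partition of the batch. This is an illustrative simplification (a HashSet of hashes stands in for the real map, and the group-index bookkeeping is omitted); the function name and return shape are hypothetical:

```rust
use std::collections::HashSet;

/// Illustrative sketch of the context-collection pass: rows whose hash is
/// not yet in the map are scheduled for a vectorized append, while rows
/// whose hash already exists are scheduled for a vectorized equal_to check.
fn collect_context(
    batch_hashes: &[u64],
    map: &mut HashSet<u64>,
) -> (Vec<usize>, Vec<usize>) {
    let mut append_rows = Vec::new();
    let mut equal_to_rows = Vec::new();
    for (row, &hash) in batch_hashes.iter().enumerate() {
        if map.insert(hash) {
            // Bucket not found: a new group will be appended for this row.
            append_rows.push(row);
        } else {
            // Bucket found: the actual values must still be compared.
            equal_to_rows.push(row);
        }
    }
    (append_rows, equal_to_rows)
}

fn main() {
    let mut map = HashSet::new();
    // Hashes for 4 input rows; rows 0 and 2 share a hash.
    let (append_rows, equal_to_rows) = collect_context(&[10, 20, 10, 30], &mut map);
    println!("append: {append_rows:?}, equal_to: {equal_to_rows:?}");
    // → append: [0, 1, 3], equal_to: [2]
}
```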
fn vectorized_append(&mut self, cols: &[ArrayRef]) -> Result<()>
Perform vectorized_append for the rows in vectorized_append_row_indices
fn vectorized_equal_to(&mut self, cols: &[ArrayRef], groups: &mut [usize])
Perform vectorized_equal_to:
- Perform vectorized_equal_to for the rows in vectorized_equal_to_row_indices and the group indices in vectorized_equal_to_group_indices.
- Check equal_to_results:
  - For rows found equal, set their group_indices in groups.
  - For rows found not equal, add them to scalarized_indices and perform scalarized_intern for them afterwards. Usually, such rows (having the same hash as but different values from existing rows) are very few.
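Consuming equal_to_results can be sketched as follows. The function name, parameter names, and the flat candidate-group list are illustrative simplifications (the real code walks inlined/non_inlined group index views):

```rust
/// Illustrative sketch: rows that matched get their group index written
/// into `groups`; rows that collided on hash but differ in value fall
/// back to `scalarized_indices` for row-by-row interning.
fn apply_equal_to_results(
    equal_to_rows: &[usize],      // row indices that had a hash match
    candidate_groups: &[usize],   // candidate group index per such row
    equal_to_results: &[bool],    // result of the vectorized value compare
    groups: &mut [usize],         // output: group index per input row
) -> Vec<usize> {
    let mut scalarized_indices = Vec::new();
    for ((&row, &group), &eq) in equal_to_rows
        .iter()
        .zip(candidate_groups)
        .zip(equal_to_results)
    {
        if eq {
            groups[row] = group;
        } else {
            scalarized_indices.push(row);
        }
    }
    scalarized_indices
}

fn main() {
    let mut groups = vec![usize::MAX; 4];
    // Rows 1 and 3 had hash matches; row 1 was truly equal, row 3 was not.
    let scalarized =
        apply_equal_to_results(&[1, 3], &[5, 7], &[true, false], &mut groups);
    println!("groups: {groups:?}, scalarized: {scalarized:?}");
}
```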
fn scalarized_intern_remaining(
    &mut self,
    cols: &[ArrayRef],
    batch_hashes: &[u64],
    groups: &mut [usize],
) -> Result<()>
It is possible that some input rows have the same hash values as existing
rows but different actual values.
We can find them in vectorized_equal_to and put them into scalarized_indices.
For these input rows, we perform scalarized_intern similar to what is done
in GroupValuesColumn.
This design keeps the process simple while still being efficient enough:
About making the process simple
Some corner cases become really easy to solve, like the following:
input row1 (same hash value as existing rows, but different value)
input row1
...
input row1
After performing vectorized_equal_to, we find multiple input rows that are
not equal to the existing rows. However, such input rows are repeated, and
only one new group should be created for them.
If we don't fall back to scalarized_intern, it is really hard to distinguish
such repeated rows in the input. If we just fall back, it is really easy to
solve, and the performance is at least not worse than before.
About performance
Hash collisions should not be frequent, so the fallback will rarely happen.
In most situations, scalarized_indices will turn out to be empty after
vectorized_equal_to finishes.
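The repeated-row corner case can be sketched as follows. This is an illustrative simplification (single string column, linear scan instead of a hash table; the function name is hypothetical), showing why the row-by-row fallback naturally creates only one new group for repeated unmatched rows:

```rust
/// Illustrative sketch of the scalar fallback: each row is compared
/// against all existing groups, including groups created earlier in this
/// very loop. That is what dedupes repeated input rows.
fn scalarized_fallback(rows: &[&str], group_values: &mut Vec<String>) -> Vec<usize> {
    let mut groups = Vec::with_capacity(rows.len());
    for row in rows {
        let idx = match group_values.iter().position(|v| v == row) {
            Some(idx) => idx,
            None => {
                // New value: create one new group for it.
                group_values.push(row.to_string());
                group_values.len() - 1
            }
        };
        groups.push(idx);
    }
    groups
}

fn main() {
    let mut group_values = vec!["x".to_string()]; // one pre-existing group
    // Three repeated rows that all failed the vectorized equal_to check.
    let groups = scalarized_fallback(&["row1", "row1", "row1"], &mut group_values);
    println!("{groups:?}"); // → [1, 1, 1]: one new group shared by all repeats
}
```

A purely vectorized pass would see all three rows as "not equal to existing groups" simultaneously and would need extra intra-batch deduplication to avoid creating three groups.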
fn scalarized_equal_to_remaining(
    &self,
    group_index_view: &GroupIndexView,
    cols: &[ArrayRef],
    row: usize,
    groups: &mut [usize],
) -> bool
Trait Implementations
impl<const STREAMING: bool> GroupValues for GroupValuesColumn<STREAMING>
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()>
Calculates the group id for each input row of cols, assigning new group ids as necessary.
fn size(&self) -> usize
Returns the memory used by this GroupValues.
fn is_empty(&self) -> bool
Returns true if this GroupValues is empty.
fn len(&self) -> usize
The number of values stored in this GroupValues.
fn clear_shrink(&mut self, batch: &RecordBatch)
Auto Trait Implementations
impl<const STREAMING: bool> Freeze for GroupValuesColumn<STREAMING>
impl<const STREAMING: bool> !RefUnwindSafe for GroupValuesColumn<STREAMING>
impl<const STREAMING: bool> Send for GroupValuesColumn<STREAMING>
impl<const STREAMING: bool> Sync for GroupValuesColumn<STREAMING>
impl<const STREAMING: bool> Unpin for GroupValuesColumn<STREAMING>
impl<const STREAMING: bool> !UnwindSafe for GroupValuesColumn<STREAMING>
Blanket Implementations
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true; otherwise converts self into a Right variant.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true; otherwise converts self into a Right variant.