datafusion_physical_plan/aggregates/group_values/multi_group_by/mod.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! `GroupValues` implementations for multi group by cases

mod boolean;
mod bytes;
pub mod bytes_view;
pub mod primitive;

use std::mem::{self, size_of};

use crate::aggregates::group_values::multi_group_by::{
    boolean::BooleanGroupValueBuilder, bytes::ByteGroupValueBuilder,
    bytes_view::ByteViewGroupValueBuilder, primitive::PrimitiveGroupValueBuilder,
};
use crate::aggregates::group_values::GroupValues;
use ahash::RandomState;
use arrow::array::{Array, ArrayRef, RecordBatch};
use arrow::compute::cast;
use arrow::datatypes::{
    BinaryViewType, DataType, Date32Type, Date64Type, Decimal128Type, Float32Type,
    Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, Schema, SchemaRef,
    StringViewType, Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
    Time64NanosecondType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType,
    TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type, UInt64Type,
    UInt8Type,
};
use datafusion_common::hash_utils::create_hashes;
use datafusion_common::{internal_datafusion_err, not_impl_err, Result};
use datafusion_execution::memory_pool::proxy::{HashTableAllocExt, VecAllocExt};
use datafusion_expr::EmitTo;
use datafusion_physical_expr::binary_map::OutputType;

use hashbrown::hash_table::HashTable;

const NON_INLINED_FLAG: u64 = 0x8000000000000000;
const VALUE_MASK: u64 = 0x7FFFFFFFFFFFFFFF;

/// Trait for storing a single column of group values in [`GroupValuesColumn`]
///
/// Implementations of this trait store an in-progress collection of group values
/// (similar to various builders in Arrow-rs) that allow for quick comparison to
/// incoming rows.
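///
/// For example (illustrative): a builder holding the rows `["a", "b"]` should
/// return `true` from `equal_to(0, &array, r)` exactly when `array[r] == "a"`,
/// or when both values are NULL.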
///
/// [`GroupValuesColumn`]: crate::aggregates::group_values::GroupValuesColumn
pub trait GroupColumn: Send + Sync {
    /// Returns equal if the row stored in this builder at `lhs_row` is equal to
    /// the row in `array` at `rhs_row`
    ///
    /// Note that this comparison returns true if both elements are NULL
    fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool;

    /// Appends the row at `row` in `array` to this builder
    fn append_val(&mut self, array: &ArrayRef, row: usize) -> Result<()>;

    /// The vectorized version of `equal_to`
    ///
    /// When the nth row stored in this builder at `lhs_rows[n]` is found equal
    /// to the row in `array` at `rhs_rows[n]`, the `true` result is recorded at
    /// the corresponding position in `equal_to_results`.
    ///
    /// If the nth result in `equal_to_results` is already `false`, the check
    /// for the nth row is skipped.
    fn vectorized_equal_to(
        &self,
        lhs_rows: &[usize],
        array: &ArrayRef,
        rhs_rows: &[usize],
        equal_to_results: &mut [bool],
    );

    /// The vectorized version of `append_val`
    fn vectorized_append(&mut self, array: &ArrayRef, rows: &[usize]) -> Result<()>;

    /// Returns the number of rows stored in this builder
    fn len(&self) -> usize;

    /// Returns true if `len == 0`
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns the number of bytes used by this [`GroupColumn`]
    fn size(&self) -> usize;

    /// Builds a new array from all of the stored rows
    fn build(self: Box<Self>) -> ArrayRef;

    /// Builds a new array from the first `n` stored rows, shifting the
    /// remaining rows to the start of the builder
    fn take_n(&mut self, n: usize) -> ArrayRef;
}

/// Determines if the nullability of the existing and new input array can be used
/// to short-circuit the comparison of the two values.
///
/// Returns `Some(result)` if the result of the comparison can be determined
/// from the nullness of the two values, and `None` if the comparison must be
/// done on the values themselves.
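///
/// For example (an illustrative truth table):
///
/// ```text
/// nulls_equal_to(true,  true)  == Some(true)   // both NULL => equal
/// nulls_equal_to(true,  false) == Some(false)  // exactly one NULL => not equal
/// nulls_equal_to(false, true)  == Some(false)
/// nulls_equal_to(false, false) == None         // must compare the values themselves
/// ```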
pub fn nulls_equal_to(lhs_null: bool, rhs_null: bool) -> Option<bool> {
    match (lhs_null, rhs_null) {
        (true, true) => Some(true),
        (false, true) | (true, false) => Some(false),
        _ => None,
    }
}

/// The view of indices pointing to the actual values in `GroupValues`
///
/// If the view represents only a single `group index`, the value of the view
/// is just that `group index`, and we call it an `inlined view`.
///
/// If the view represents multiple `group indices`, the value of the view is
/// actually an index pointing to the list of `group indices`,
/// and we call it a `non-inlined view`.
///
/// The view (a u64) format is like:
///
/// ```text
/// +---------------------+---------------------------------------------+
/// | inlined flag(1bit)  | group index / index to group indices(63bit) |
/// +---------------------+---------------------------------------------+
/// ```
///
/// `inlined flag`: 1 represents `non-inlined`, and 0 represents `inlined`
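///
/// For example (illustrative):
///
/// ```text
/// GroupIndexView::new_inlined(5)     => is_non_inlined() == false, value() == 5
/// GroupIndexView::new_non_inlined(5) => is_non_inlined() == true,  value() == 5
/// ```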
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct GroupIndexView(u64);

impl GroupIndexView {
    #[inline]
    pub fn is_non_inlined(&self) -> bool {
        (self.0 & NON_INLINED_FLAG) > 0
    }

    #[inline]
    pub fn new_inlined(group_index: u64) -> Self {
        Self(group_index)
    }

    #[inline]
    pub fn new_non_inlined(list_offset: u64) -> Self {
        let non_inlined_value = list_offset | NON_INLINED_FLAG;
        Self(non_inlined_value)
    }

    #[inline]
    pub fn value(&self) -> u64 {
        self.0 & VALUE_MASK
    }
}

/// A [`GroupValues`] that stores multiple columns of group values,
/// and supports vectorized operations on them
pub struct GroupValuesColumn<const STREAMING: bool> {
    /// The output schema
    schema: SchemaRef,

    /// Logically maps group values to a group_index in
    /// [`Self::group_values`] and in each accumulator
    ///
    /// It is a `hashtable` based on `hashbrown`.
    ///
    /// Key and value in the `hashtable`:
    /// - The `key` is the `hash value (u64)` of the `group value`
    /// - The `value` is the `group values` with the same `hash value`
    ///
    /// We don't really store the actual `group values` in the `hashtable`;
    /// instead we store the `group indices` pointing to values in `GroupValues`,
    /// and we use [`GroupIndexView`] to represent such `group indices` in the table.
    map: HashTable<(u64, GroupIndexView)>,

    /// The size of `map` in bytes
    map_size: usize,

    /// The lists for group indices with the same hash value
    ///
    /// It is possible that hash value collisions exist,
    /// and we will chain the `group indices` with the same hash value
    ///
    /// The chained indices are like:
    /// `latest group index -> older group index -> even older group index -> ...`
    group_index_lists: Vec<Vec<usize>>,

    /// When emitting the first n, we need to decrease/erase group indices
    /// in `map` and `group_index_lists`.
    ///
    /// This buffer is used to temporarily store the remaining group indices in
    /// a specific list in `group_index_lists`.
    emit_group_index_list_buffer: Vec<usize>,

    /// Buffers for `vectorized_append` and `vectorized_equal_to`
    vectorized_operation_buffers: VectorizedOperationBuffers,

    /// The actual group by values, stored column-wise. Columns are compared
    /// from left to right; each column is stored as a [`GroupColumn`].
    ///
    /// Performance tests showed that this design is faster than using the
    /// more general purpose [`GroupValuesRows`]. See the ticket for details:
    /// <https://github.com/apache/datafusion/pull/12269>
    ///
    /// [`GroupValuesRows`]: crate::aggregates::group_values::row::GroupValuesRows
    group_values: Vec<Box<dyn GroupColumn>>,

    /// Reused buffer to store hashes
    hashes_buffer: Vec<u64>,

    /// Random state for creating hashes
    random_state: RandomState,
}

/// Buffers to store intermediate results in `vectorized_append`
/// and `vectorized_equal_to`, reducing memory allocations
#[derive(Default)]
struct VectorizedOperationBuffers {
    /// The `vectorized_append` row indices buffer
    append_row_indices: Vec<usize>,

    /// The `vectorized_equal_to` row indices buffer
    equal_to_row_indices: Vec<usize>,

    /// The `vectorized_equal_to` group indices buffer
    equal_to_group_indices: Vec<usize>,

    /// The `vectorized_equal_to` result buffer
    equal_to_results: Vec<bool>,

    /// The buffer for storing row indices found not equal to
    /// existing groups in `group_values` in `vectorized_equal_to`.
    /// We will perform `scalarized_intern_remaining` for such rows.
    remaining_row_indices: Vec<usize>,
}

impl VectorizedOperationBuffers {
    fn clear(&mut self) {
        self.append_row_indices.clear();
        self.equal_to_row_indices.clear();
        self.equal_to_group_indices.clear();
        self.equal_to_results.clear();
        self.remaining_row_indices.clear();
    }
}

impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
    // ========================================================================
    // Initialization functions
    // ========================================================================

    /// Create a new instance of GroupValuesColumn if supported for the specified schema
    pub fn try_new(schema: SchemaRef) -> Result<Self> {
        let map = HashTable::with_capacity(0);
        Ok(Self {
            schema,
            map,
            group_index_lists: Vec::new(),
            emit_group_index_list_buffer: Vec::new(),
            vectorized_operation_buffers: VectorizedOperationBuffers::default(),
            map_size: 0,
            group_values: vec![],
            hashes_buffer: Default::default(),
            random_state: crate::aggregates::AGGREGATION_HASH_SEED,
        })
    }

    // ========================================================================
    // Scalarized intern
    // ========================================================================

    /// Scalarized intern
    ///
    /// This is used only for `streaming aggregation`, because `streaming aggregation`
    /// depends on the order between `input rows` and their corresponding `group indices`.
    ///
    /// For example, assume the `input rows` in `cols` contain 4 new rows
    /// (not equal to the `existing rows` in `group_values`, so new groups
    /// need to be created for them):
    ///
    /// ```text
    /// row1 (hash collision with the existing rows)
    /// row2
    /// row3 (hash collision with the existing rows)
    /// row4
    /// ```
    ///
    /// # In `scalarized_intern`, their `group indices` will be
    ///
    /// ```text
    /// row1 --> 0
    /// row2 --> 1
    /// row3 --> 2
    /// row4 --> 3
    /// ```
    ///
    /// The `group indices` order agrees with the input order, and `streaming aggregation`
    /// depends on this.
    ///
    /// # However, in `vectorized_intern`, their `group indices` will be
    ///
    /// ```text
    /// row1 --> 2
    /// row2 --> 0
    /// row3 --> 3
    /// row4 --> 1
    /// ```
    ///
    /// The `group indices` order disagrees with the input order, and this would lead
    /// to errors in `streaming aggregation`.
    fn scalarized_intern(
        &mut self,
        cols: &[ArrayRef],
        groups: &mut Vec<usize>,
    ) -> Result<()> {
        let n_rows = cols[0].len();

        // tracks to which group each of the input rows belongs
        groups.clear();

        // 1.1 Calculate the group keys for the group values
        let batch_hashes = &mut self.hashes_buffer;
        batch_hashes.clear();
        batch_hashes.resize(n_rows, 0);
        create_hashes(cols, &self.random_state, batch_hashes)?;

        for (row, &target_hash) in batch_hashes.iter().enumerate() {
            let entry = self
                .map
                .find_mut(target_hash, |(exist_hash, group_idx_view)| {
                    // The view is ensured to be inlined in `scalarized_intern`
                    debug_assert!(!group_idx_view.is_non_inlined());

                    // Somewhat surprisingly, this closure can be called even if the
                    // hash doesn't match, so check the hash first with an integer
                    // comparison to avoid the more expensive comparison with the
                    // group value. https://github.com/apache/datafusion/pull/11718
                    if target_hash != *exist_hash {
                        return false;
                    }

                    fn check_row_equal(
                        array_row: &dyn GroupColumn,
                        lhs_row: usize,
                        array: &ArrayRef,
                        rhs_row: usize,
                    ) -> bool {
                        array_row.equal_to(lhs_row, array, rhs_row)
                    }

                    for (i, group_val) in self.group_values.iter().enumerate() {
                        if !check_row_equal(
                            group_val.as_ref(),
                            group_idx_view.value() as usize,
                            &cols[i],
                            row,
                        ) {
                            return false;
                        }
                    }

                    true
                });

            let group_idx = match entry {
                // Existing group_index for this group value
                Some((_hash, group_idx_view)) => group_idx_view.value() as usize,
                // 1.2 Need to create a new entry for the group
                None => {
                    // Add a new entry to aggr_state and save the newly created index
                    let mut checklen = 0;
                    let group_idx = self.group_values[0].len();
                    for (i, group_value) in self.group_values.iter_mut().enumerate() {
                        group_value.append_val(&cols[i], row)?;
                        let len = group_value.len();
                        if i == 0 {
                            checklen = len;
                        } else {
                            debug_assert_eq!(checklen, len);
                        }
                    }

                    // for hasher function, use precomputed hash value
                    self.map.insert_accounted(
                        (target_hash, GroupIndexView::new_inlined(group_idx as u64)),
                        |(hash, _group_index)| *hash,
                        &mut self.map_size,
                    );
                    group_idx
                }
            };
            groups.push(group_idx);
        }

        Ok(())
    }

    // ========================================================================
    // Vectorized intern
    // ========================================================================

    /// Vectorized intern
    ///
    /// This is used in `non-streaming aggregation`, which does not require any order
    /// between the rows in `cols` and their corresponding groups in `group_values`.
    ///
    /// The vectorized approach can offer higher performance by avoiding row-by-row
    /// downcasts of `cols` and enabling further optimizations (like SIMD).
    fn vectorized_intern(
        &mut self,
        cols: &[ArrayRef],
        groups: &mut Vec<usize>,
    ) -> Result<()> {
        let n_rows = cols[0].len();

        // tracks to which group each of the input rows belongs
        groups.clear();
        groups.resize(n_rows, usize::MAX);

        let mut batch_hashes = mem::take(&mut self.hashes_buffer);
        batch_hashes.clear();
        batch_hashes.resize(n_rows, 0);
        create_hashes(cols, &self.random_state, &mut batch_hashes)?;

        // General steps for one round of `vectorized equal_to & append`:
        //
        // 1. Collect the vectorized context by checking the hash values of `cols`
        //    in `map`, mainly filling `vectorized_append_row_indices`,
        //    `vectorized_equal_to_row_indices` and `vectorized_equal_to_group_indices`
        //
        // 2. Perform `vectorized_append` for `vectorized_append_row_indices`.
        //    `vectorized_append` must be performed before `vectorized_equal_to`,
        //    because some `group indices` in `vectorized_equal_to_group_indices`
        //    may still point to no actual values in `group_values` before the append.
        //
        // 3. Perform `vectorized_equal_to` for `vectorized_equal_to_row_indices`
        //    and `vectorized_equal_to_group_indices`. If some rows in the input `cols`
        //    are found not equal to the `existing rows` in `group_values`, place them
        //    in `remaining_row_indices` and perform `scalarized_intern_remaining`
        //    for them afterwards, similar to `scalarized_intern`.
        //
        // 4. Perform `scalarized_intern_remaining` for the rows mentioned above; for
        //    the situations in which this happens, see the comments of
        //    `scalarized_intern_remaining`.

        // 1. Collect the vectorized context by checking the hash values of `cols` in `map`
        self.collect_vectorized_process_context(&batch_hashes, groups);

        // 2. Perform `vectorized_append`
        self.vectorized_append(cols)?;

        // 3. Perform `vectorized_equal_to`
        self.vectorized_equal_to(cols, groups);

        // 4. Perform scalarized intern for the remaining rows
        //    (for details on the remaining rows, see the comments for `remaining_row_indices`)
        self.scalarized_intern_remaining(cols, &batch_hashes, groups)?;

        self.hashes_buffer = batch_hashes;

        Ok(())
    }

    /// Collect the vectorized context by checking the hash values of `cols` in `map`
    ///
    /// 1. If the bucket is not found:
    ///    - Build and insert the `new inlined group index view`
    ///      and its hash value into `map`
    ///    - Add the row index to `vectorized_append_row_indices`
    ///    - Set the group index for the row in `groups`
    ///
    /// 2. If the bucket is found:
    ///    - Add the row index to `vectorized_equal_to_row_indices`
    ///    - Check if the `group index view` is `inlined` or `non-inlined`:
    ///      if it is inlined, add it to `vectorized_equal_to_group_indices` directly;
    ///      otherwise get all group indices from `group_index_lists`, and add them.
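    ///
    /// For example (illustrative): for input rows `r0`, `r1`, `r2` where `r0`'s hash
    /// misses, `r1`'s hash hits an inlined view holding group index 1, and `r2`'s
    /// hash hits a non-inlined view whose list in `group_index_lists` is `[3, 7]`:
    ///
    /// ```text
    /// append_row_indices     = [r0]
    /// equal_to_row_indices   = [r1, r2, r2]
    /// equal_to_group_indices = [1, 3, 7]
    /// ```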
    fn collect_vectorized_process_context(
        &mut self,
        batch_hashes: &[u64],
        groups: &mut [usize],
    ) {
        self.vectorized_operation_buffers.append_row_indices.clear();
        self.vectorized_operation_buffers
            .equal_to_row_indices
            .clear();
        self.vectorized_operation_buffers
            .equal_to_group_indices
            .clear();

        let mut group_values_len = self.group_values[0].len();
        for (row, &target_hash) in batch_hashes.iter().enumerate() {
            let entry = self
                .map
                .find(target_hash, |(exist_hash, _)| target_hash == *exist_hash);

            let Some((_, group_index_view)) = entry else {
                // 1. Bucket not found case
                // Build the `new inlined group index view`
                let current_group_idx = group_values_len;
                let group_index_view =
                    GroupIndexView::new_inlined(current_group_idx as u64);

                // Insert the `group index view` and its hash into `map`
                // for hasher function, use precomputed hash value
                self.map.insert_accounted(
                    (target_hash, group_index_view),
                    |(hash, _)| *hash,
                    &mut self.map_size,
                );

                // Add the row index to `vectorized_append_row_indices`
                self.vectorized_operation_buffers
                    .append_row_indices
                    .push(row);

                // Set the group index for the row in `groups`
                groups[row] = current_group_idx;

                group_values_len += 1;
                continue;
            };

            // 2. Bucket found case
            // Check if the `group index view` is `inlined` or `non-inlined`
            if group_index_view.is_non_inlined() {
                // Non-inlined case, the value of the view is an offset in `group_index_lists`.
                // We use it to get the `group_index_list`, and add the related `rows` and
                // `group_indices` into `vectorized_equal_to_row_indices` and
                // `vectorized_equal_to_group_indices`.
                let list_offset = group_index_view.value() as usize;
                let group_index_list = &self.group_index_lists[list_offset];
                for &group_index in group_index_list {
                    self.vectorized_operation_buffers
                        .equal_to_row_indices
                        .push(row);
                    self.vectorized_operation_buffers
                        .equal_to_group_indices
                        .push(group_index);
                }
            } else {
                let group_index = group_index_view.value() as usize;
                self.vectorized_operation_buffers
                    .equal_to_row_indices
                    .push(row);
                self.vectorized_operation_buffers
                    .equal_to_group_indices
                    .push(group_index);
            }
        }
    }

    /// Perform `vectorized_append` for the `rows` in `vectorized_append_row_indices`
    fn vectorized_append(&mut self, cols: &[ArrayRef]) -> Result<()> {
        if self
            .vectorized_operation_buffers
            .append_row_indices
            .is_empty()
        {
            return Ok(());
        }

        let iter = self.group_values.iter_mut().zip(cols.iter());
        for (group_column, col) in iter {
            group_column.vectorized_append(
                col,
                &self.vectorized_operation_buffers.append_row_indices,
            )?;
        }

        Ok(())
    }

    /// Perform `vectorized_equal_to`
    ///
    /// 1. Perform `vectorized_equal_to` for the `rows` in `vectorized_equal_to_row_indices`
    ///    and the `group_indices` in `vectorized_equal_to_group_indices`.
    ///
    /// 2. Check `equal_to_results`:
    ///
    ///    If `rows` are found equal to existing groups, set those `group_indices`
    ///    for the `rows` in `groups`.
    ///
    ///    If `rows` are found not equal, just add them to `remaining_row_indices`,
    ///    and perform `scalarized_intern_remaining` for them afterwards.
    ///    Usually, such `rows` having the same hash but a different value from
    ///    the `existing rows` are very few.
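    ///
    /// For example (illustrative): with `equal_to_row_indices = [r, r]` (row `r` has
    /// two hash-colliding candidate groups) and `equal_to_results = [false, false]`
    /// after all columns are checked, row `r` matches no existing group and is
    /// pushed to `remaining_row_indices`.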
    fn vectorized_equal_to(&mut self, cols: &[ArrayRef], groups: &mut [usize]) {
        assert_eq!(
            self.vectorized_operation_buffers
                .equal_to_group_indices
                .len(),
            self.vectorized_operation_buffers.equal_to_row_indices.len()
        );

        self.vectorized_operation_buffers
            .remaining_row_indices
            .clear();

        if self
            .vectorized_operation_buffers
            .equal_to_group_indices
            .is_empty()
        {
            return;
        }

        // 1. Perform `vectorized_equal_to` for the `rows` in `vectorized_equal_to_row_indices`
        //    and the `group_indices` in `vectorized_equal_to_group_indices`
        let mut equal_to_results =
            mem::take(&mut self.vectorized_operation_buffers.equal_to_results);
        equal_to_results.clear();
        equal_to_results.resize(
            self.vectorized_operation_buffers
                .equal_to_group_indices
                .len(),
            true,
        );

        for (col_idx, group_col) in self.group_values.iter().enumerate() {
            group_col.vectorized_equal_to(
                &self.vectorized_operation_buffers.equal_to_group_indices,
                &cols[col_idx],
                &self.vectorized_operation_buffers.equal_to_row_indices,
                &mut equal_to_results,
            );
        }

        // 2. Check `equal_to_results`; if `rows` are found not equal, just add them
        //    to `remaining_row_indices`, and perform `scalarized_intern_remaining`
        //    for them afterwards
        let mut current_row_equal_to_result = false;
        for (idx, &row) in self
            .vectorized_operation_buffers
            .equal_to_row_indices
            .iter()
            .enumerate()
        {
            let equal_to_result = equal_to_results[idx];

            // Equal to case, set the `group_index` for the `row` in `groups`
            if equal_to_result {
                groups[row] =
                    self.vectorized_operation_buffers.equal_to_group_indices[idx];
            }
            current_row_equal_to_result |= equal_to_result;

            // Look ahead one row to check if all results
            // of the current row have been checked
            let next_row = self
                .vectorized_operation_buffers
                .equal_to_row_indices
                .get(idx + 1)
                .unwrap_or(&usize::MAX);

            // All results of the current row have been checked; check the total result
            if row != *next_row {
                // Not equal to case, add the `row` to `remaining_row_indices`
                if !current_row_equal_to_result {
                    self.vectorized_operation_buffers
                        .remaining_row_indices
                        .push(row);
                }

                // Reset the total result for checking the next row
                current_row_equal_to_result = false;
            }
        }

        self.vectorized_operation_buffers.equal_to_results = equal_to_results;
    }

    /// It is possible that some `input rows` have the same
    /// hash values as the `existing rows`, but actually have
    /// different values.
    ///
    /// We can find them in `vectorized_equal_to`, and put them
    /// into `remaining_row_indices`. For these `input rows`,
    /// we will perform `scalarized_intern`, similar to what is done
    /// in [`GroupValuesColumn`].
    ///
    /// This design keeps the process simple and still efficient enough:
    ///
    /// # About keeping the process simple
    ///
    /// Some corner cases become really easy to solve, like the following:
    ///
    /// ```text
    /// input row1 (same hash value as existing rows, but different value)
    /// input row1
    /// ...
    /// input row1
    /// ```
    ///
    /// After performing `vectorized_equal_to`, we will find multiple `input rows`
    /// not equal to the `existing rows`. However, such `input rows` are repeated;
    /// only one new group should be created for them.
    ///
    /// If we don't fall back to `scalarized_intern`, it is really hard to
    /// distinguish such `repeated rows` in the `input rows`. And if we just fall
    /// back, it is really easy to solve, and the performance is at least no worse
    /// than before.
    ///
    /// # About performance
    ///
    /// Hash collisions are infrequent, so the fallback will indeed hardly happen.
    /// In most situations, `remaining_row_indices` will be found empty after
    /// performing `vectorized_equal_to`.
    fn scalarized_intern_remaining(
        &mut self,
        cols: &[ArrayRef],
        batch_hashes: &[u64],
        groups: &mut [usize],
    ) -> Result<()> {
        if self
            .vectorized_operation_buffers
            .remaining_row_indices
            .is_empty()
        {
            return Ok(());
        }

        let mut map = mem::take(&mut self.map);

        for &row in &self.vectorized_operation_buffers.remaining_row_indices {
            let target_hash = batch_hashes[row];
            let entry = map.find_mut(target_hash, |(exist_hash, _)| {
                // Somewhat surprisingly, this closure can be called even if the
                // hash doesn't match, so check the hash first with an integer
                // comparison to avoid the more expensive comparison with the
                // group value. https://github.com/apache/datafusion/pull/11718
                target_hash == *exist_hash
            });

            // Only `rows` having the same hash value as `existing rows` but a
            // different value will be processed in `scalarized_intern_remaining`.
            // So the related `buckets` in `map` are ensured to be `Some`.
            let Some((_, group_index_view)) = entry else {
                unreachable!()
            };

            // Perform scalarized equal to
            if self.scalarized_equal_to_remaining(group_index_view, cols, row, groups) {
                // Found that the row actually exists in the group values,
                // so there is no need to create a new group for it.
                continue;
            }

            // Insert the `row` into `group_values` before checking the `next row`
            let group_idx = self.group_values[0].len();
            let mut checklen = 0;
            for (i, group_value) in self.group_values.iter_mut().enumerate() {
                group_value.append_val(&cols[i], row)?;
                let len = group_value.len();
                if i == 0 {
                    checklen = len;
                } else {
                    debug_assert_eq!(checklen, len);
                }
            }

            // Check if the `view` is `inlined` or `non-inlined`
            if group_index_view.is_non_inlined() {
                // Non-inlined case, get the `group_index_list` from `group_index_lists`,
                // then add the new `group` with the same hash value into it.
                let list_offset = group_index_view.value() as usize;
                let group_index_list = &mut self.group_index_lists[list_offset];
                group_index_list.push(group_idx);
            } else {
                // Inlined case
                let list_offset = self.group_index_lists.len();

                // Create a new `group_index_list` including the
                // `existing group index` + the `new group index`.
                // Add the new `group_index_list` into `group_index_lists`.
                let exist_group_index = group_index_view.value() as usize;
                let new_group_index_list = vec![exist_group_index, group_idx];
                self.group_index_lists.push(new_group_index_list);

                // Update the `group_index_view` to non-inlined
                let new_group_index_view =
                    GroupIndexView::new_non_inlined(list_offset as u64);
                *group_index_view = new_group_index_view;
            }

            groups[row] = group_idx;
        }

        self.map = map;
        Ok(())
    }

    fn scalarized_equal_to_remaining(
        &self,
        group_index_view: &GroupIndexView,
        cols: &[ArrayRef],
        row: usize,
        groups: &mut [usize],
    ) -> bool {
        // Check if this row exists in `group_values`
        fn check_row_equal(
            array_row: &dyn GroupColumn,
            lhs_row: usize,
            array: &ArrayRef,
            rhs_row: usize,
        ) -> bool {
            array_row.equal_to(lhs_row, array, rhs_row)
        }

        if group_index_view.is_non_inlined() {
            let list_offset = group_index_view.value() as usize;
            let group_index_list = &self.group_index_lists[list_offset];

            for &group_idx in group_index_list {
                let mut check_result = true;
                for (i, group_val) in self.group_values.iter().enumerate() {
                    if !check_row_equal(group_val.as_ref(), group_idx, &cols[i], row) {
                        check_result = false;
                        break;
                    }
                }

                if check_result {
                    groups[row] = group_idx;
                    return true;
                }
            }

            // All groups unmatched, return false
            false
        } else {
            let group_idx = group_index_view.value() as usize;
            for (i, group_val) in self.group_values.iter().enumerate() {
                if !check_row_equal(group_val.as_ref(), group_idx, &cols[i], row) {
                    return false;
                }
            }

            groups[row] = group_idx;
            true
        }
    }

    /// Return the group indices for the hash, along with its `group_index_view`
    /// (which may be non-inlined)
    #[cfg(test)]
    fn get_indices_by_hash(&self, hash: u64) -> Option<(Vec<usize>, GroupIndexView)> {
        let entry = self.map.find(hash, |(exist_hash, _)| hash == *exist_hash);

        match entry {
            Some((_, group_index_view)) => {
                if group_index_view.is_non_inlined() {
                    let list_offset = group_index_view.value() as usize;
                    Some((
                        self.group_index_lists[list_offset].clone(),
                        *group_index_view,
                    ))
                } else {
                    let group_index = group_index_view.value() as usize;
                    Some((vec![group_index], *group_index_view))
                }
            }
            None => None,
        }
    }
}

/// Instantiates a [`PrimitiveGroupValueBuilder`] and pushes it into `$v`
///
/// Arguments:
/// `$v`: the vector to push the new builder into
/// `$nullable`: whether the input can contain nulls
/// `$t`: the primitive type of the builder
/// `$data_type`: the [`DataType`] stored in the builder
macro_rules! instantiate_primitive {
    ($v:expr, $nullable:expr, $t:ty, $data_type:ident) => {
        if $nullable {
            let b = PrimitiveGroupValueBuilder::<$t, true>::new($data_type.to_owned());
            $v.push(Box::new(b) as _)
        } else {
            let b = PrimitiveGroupValueBuilder::<$t, false>::new($data_type.to_owned());
            $v.push(Box::new(b) as _)
        }
    };
}
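
// For example (illustrative): `instantiate_primitive!(v, true, Int64Type, data_type)`
// pushes a `PrimitiveGroupValueBuilder::<Int64Type, true>` (the `true` const
// parameter meaning "nullable"), constructed with a clone of `data_type`, onto `v`.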

impl<const STREAMING: bool> GroupValues for GroupValuesColumn<STREAMING> {
    fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> {
        if self.group_values.is_empty() {
            let mut v = Vec::with_capacity(cols.len());

            for f in self.schema.fields().iter() {
                let nullable = f.is_nullable();
                let data_type = f.data_type();
                match data_type {
                    &DataType::Int8 => {
                        instantiate_primitive!(v, nullable, Int8Type, data_type)
                    }
                    &DataType::Int16 => {
                        instantiate_primitive!(v, nullable, Int16Type, data_type)
                    }
                    &DataType::Int32 => {
                        instantiate_primitive!(v, nullable, Int32Type, data_type)
                    }
                    &DataType::Int64 => {
                        instantiate_primitive!(v, nullable, Int64Type, data_type)
                    }
                    &DataType::UInt8 => {
                        instantiate_primitive!(v, nullable, UInt8Type, data_type)
                    }
                    &DataType::UInt16 => {
                        instantiate_primitive!(v, nullable, UInt16Type, data_type)
                    }
                    &DataType::UInt32 => {
                        instantiate_primitive!(v, nullable, UInt32Type, data_type)
                    }
                    &DataType::UInt64 => {
                        instantiate_primitive!(v, nullable, UInt64Type, data_type)
                    }
                    &DataType::Float32 => {
                        instantiate_primitive!(v, nullable, Float32Type, data_type)
                    }
                    &DataType::Float64 => {
                        instantiate_primitive!(v, nullable, Float64Type, data_type)
                    }
                    &DataType::Date32 => {
                        instantiate_primitive!(v, nullable, Date32Type, data_type)
                    }
                    &DataType::Date64 => {
                        instantiate_primitive!(v, nullable, Date64Type, data_type)
                    }
                    &DataType::Time32(t) => match t {
                        TimeUnit::Second => {
                            instantiate_primitive!(
                                v,
                                nullable,
                                Time32SecondType,
                                data_type
                            )
                        }
                        TimeUnit::Millisecond => {
                            instantiate_primitive!(
                                v,
                                nullable,
                                Time32MillisecondType,
                                data_type
                            )
                        }
                        _ => {}
                    },
                    &DataType::Time64(t) => match t {
                        TimeUnit::Microsecond => {
                            instantiate_primitive!(
                                v,
                                nullable,
                                Time64MicrosecondType,
                                data_type
                            )
                        }
                        TimeUnit::Nanosecond => {
                            instantiate_primitive!(
                                v,
                                nullable,
                                Time64NanosecondType,
                                data_type
                            )
                        }
                        _ => {}
                    },
                    &DataType::Timestamp(t, _) => match t {
                        TimeUnit::Second => {
                            instantiate_primitive!(
                                v,
                                nullable,
                                TimestampSecondType,
                                data_type
                            )
                        }
                        TimeUnit::Millisecond => {
                            instantiate_primitive!(
                                v,
                                nullable,
                                TimestampMillisecondType,
                                data_type
                            )
                        }
                        TimeUnit::Microsecond => {
                            instantiate_primitive!(
                                v,
                                nullable,
                                TimestampMicrosecondType,
                                data_type
                            )
                        }
                        TimeUnit::Nanosecond => {
                            instantiate_primitive!(
                                v,
                                nullable,
                                TimestampNanosecondType,
                                data_type
                            )
                        }
                    },
                    &DataType::Decimal128(_, _) => {
                        instantiate_primitive! {
                            v,
                            nullable,
                            Decimal128Type,
                            data_type
                        }
                    }
                    &DataType::Utf8 => {
                        let b = ByteGroupValueBuilder::<i32>::new(OutputType::Utf8);
                        v.push(Box::new(b) as _)
                    }
                    &DataType::LargeUtf8 => {
                        let b = ByteGroupValueBuilder::<i64>::new(OutputType::Utf8);
                        v.push(Box::new(b) as _)
                    }
                    &DataType::Binary => {
                        let b = ByteGroupValueBuilder::<i32>::new(OutputType::Binary);
                        v.push(Box::new(b) as _)
                    }
                    &DataType::LargeBinary => {
                        let b = ByteGroupValueBuilder::<i64>::new(OutputType::Binary);
                        v.push(Box::new(b) as _)
                    }
                    &DataType::Utf8View => {
                        let b = ByteViewGroupValueBuilder::<StringViewType>::new();
                        v.push(Box::new(b) as _)
                    }
                    &DataType::BinaryView => {
                        let b = ByteViewGroupValueBuilder::<BinaryViewType>::new();
                        v.push(Box::new(b) as _)
                    }
                    &DataType::Boolean => {
                        if nullable {
                            let b = BooleanGroupValueBuilder::<true>::new();
                            v.push(Box::new(b) as _)
                        } else {
                            let b = BooleanGroupValueBuilder::<false>::new();
                            v.push(Box::new(b) as _)
                        }
                    }
                    dt => {
                        return not_impl_err!("{dt} not supported in GroupValuesColumn")
                    }
                }
            }
            self.group_values = v;
        }

        if !STREAMING {
            self.vectorized_intern(cols, groups)
        } else {
            self.scalarized_intern(cols, groups)
        }
    }

    fn size(&self) -> usize {
        let group_values_size: usize = self.group_values.iter().map(|v| v.size()).sum();
        group_values_size + self.map_size + self.hashes_buffer.allocated_size()
    }

    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    fn len(&self) -> usize {
        if self.group_values.is_empty() {
            return 0;
        }

        self.group_values[0].len()
    }

    fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
        let mut output = match emit_to {
            EmitTo::All => {
                let group_values = mem::take(&mut self.group_values);
                debug_assert!(self.group_values.is_empty());

                group_values
                    .into_iter()
                    .map(|v| v.build())
                    .collect::<Vec<_>>()
            }
            EmitTo::First(n) => {
                let output = self
                    .group_values
                    .iter_mut()
                    .map(|v| v.take_n(n))
                    .collect::<Vec<_>>();
                let mut next_new_list_offset = 0;

                self.map.retain(|(_exist_hash, group_idx_view)| {
                    // In the non-streaming case, we need to check if the
                    // `group index view` is `inlined` or `non-inlined`
                    if !STREAMING && group_idx_view.is_non_inlined() {
                        // Non-inlined case
                        // We take the `group_index_list` from `group_index_lists`

                        // New list offsets are assigned incrementally
                        self.emit_group_index_list_buffer.clear();
                        let list_offset = group_idx_view.value() as usize;
                        for group_index in self.group_index_lists[list_offset].iter() {
                            if let Some(remaining) = group_index.checked_sub(n) {
                                self.emit_group_index_list_buffer.push(remaining);
                            }
                        }

                        // The possible results:
                        //   - `emit_group_index_list_buffer` is empty, erase this bucket
                        //   - only one value remains, switch the `view` to `inlined`
                        //   - multiple values remain, build and set the new `non-inlined view`
                        if self.emit_group_index_list_buffer.is_empty() {
                            false
                        } else if self.emit_group_index_list_buffer.len() == 1 {
                            let group_index =
                                self.emit_group_index_list_buffer.first().unwrap();
                            *group_idx_view =
                                GroupIndexView::new_inlined(*group_index as u64);
                            true
                        } else {
                            let group_index_list =
                                &mut self.group_index_lists[next_new_list_offset];
                            group_index_list.clear();
                            group_index_list
                                .extend(self.emit_group_index_list_buffer.iter());
                            *group_idx_view = GroupIndexView::new_non_inlined(
                                next_new_list_offset as u64,
                            );
                            next_new_list_offset += 1;
                            true
                        }
                    } else {
                        // In the `streaming` case, the `group index view` is ensured
                        // to be `inlined`
                        debug_assert!(!group_idx_view.is_non_inlined());

                        // Inlined case, we just decrement the group index by `n`
                        let group_index = group_idx_view.value() as usize;
                        match group_index.checked_sub(n) {
                            // Group index was >= n, shift value down
                            Some(sub) => {
                                *group_idx_view = GroupIndexView::new_inlined(sub as u64);
                                true
                            }
                            // Group index was < n, so remove from table
                            None => false,
                        }
                    }
                });

                if !STREAMING {
                    self.group_index_lists.truncate(next_new_list_offset);
                }

                output
            }
        };

        // TODO: Materialize dictionaries in group keys (#7647)
        for (field, array) in self.schema.fields.iter().zip(&mut output) {
            let expected = field.data_type();
            if let DataType::Dictionary(_, v) = expected {
                let actual = array.data_type();
                if v.as_ref() != actual {
                    return Err(internal_datafusion_err!(
                        "Converted group rows expected dictionary of {v} got {actual}"
                    ));
                }
                *array = cast(array.as_ref(), expected)?;
            }
        }

        Ok(output)
    }

    fn clear_shrink(&mut self, batch: &RecordBatch) {
        let count = batch.num_rows();
        self.group_values.clear();
        self.map.clear();
        self.map.shrink_to(count, |_| 0); // hasher does not matter since the map is cleared
        self.map_size = self.map.capacity() * size_of::<(u64, usize)>();
        self.hashes_buffer.clear();
        self.hashes_buffer.shrink_to(count);

        // These structures are only used in the `non-streaming` case
        if !STREAMING {
            self.group_index_lists.clear();
            self.emit_group_index_list_buffer.clear();
            self.vectorized_operation_buffers.clear();
        }
    }
}

/// Returns true if [`GroupValuesColumn`] is supported for the specified schema
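///
/// For example (illustrative):
///
/// ```text
/// Schema { a: Int64, b: Utf8 }        -> true
/// Schema { a: Int64, b: List(Int32) } -> false (no specialized GroupColumn for List)
/// ```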
pub fn supported_schema(schema: &Schema) -> bool {
    schema
        .fields()
        .iter()
        .map(|f| f.data_type())
        .all(supported_type)
}

/// Returns true if the specified data type is supported by [`GroupValuesColumn`]
///
/// In order to be supported, there must be a specialized implementation of
/// [`GroupColumn`] for the data type, instantiated in [`GroupValuesColumn::intern`]
fn supported_type(data_type: &DataType) -> bool {
    matches!(
        *data_type,
        DataType::Int8
            | DataType::Int16
            | DataType::Int32
            | DataType::Int64
            | DataType::UInt8
            | DataType::UInt16
            | DataType::UInt32
            | DataType::UInt64
            | DataType::Float32
            | DataType::Float64
            | DataType::Decimal128(_, _)
            | DataType::Utf8
            | DataType::LargeUtf8
            | DataType::Binary
            | DataType::LargeBinary
            | DataType::Date32
            | DataType::Date64
            | DataType::Time32(_)
            | DataType::Timestamp(_, _)
            | DataType::Utf8View
            | DataType::BinaryView
            | DataType::Boolean
    )
}

/// Shows how many `null`s there are in an array
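///
/// For example (illustrative): an array `[null, null]` maps to `Nulls::All`,
/// `[1, null]` to `Nulls::Some`, and `[1, 2]` to `Nulls::None`.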
enum Nulls {
    /// All array items are `null`s
    All,
    /// There are both `null`s and non-`null`s in the array items
    Some,
    /// There are no `null`s in the array items
    None,
}

#[cfg(test)]
mod tests {
    use std::{collections::HashMap, sync::Arc};

    use arrow::array::{ArrayRef, Int64Array, RecordBatch, StringArray, StringViewArray};
    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
    use arrow::{compute::concat_batches, util::pretty::pretty_format_batches};
    use datafusion_common::utils::proxy::HashTableAllocExt;
    use datafusion_expr::EmitTo;

    use crate::aggregates::group_values::{
        multi_group_by::GroupValuesColumn, GroupValues,
    };

    use super::GroupIndexView;

    #[test]
    fn test_intern_for_vectorized_group_values() {
        let data_set = VectorizedTestDataSet::new();
        let mut group_values =
            GroupValuesColumn::<false>::try_new(data_set.schema()).unwrap();

        data_set.load_to_group_values(&mut group_values);
        let actual_batch = group_values.emit(EmitTo::All).unwrap();
        let actual_batch = RecordBatch::try_new(data_set.schema(), actual_batch).unwrap();

        check_result(&actual_batch, &data_set.expected_batch);
    }

    #[test]
    fn test_emit_first_n_for_vectorized_group_values() {
        let data_set = VectorizedTestDataSet::new();
        let mut group_values =
            GroupValuesColumn::<false>::try_new(data_set.schema()).unwrap();

        // Emit the groups 1~num_rows times
        let num_rows = data_set.expected_batch.num_rows();
        let schema = data_set.schema();
        for times_to_take in 1..=num_rows {
            // Write data after emitting
            data_set.load_to_group_values(&mut group_values);

            // Emit `times_to_take` times, collect and concat the sub-results into
            // the total result, then check it
            let suggest_num_emit = data_set.expected_batch.num_rows() / times_to_take;
            let mut num_remaining_rows = num_rows;
            let mut actual_sub_batches = Vec::new();

            for nth_time in 0..times_to_take {
                let num_emit = if nth_time == times_to_take - 1 {
                    num_remaining_rows
                } else {
                    suggest_num_emit
                };

                let sub_batch = group_values.emit(EmitTo::First(num_emit)).unwrap();
                let sub_batch =
                    RecordBatch::try_new(Arc::clone(&schema), sub_batch).unwrap();
                actual_sub_batches.push(sub_batch);

                num_remaining_rows -= num_emit;
            }
            assert!(num_remaining_rows == 0);

            let actual_batch = concat_batches(&schema, &actual_sub_batches).unwrap();
            check_result(&actual_batch, &data_set.expected_batch);
        }
    }

    #[test]
    fn test_hashtable_modifying_in_emit_first_n() {
        // Situations that should be covered:
        // 1. Erase an inlined group index view
        // 2. Erase a whole non-inlined group index view
        // 3. Erase + decrease group indices in a non-inlined group index view
        //    + view still non-inlined after decreasing
        // 4. Erase + decrease group indices in a non-inlined group index view
        //    + view switches to inlined after decreasing
        // 5. Only decrease the group index in an inlined group index view
        // 6. Only decrease group indices in a non-inlined group index view
        // 7. Erase everything

        let field = Field::new_list_field(DataType::Int32, true);
        let schema = Arc::new(Schema::new_with_metadata(vec![field], HashMap::new()));
        let mut group_values = GroupValuesColumn::<false>::try_new(schema).unwrap();

        // Insert group index views and check that they are inserted successfully
        insert_inline_group_index_view(&mut group_values, 0, 0);
        insert_non_inline_group_index_view(&mut group_values, 1, vec![1, 2]);
        insert_non_inline_group_index_view(&mut group_values, 2, vec![3, 4, 5]);
        insert_inline_group_index_view(&mut group_values, 3, 6);
        insert_non_inline_group_index_view(&mut group_values, 4, vec![7, 8]);
        insert_non_inline_group_index_view(&mut group_values, 5, vec![9, 10, 11]);

        assert_eq!(
            group_values.get_indices_by_hash(0).unwrap(),
            (vec![0], GroupIndexView::new_inlined(0))
        );
        assert_eq!(
            group_values.get_indices_by_hash(1).unwrap(),
            (vec![1, 2], GroupIndexView::new_non_inlined(0))
        );
        assert_eq!(
            group_values.get_indices_by_hash(2).unwrap(),
            (vec![3, 4, 5], GroupIndexView::new_non_inlined(1))
        );
        assert_eq!(
            group_values.get_indices_by_hash(3).unwrap(),
            (vec![6], GroupIndexView::new_inlined(6))
        );
        assert_eq!(
            group_values.get_indices_by_hash(4).unwrap(),
            (vec![7, 8], GroupIndexView::new_non_inlined(2))
        );
        assert_eq!(
            group_values.get_indices_by_hash(5).unwrap(),
            (vec![9, 10, 11], GroupIndexView::new_non_inlined(3))
        );
        assert_eq!(group_values.map.len(), 6);

        // Emit first 4 to test cases 1~3, 5~6
        let _ = group_values.emit(EmitTo::First(4)).unwrap();
        assert!(group_values.get_indices_by_hash(0).is_none());
        assert!(group_values.get_indices_by_hash(1).is_none());
        assert_eq!(
            group_values.get_indices_by_hash(2).unwrap(),
            (vec![0, 1], GroupIndexView::new_non_inlined(0))
        );
        assert_eq!(
            group_values.get_indices_by_hash(3).unwrap(),
            (vec![2], GroupIndexView::new_inlined(2))
        );
        assert_eq!(
            group_values.get_indices_by_hash(4).unwrap(),
            (vec![3, 4], GroupIndexView::new_non_inlined(1))
        );
        assert_eq!(
            group_values.get_indices_by_hash(5).unwrap(),
            (vec![5, 6, 7], GroupIndexView::new_non_inlined(2))
        );
        assert_eq!(group_values.map.len(), 4);

        // Emit first 1 to test case 4, and cases 5~6 again
        let _ = group_values.emit(EmitTo::First(1)).unwrap();
        assert_eq!(
            group_values.get_indices_by_hash(2).unwrap(),
            (vec![0], GroupIndexView::new_inlined(0))
        );
        assert_eq!(
            group_values.get_indices_by_hash(3).unwrap(),
            (vec![1], GroupIndexView::new_inlined(1))
        );
        assert_eq!(
            group_values.get_indices_by_hash(4).unwrap(),
            (vec![2, 3], GroupIndexView::new_non_inlined(0))
        );
        assert_eq!(
            group_values.get_indices_by_hash(5).unwrap(),
            (vec![4, 5, 6], GroupIndexView::new_non_inlined(1))
        );
        assert_eq!(group_values.map.len(), 4);

        // Emit first 5 to test cases 1~3 again
        let _ = group_values.emit(EmitTo::First(5)).unwrap();
        assert_eq!(
            group_values.get_indices_by_hash(5).unwrap(),
            (vec![0, 1], GroupIndexView::new_non_inlined(0))
        );
        assert_eq!(group_values.map.len(), 1);

        // Emit first 1 to test case 4 again
        let _ = group_values.emit(EmitTo::First(1)).unwrap();
        assert_eq!(
            group_values.get_indices_by_hash(5).unwrap(),
            (vec![0], GroupIndexView::new_inlined(0))
        );
        assert_eq!(group_values.map.len(), 1);

        // Emit first 1 to test case 7
        let _ = group_values.emit(EmitTo::First(1)).unwrap();
        assert!(group_values.map.is_empty());
    }

    /// Test data set for [`GroupValuesColumn::vectorized_intern`]
    ///
    /// Defines the test data and supports loading it into the tested
    /// [`GroupValuesColumn::vectorized_intern`]
    ///
    /// The covered situations:
    ///
    /// Array type:
    /// - Primitive array
    /// - String (byte) array
    /// - String view (byte view) array
    ///
    /// Repetition and nullability in a single batch:
    /// - All not-null rows
    /// - Mixed null + not-null rows
    /// - All null rows
    /// - All not-null rows (repeated)
    /// - Null + not-null rows (repeated)
    /// - All null rows (repeated)
    ///
    /// Whether the group exists in `map`:
    /// - Group exists in an inlined group view
    /// - Group exists in a non-inlined group view
    /// - Group does not exist + bucket not found in `map`
    /// - Group does not exist + not equal to the inlined group view (tested in hash collision)
    /// - Group does not exist + not equal to the non-inlined group view (tested in hash collision)
    struct VectorizedTestDataSet {
        test_batches: Vec<Vec<ArrayRef>>,
        expected_batch: RecordBatch,
    }

    impl VectorizedTestDataSet {
        fn new() -> Self {
            // Intern batch 1
            let col1 = Int64Array::from(vec![
                // Repeated rows in batch
                Some(42),   // all not nulls + repeated rows + exist in map case
                None,       // mixed + repeated rows + exist in map case
                None,       // mixed + repeated rows + not exist in map case
                Some(1142), // mixed + repeated rows + not exist in map case
                None,       // all nulls + repeated rows + exist in map case
                Some(42),
                None,
                None,
                Some(1142),
                None,
                // Unique rows in batch
                Some(4211), // all not nulls + unique rows + exist in map case
                None,       // mixed + unique rows + exist in map case
                None,       // mixed + unique rows + not exist in map case
                Some(4212), // mixed + unique rows + not exist in map case
            ]);

            let col2 = StringArray::from(vec![
                // Repeated rows in batch
                Some("string1"), // all not nulls + repeated rows + exist in map case
                None,            // mixed + repeated rows + exist in map case
                Some("string2"), // mixed + repeated rows + not exist in map case
                None,            // mixed + repeated rows + not exist in map case
                None,            // all nulls + repeated rows + exist in map case
                Some("string1"),
                None,
                Some("string2"),
                None,
                None,
                // Unique rows in batch
                Some("string3"), // all not nulls + unique rows + exist in map case
                None,            // mixed + unique rows + exist in map case
                Some("string4"), // mixed + unique rows + not exist in map case
                None,            // mixed + unique rows + not exist in map case
            ]);

            let col3 = StringViewArray::from(vec![
                // Repeated rows in batch
                Some("stringview1"), // all not nulls + repeated rows + exist in map case
                Some("stringview2"), // mixed + repeated rows + exist in map case
                None,                // mixed + repeated rows + not exist in map case
                None,                // mixed + repeated rows + not exist in map case
                None,                // all nulls + repeated rows + exist in map case
                Some("stringview1"),
                Some("stringview2"),
                None,
                None,
                None,
                // Unique rows in batch
                Some("stringview3"), // all not nulls + unique rows + exist in map case
                Some("stringview4"), // mixed + unique rows + exist in map case
                None,                // mixed + unique rows + not exist in map case
                None,                // mixed + unique rows + not exist in map case
            ]);
            let batch1 = vec![
                Arc::new(col1) as _,
                Arc::new(col2) as _,
                Arc::new(col3) as _,
            ];

            // Intern batch 2
            let col1 = Int64Array::from(vec![
                // Repeated rows in batch
                Some(42),    // all not nulls + repeated rows + exist in map case
                None,        // mixed + repeated rows + exist in map case
                None,        // mixed + repeated rows + not exist in map case
                Some(21142), // mixed + repeated rows + not exist in map case
                None,        // all nulls + repeated rows + exist in map case
                Some(42),
                None,
                None,
                Some(21142),
                None,
                // Unique rows in batch
                Some(4211),  // all not nulls + unique rows + exist in map case
                None,        // mixed + unique rows + exist in map case
                None,        // mixed + unique rows + not exist in map case
                Some(24212), // mixed + unique rows + not exist in map case
            ]);

            let col2 = StringArray::from(vec![
                // Repeated rows in batch
                Some("string1"),  // all not nulls + repeated rows + exist in map case
                None,             // mixed + repeated rows + exist in map case
                Some("2string2"), // mixed + repeated rows + not exist in map case
                None,             // mixed + repeated rows + not exist in map case
                None,             // all nulls + repeated rows + exist in map case
                Some("string1"),
                None,
                Some("2string2"),
                None,
                None,
                // Unique rows in batch
                Some("string3"),  // all not nulls + unique rows + exist in map case
                None,             // mixed + unique rows + exist in map case
                Some("2string4"), // mixed + unique rows + not exist in map case
                None,             // mixed + unique rows + not exist in map case
            ]);

            let col3 = StringViewArray::from(vec![
                // Repeated rows in batch
                Some("stringview1"), // all not nulls + repeated rows + exist in map case
                Some("stringview2"), // mixed + repeated rows + exist in map case
                None,                // mixed + repeated rows + not exist in map case
                None,                // mixed + repeated rows + not exist in map case
                None,                // all nulls + repeated rows + exist in map case
                Some("stringview1"),
                Some("stringview2"),
                None,
                None,
                None,
                // Unique rows in batch
                Some("stringview3"), // all not nulls + unique rows + exist in map case
                Some("stringview4"), // mixed + unique rows + exist in map case
                None,                // mixed + unique rows + not exist in map case
                None,                // mixed + unique rows + not exist in map case
            ]);
            let batch2 = vec![
                Arc::new(col1) as _,
                Arc::new(col2) as _,
                Arc::new(col3) as _,
            ];

            // Intern batch 3
            let col1 = Int64Array::from(vec![
                // Repeated rows in batch
                Some(42),    // all not nulls + repeated rows + exist in map case
                None,        // mixed + repeated rows + exist in map case
                None,        // mixed + repeated rows + not exist in map case
                Some(31142), // mixed + repeated rows + not exist in map case
                None,        // all nulls + repeated rows + exist in map case
                Some(42),
                None,
                None,
                Some(31142),
                None,
                // Unique rows in batch
                Some(4211),  // all not nulls + unique rows + exist in map case
                None,        // mixed + unique rows + exist in map case
                None,        // mixed + unique rows + not exist in map case
                Some(34212), // mixed + unique rows + not exist in map case
            ]);

            let col2 = StringArray::from(vec![
                // Repeated rows in batch
                Some("string1"),  // all not nulls + repeated rows + exist in map case
                None,             // mixed + repeated rows + exist in map case
                Some("3string2"), // mixed + repeated rows + not exist in map case
                None,             // mixed + repeated rows + not exist in map case
                None,             // all nulls + repeated rows + exist in map case
                Some("string1"),
                None,
                Some("3string2"),
                None,
                None,
                // Unique rows in batch
                Some("string3"),  // all not nulls + unique rows + exist in map case
                None,             // mixed + unique rows + exist in map case
                Some("3string4"), // mixed + unique rows + not exist in map case
                None,             // mixed + unique rows + not exist in map case
            ]);

            let col3 = StringViewArray::from(vec![
                // Repeated rows in batch
                Some("stringview1"), // all not nulls + repeated rows + exist in map case
                Some("stringview2"), // mixed + repeated rows + exist in map case
                None,                // mixed + repeated rows + not exist in map case
                None,                // mixed + repeated rows + not exist in map case
                None,                // all nulls + repeated rows + exist in map case
                Some("stringview1"),
                Some("stringview2"),
                None,
                None,
                None,
                // Unique rows in batch
                Some("stringview3"), // all not nulls + unique rows + exist in map case
                Some("stringview4"), // mixed + unique rows + exist in map case
                None,                // mixed + unique rows + not exist in map case
                None,                // mixed + unique rows + not exist in map case
            ]);
            let batch3 = vec![
                Arc::new(col1) as _,
                Arc::new(col2) as _,
                Arc::new(col3) as _,
            ];

            // Expected batch
            let schema = Arc::new(Schema::new(vec![
                Field::new("a", DataType::Int64, true),
                Field::new("b", DataType::Utf8, true),
                Field::new("c", DataType::Utf8View, true),
            ]));

            let col1 = Int64Array::from(vec![
                // Repeated rows in batch
                Some(42),
                None,
                None,
                Some(1142),
                None,
                Some(21142),
                None,
                Some(31142),
                None,
                // Unique rows in batch
                Some(4211),
                None,
                None,
                Some(4212),
                None,
                Some(24212),
                None,
                Some(34212),
            ]);

            let col2 = StringArray::from(vec![
                // Repeated rows in batch
                Some("string1"),
                None,
                Some("string2"),
                None,
                Some("2string2"),
                None,
                Some("3string2"),
                None,
                None,
                // Unique rows in batch
                Some("string3"),
                None,
                Some("string4"),
                None,
                Some("2string4"),
                None,
                Some("3string4"),
                None,
            ]);

            let col3 = StringViewArray::from(vec![
                // Repeated rows in batch
                Some("stringview1"),
                Some("stringview2"),
                None,
                None,
                None,
                None,
                None,
                None,
                None,
                // Unique rows in batch
                Some("stringview3"),
                Some("stringview4"),
                None,
                None,
                None,
                None,
                None,
                None,
            ]);
            let expected_batch = vec![
                Arc::new(col1) as _,
                Arc::new(col2) as _,
                Arc::new(col3) as _,
            ];
            let expected_batch = RecordBatch::try_new(schema, expected_batch).unwrap();

            Self {
                test_batches: vec![batch1, batch2, batch3],
                expected_batch,
            }
        }

        fn load_to_group_values(&self, group_values: &mut impl GroupValues) {
            for batch in self.test_batches.iter() {
                group_values.intern(batch, &mut vec![]).unwrap();
            }
        }

        fn schema(&self) -> SchemaRef {
            self.expected_batch.schema()
        }
    }

    fn check_result(actual_batch: &RecordBatch, expected_batch: &RecordBatch) {
        let formatted_actual_batch =
            pretty_format_batches(std::slice::from_ref(actual_batch))
                .unwrap()
                .to_string();
        let mut formatted_actual_batch_sorted: Vec<&str> =
            formatted_actual_batch.trim().lines().collect();
        formatted_actual_batch_sorted.sort_unstable();

        let formatted_expected_batch =
            pretty_format_batches(std::slice::from_ref(expected_batch))
                .unwrap()
                .to_string();

        let mut formatted_expected_batch_sorted: Vec<&str> =
            formatted_expected_batch.trim().lines().collect();
        formatted_expected_batch_sorted.sort_unstable();

        for (i, (actual_line, expected_line)) in formatted_actual_batch_sorted
            .iter()
            .zip(&formatted_expected_batch_sorted)
            .enumerate()
        {
            assert_eq!(
                (i, actual_line),
                (i, expected_line),
                "Inconsistent result\n\n\
                 Actual batch:\n{formatted_actual_batch}\n\
                 Expected batch:\n{formatted_expected_batch}\n\
                ",
            );
        }
    }

    fn insert_inline_group_index_view(
        group_values: &mut GroupValuesColumn<false>,
        hash_key: u64,
        group_index: u64,
    ) {
        let group_index_view = GroupIndexView::new_inlined(group_index);
        group_values.map.insert_accounted(
            (hash_key, group_index_view),
            |(hash, _)| *hash,
            &mut group_values.map_size,
        );
    }

    fn insert_non_inline_group_index_view(
        group_values: &mut GroupValuesColumn<false>,
        hash_key: u64,
        group_indices: Vec<usize>,
    ) {
        let list_offset = group_values.group_index_lists.len();
        let group_index_view = GroupIndexView::new_non_inlined(list_offset as u64);
        group_values.group_index_lists.push(group_indices);
        group_values.map.insert_accounted(
            (hash_key, group_index_view),
            |(hash, _)| *hash,
            &mut group_values.map_size,
        );
    }
}