datafusion_physical_plan/aggregates/group_values/multi_group_by/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! `GroupValues` implementations for multi group by cases
19
20mod boolean;
21mod bytes;
22pub mod bytes_view;
23pub mod primitive;
24
25use std::mem::{self, size_of};
26
27use crate::aggregates::group_values::multi_group_by::{
28    boolean::BooleanGroupValueBuilder, bytes::ByteGroupValueBuilder,
29    bytes_view::ByteViewGroupValueBuilder, primitive::PrimitiveGroupValueBuilder,
30};
31use crate::aggregates::group_values::GroupValues;
32use ahash::RandomState;
33use arrow::array::{Array, ArrayRef, RecordBatch};
34use arrow::compute::cast;
35use arrow::datatypes::{
36    BinaryViewType, DataType, Date32Type, Date64Type, Decimal128Type, Float32Type,
37    Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, Schema, SchemaRef,
38    StringViewType, Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
39    Time64NanosecondType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType,
40    TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type, UInt64Type,
41    UInt8Type,
42};
43use datafusion_common::hash_utils::create_hashes;
44use datafusion_common::{internal_datafusion_err, not_impl_err, Result};
45use datafusion_execution::memory_pool::proxy::{HashTableAllocExt, VecAllocExt};
46use datafusion_expr::EmitTo;
47use datafusion_physical_expr::binary_map::OutputType;
48
49use hashbrown::hash_table::HashTable;
50
51const NON_INLINED_FLAG: u64 = 0x8000000000000000;
52const VALUE_MASK: u64 = 0x7FFFFFFFFFFFFFFF;
53
54/// Trait for storing a single column of group values in [`GroupValuesColumn`]
55///
56/// Implementations of this trait store an in-progress collection of group values
57/// (similar to various builders in Arrow-rs) that allow for quick comparison to
58/// incoming rows.
59///
60/// [`GroupValuesColumn`]: crate::aggregates::group_values::GroupValuesColumn
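///
/// A rough usage sketch (illustrative only, so marked `ignore`; a concrete
/// implementation such as `PrimitiveGroupValueBuilder` is assumed to already
/// exist as `col_builder`, and `input_array` / `row` are made-up names):
///
/// ```ignore
/// // Probe: does the value stored for group 0 match the input row?
/// if col_builder.equal_to(0, &input_array, row) {
///     // reuse group 0 for this input row
/// } else {
///     // otherwise store the input row as a new group value
///     col_builder.append_val(&input_array, row)?;
/// }
/// ```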
61pub trait GroupColumn: Send + Sync {
62    /// Returns true if the row stored in this builder at `lhs_row` is equal to
63    /// the row in `array` at `rhs_row`
64    ///
65    /// Note that this comparison returns true if both elements are NULL
66    fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool;
67
68    /// Appends the row at `row` in `array` to this builder
69    fn append_val(&mut self, array: &ArrayRef, row: usize) -> Result<()>;
70
71    /// The vectorized version of `equal_to`
72    ///
73    /// If the nth row stored in this builder at `lhs_rows[n]`
74    /// is equal to the row in `array` at `rhs_rows[n]`,
75    /// the `true` result is recorded at the corresponding
76    /// position in `equal_to_results`.
77    ///
78    /// If the nth result in `equal_to_results` is already
79    /// `false`, the check for the nth row is skipped.
80    fn vectorized_equal_to(
81        &self,
82        lhs_rows: &[usize],
83        array: &ArrayRef,
84        rhs_rows: &[usize],
85        equal_to_results: &mut [bool],
86    );
87
88    /// The vectorized version of `append_val`
89    fn vectorized_append(&mut self, array: &ArrayRef, rows: &[usize]) -> Result<()>;
90
91    /// Returns the number of rows stored in this builder
92    fn len(&self) -> usize;
93
94    /// true if len == 0
95    fn is_empty(&self) -> bool {
96        self.len() == 0
97    }
98
99    /// Returns the number of bytes used by this [`GroupColumn`]
100    fn size(&self) -> usize;
101
102    /// Builds a new array from all of the stored rows
103    fn build(self: Box<Self>) -> ArrayRef;
104
105    /// Builds a new array from the first `n` stored rows, shifting the
106    /// remaining rows to the start of the builder
107    fn take_n(&mut self, n: usize) -> ArrayRef;
108}
109
110/// Determines if the nullability of the existing and new input array can be used
111/// to short-circuit the comparison of the two values.
112///
113/// Returns `Some(result)` if the result of the comparison can be determined
114/// from the nullness of the two values, and `None` if the comparison must be
115/// done on the values themselves.
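///
/// A short illustrative example of the three cases (marked `ignore`; it simply
/// restates the match arms below):
///
/// ```ignore
/// assert_eq!(nulls_equal_to(true, true), Some(true));   // both NULL => equal
/// assert_eq!(nulls_equal_to(true, false), Some(false)); // one NULL  => not equal
/// assert_eq!(nulls_equal_to(false, false), None);       // compare the values
/// ```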
116pub fn nulls_equal_to(lhs_null: bool, rhs_null: bool) -> Option<bool> {
117    match (lhs_null, rhs_null) {
118        (true, true) => Some(true),
119        (false, true) | (true, false) => Some(false),
120        _ => None,
121    }
122}
123
124/// A view of indices pointing to the actual values in `GroupValues`
125///
126/// If the view represents only a single `group index`, the value of the
127/// view is just that `group index`, and we call it an `inlined view`.
128///
129/// If the view represents multiple `group indices`, the value of the view
130/// is an index pointing to a list of `group indices`,
131/// and we call it a `non-inlined view`.
132///
133/// The view (a u64) has the following format:
134///   +---------------------+---------------------------------------------+
135///   | inlined flag(1bit)  | group index / index to group indices(63bit) |
136///   +---------------------+---------------------------------------------+
137///
138/// `inlined flag`: 1 represents `non-inlined`, and 0 represents `inlined`
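///
/// Below is a minimal illustrative sketch of the masking behavior, using only
/// the constructors and accessors defined on this type (marked `ignore`
/// because the type is crate-private and the example is not a doctest):
///
/// ```ignore
/// // Inlined view: the payload is the group index itself
/// let inlined = GroupIndexView::new_inlined(42);
/// assert!(!inlined.is_non_inlined());
/// assert_eq!(inlined.value(), 42);
///
/// // Non-inlined view: the flag bit is set and the payload is an offset
/// // into `group_index_lists`
/// let non_inlined = GroupIndexView::new_non_inlined(3);
/// assert!(non_inlined.is_non_inlined());
/// assert_eq!(non_inlined.value(), 3);
/// ```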
139#[derive(Debug, Clone, Copy, PartialEq, Eq)]
140struct GroupIndexView(u64);
141
142impl GroupIndexView {
143    #[inline]
144    pub fn is_non_inlined(&self) -> bool {
145        (self.0 & NON_INLINED_FLAG) > 0
146    }
147
148    #[inline]
149    pub fn new_inlined(group_index: u64) -> Self {
150        Self(group_index)
151    }
152
153    #[inline]
154    pub fn new_non_inlined(list_offset: u64) -> Self {
155        let non_inlined_value = list_offset | NON_INLINED_FLAG;
156        Self(non_inlined_value)
157    }
158
159    #[inline]
160    pub fn value(&self) -> u64 {
161        self.0 & VALUE_MASK
162    }
163}
164
165/// A [`GroupValues`] that stores multiple columns of group values,
166/// and supports vectorized operations on them
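///
/// A rough end-to-end sketch through the [`GroupValues`] trait (illustrative
/// only, so marked `ignore`; the schema and the `input_columns` arrays are
/// assumptions made up for the example):
///
/// ```ignore
/// let schema = Arc::new(Schema::new(vec![
///     Field::new("a", DataType::Int64, true),
///     Field::new("b", DataType::Utf8, true),
/// ]));
/// // `STREAMING = false` selects the vectorized intern path
/// let mut group_values = GroupValuesColumn::<false>::try_new(schema)?;
///
/// // Map each input row to a group index (creating new groups as needed)
/// let mut group_indices = vec![];
/// group_values.intern(&input_columns, &mut group_indices)?;
///
/// // Emit all accumulated group values as one array per group column
/// let arrays = group_values.emit(EmitTo::All)?;
/// ```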
167pub struct GroupValuesColumn<const STREAMING: bool> {
168    /// The output schema
169    schema: SchemaRef,
170
171    /// Logically maps group values to a group_index in
172    /// [`Self::group_values`] and in each accumulator
173    ///
174    /// It is a hash table based on `hashbrown`.
175    ///
176    /// Key and value in the hash table:
177    ///   - The `key` is the `hash value (u64)` of the `group value`
178    ///   - The `value` is the `group values` with the same `hash value`
179    ///
180    /// We don't store the actual `group values` in the hash table;
181    /// instead we store the `group indices` pointing to values in `GroupValues`,
182    /// represented in the table by a [`GroupIndexView`].
183    ///
184    map: HashTable<(u64, GroupIndexView)>,
185
186    /// The size of `map` in bytes
187    map_size: usize,
188
189    /// The lists for group indices with the same hash value
190    ///
191    /// Hash value collisions are possible,
192    /// so we will chain the `group indices` that share the same hash value.
193    ///
194    /// The chained indices look like:
195    ///   `latest group index -> older group index -> even older group index -> ...`
196    group_index_lists: Vec<Vec<usize>>,
197
198    /// When emitting the first n groups, we need to decrease/erase group indices in
199    /// `map` and `group_index_lists`.
200    ///
201    /// This buffer is used to temporarily store the remaining group indices in
202    /// a specific list in `group_index_lists`.
203    emit_group_index_list_buffer: Vec<usize>,
204
205    /// Buffers for `vectorized_append` and `vectorized_equal_to`
206    vectorized_operation_buffers: VectorizedOperationBuffers,
207
208    /// The actual group by values, stored column-wise. Columns are compared
209    /// from left to right, and each column is stored as a [`GroupColumn`].
210    ///
211    /// Performance tests showed that this design is faster than using the
212    /// more general purpose [`GroupValuesRows`]. See the ticket for details:
213    /// <https://github.com/apache/datafusion/pull/12269>
214    ///
215    /// [`GroupValuesRows`]: crate::aggregates::group_values::row::GroupValuesRows
216    group_values: Vec<Box<dyn GroupColumn>>,
217
218    /// reused buffer to store hashes
219    hashes_buffer: Vec<u64>,
220
221    /// Random state for creating hashes
222    random_state: RandomState,
223}
224
225/// Buffers to store intermediate results in `vectorized_append`
226/// and `vectorized_equal_to`, for reducing memory allocation
227#[derive(Default)]
228struct VectorizedOperationBuffers {
229    /// The `vectorized append` row indices buffer
230    append_row_indices: Vec<usize>,
231
232    /// The `vectorized_equal_to` row indices buffer
233    equal_to_row_indices: Vec<usize>,
234
235    /// The `vectorized_equal_to` group indices buffer
236    equal_to_group_indices: Vec<usize>,
237
238    /// The `vectorized_equal_to` result buffer
239    equal_to_results: Vec<bool>,
240
241    /// The buffer storing row indices found, in `vectorized_equal_to`,
242    /// to be not equal to the existing groups in `group_values`.
243    /// We will perform `scalarized_intern_remaining` for such rows.
244    remaining_row_indices: Vec<usize>,
245}
246
247impl VectorizedOperationBuffers {
248    fn clear(&mut self) {
249        self.append_row_indices.clear();
250        self.equal_to_row_indices.clear();
251        self.equal_to_group_indices.clear();
252        self.equal_to_results.clear();
253        self.remaining_row_indices.clear();
254    }
255}
256
257impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
258    // ========================================================================
259    // Initialization functions
260    // ========================================================================
261
262    /// Create a new instance of GroupValuesColumn if supported for the specified schema
263    pub fn try_new(schema: SchemaRef) -> Result<Self> {
264        let map = HashTable::with_capacity(0);
265        Ok(Self {
266            schema,
267            map,
268            group_index_lists: Vec::new(),
269            emit_group_index_list_buffer: Vec::new(),
270            vectorized_operation_buffers: VectorizedOperationBuffers::default(),
271            map_size: 0,
272            group_values: vec![],
273            hashes_buffer: Default::default(),
274            random_state: crate::aggregates::AGGREGATION_HASH_SEED,
275        })
276    }
277
278    // ========================================================================
279    // Scalarized intern
280    // ========================================================================
281
282    /// Scalarized intern
283    ///
284    /// This is used only for `streaming aggregation`, because `streaming aggregation`
285    /// depends on the order between `input rows` and their corresponding `group indices`.
286    ///
287    /// For example, assume the `input rows` in `cols` contain 4 new rows
288    /// (not equal to any `exist rows` in `group_values`, so new groups
289    /// need to be created for them):
290    ///
291    /// ```text
292    ///   row1 (hash collision with the exist rows)
293    ///   row2
294    ///   row3 (hash collision with the exist rows)
295    ///   row4
296    /// ```
297    ///
298    /// # In `scalarized_intern`, their `group indices` will be
299    ///
300    /// ```text
301    ///   row1 --> 0
302    ///   row2 --> 1
303    ///   row3 --> 2
304    ///   row4 --> 3
305    /// ```
306    ///
307    /// The `group indices` order agrees with the input order, and `streaming aggregation`
308    /// depends on this.
309    ///
310    /// # However, in `vectorized_intern`, their `group indices` will be
311    ///
312    /// ```text
313    ///   row1 --> 2
314    ///   row2 --> 0
315    ///   row3 --> 3
316    ///   row4 --> 1
317    /// ```
318    ///
319    /// The `group indices` order disagrees with the input order, and this would lead to errors
320    /// in `streaming aggregation`.
321    fn scalarized_intern(
322        &mut self,
323        cols: &[ArrayRef],
324        groups: &mut Vec<usize>,
325    ) -> Result<()> {
326        let n_rows = cols[0].len();
327
328        // tracks to which group each of the input rows belongs
329        groups.clear();
330
331        // 1.1 Calculate the group keys for the group values
332        let batch_hashes = &mut self.hashes_buffer;
333        batch_hashes.clear();
334        batch_hashes.resize(n_rows, 0);
335        create_hashes(cols, &self.random_state, batch_hashes)?;
336
337        for (row, &target_hash) in batch_hashes.iter().enumerate() {
338            let entry = self
339                .map
340                .find_mut(target_hash, |(exist_hash, group_idx_view)| {
341                    // It is ensured to be inlined in `scalarized_intern`
342                    debug_assert!(!group_idx_view.is_non_inlined());
343
344                    // Somewhat surprisingly, this closure can be called even if the
345                    // hash doesn't match, so check the hash first with an integer
346                    // comparison to avoid the more expensive comparison with the
347                    // group value. https://github.com/apache/datafusion/pull/11718
348                    if target_hash != *exist_hash {
349                        return false;
350                    }
351
352                    fn check_row_equal(
353                        array_row: &dyn GroupColumn,
354                        lhs_row: usize,
355                        array: &ArrayRef,
356                        rhs_row: usize,
357                    ) -> bool {
358                        array_row.equal_to(lhs_row, array, rhs_row)
359                    }
360
361                    for (i, group_val) in self.group_values.iter().enumerate() {
362                        if !check_row_equal(
363                            group_val.as_ref(),
364                            group_idx_view.value() as usize,
365                            &cols[i],
366                            row,
367                        ) {
368                            return false;
369                        }
370                    }
371
372                    true
373                });
374
375            let group_idx = match entry {
376                // Existing group_index for this group value
377                Some((_hash, group_idx_view)) => group_idx_view.value() as usize,
378                //  1.2 Need to create new entry for the group
379                None => {
380                    // Add new entry to aggr_state and save newly created index
381                    // let group_idx = group_values.num_rows();
382                    // group_values.push(group_rows.row(row));
383
384                    let mut checklen = 0;
385                    let group_idx = self.group_values[0].len();
386                    for (i, group_value) in self.group_values.iter_mut().enumerate() {
387                        group_value.append_val(&cols[i], row)?;
388                        let len = group_value.len();
389                        if i == 0 {
390                            checklen = len;
391                        } else {
392                            debug_assert_eq!(checklen, len);
393                        }
394                    }
395
396                    // for hasher function, use precomputed hash value
397                    self.map.insert_accounted(
398                        (target_hash, GroupIndexView::new_inlined(group_idx as u64)),
399                        |(hash, _group_index)| *hash,
400                        &mut self.map_size,
401                    );
402                    group_idx
403                }
404            };
405            groups.push(group_idx);
406        }
407
408        Ok(())
409    }
410
411    // ========================================================================
412    // Vectorized intern
413    // ========================================================================
414
415    /// Vectorized intern
416    ///
417    /// This is used in `non-streaming aggregation`, which does not require any order between
418    /// the rows in `cols` and their corresponding groups in `group_values`.
419    ///
420    /// The vectorized approach can offer higher performance by avoiding row-by-row
421    /// downcasts of `cols` and by enabling further optimizations (like SIMD).
422    fn vectorized_intern(
423        &mut self,
424        cols: &[ArrayRef],
425        groups: &mut Vec<usize>,
426    ) -> Result<()> {
427        let n_rows = cols[0].len();
428
429        // tracks to which group each of the input rows belongs
430        groups.clear();
431        groups.resize(n_rows, usize::MAX);
432
433        let mut batch_hashes = mem::take(&mut self.hashes_buffer);
434        batch_hashes.clear();
435        batch_hashes.resize(n_rows, 0);
436        create_hashes(cols, &self.random_state, &mut batch_hashes)?;
437
438        // General steps for one round `vectorized equal_to & append`:
439        //   1. Collect vectorized context by checking hash values of `cols` in `map`,
440        //      mainly fill `vectorized_append_row_indices`, `vectorized_equal_to_row_indices`
441        //      and `vectorized_equal_to_group_indices`
442        //
443        //   2. Perform `vectorized_append` for `vectorized_append_row_indices`.
444        //      `vectorized_append` must be performed before `vectorized_equal_to`,
445        //      because some `group indices` in `vectorized_equal_to_group_indices`
446        //      may still point to rows that don't yet exist in `group_values` until the append is done.
447        //
448        //   3. Perform `vectorized_equal_to` for `vectorized_equal_to_row_indices`
449        //      and `vectorized_equal_to_group_indices`. If some rows in the input `cols`
450        //      are found not equal to `exist rows` in `group_values`, place them in
451        //      `remaining_row_indices` and perform `scalarized_intern_remaining` for them
452        //      afterwards, similarly to `scalarized_intern`.
453        //
454        //   4. Perform `scalarized_intern_remaining` for the rows mentioned above; see the
455        //      comments of `scalarized_intern_remaining` for when this applies.
456        //
457
458        // 1. Collect vectorized context by checking hash values of `cols` in `map`
459        self.collect_vectorized_process_context(&batch_hashes, groups);
460
461        // 2. Perform `vectorized_append`
462        self.vectorized_append(cols)?;
463
464        // 3. Perform `vectorized_equal_to`
465        self.vectorized_equal_to(cols, groups);
466
467        // 4. Perform scalarized intern for the remaining rows
468        // (see the comments for `remaining_row_indices` for details)
469        self.scalarized_intern_remaining(cols, &batch_hashes, groups)?;
470
471        self.hashes_buffer = batch_hashes;
472
473        Ok(())
474    }
475
476    /// Collect vectorized context by checking hash values of `cols` in `map`
477    ///
478    /// 1. If the bucket is not found
479    ///   - Build and insert the new `inlined group index view`
480    ///     and its hash value into `map`
481    ///   - Add the row index to `vectorized_append_row_indices`
482    ///   - Set the group index for the row in `groups`
483    ///
484    /// 2. If the bucket is found
485    ///   - Add the row index to `vectorized_equal_to_row_indices`
486    ///   - Check if the `group index view` is `inlined` or `non-inlined`:
487    ///     If it is inlined, add it to `vectorized_equal_to_group_indices` directly.
488    ///     Otherwise get all group indices from `group_index_lists`, and add them.
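    ///
    /// A small illustrative example (hypothetical hashes and group indices):
    /// suppose row 0 hashes to a new bucket, while rows 1 and 2 hash to an
    /// existing non-inlined view whose list is `[5, 9]`. The buffers are then
    /// filled as:
    ///
    /// ```text
    ///   vectorized_append_row_indices     = [0]
    ///   vectorized_equal_to_row_indices   = [1, 1, 2, 2]
    ///   vectorized_equal_to_group_indices = [5, 9, 5, 9]
    /// ```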
489    fn collect_vectorized_process_context(
490        &mut self,
491        batch_hashes: &[u64],
492        groups: &mut [usize],
493    ) {
494        self.vectorized_operation_buffers.append_row_indices.clear();
495        self.vectorized_operation_buffers
496            .equal_to_row_indices
497            .clear();
498        self.vectorized_operation_buffers
499            .equal_to_group_indices
500            .clear();
501
502        let mut group_values_len = self.group_values[0].len();
503        for (row, &target_hash) in batch_hashes.iter().enumerate() {
504            let entry = self
505                .map
506                .find(target_hash, |(exist_hash, _)| target_hash == *exist_hash);
507
508            let Some((_, group_index_view)) = entry else {
509                // 1. Bucket not found case
510                // Build `new inlined group index view`
511                let current_group_idx = group_values_len;
512                let group_index_view =
513                    GroupIndexView::new_inlined(current_group_idx as u64);
514
515                // Insert the `group index view` and its hash into `map`
516                // for hasher function, use precomputed hash value
517                self.map.insert_accounted(
518                    (target_hash, group_index_view),
519                    |(hash, _)| *hash,
520                    &mut self.map_size,
521                );
522
523                // Add row index to `vectorized_append_row_indices`
524                self.vectorized_operation_buffers
525                    .append_row_indices
526                    .push(row);
527
528                // Set group index to row in `groups`
529                groups[row] = current_group_idx;
530
531                group_values_len += 1;
532                continue;
533            };
534
535            // 2. bucket found
536            // Check if the `group index view` is `inlined` or `non_inlined`
537            if group_index_view.is_non_inlined() {
538                // Non-inlined case, the value of view is offset in `group_index_lists`.
539                // We use it to get `group_index_list`, and add related `rows` and `group_indices`
540                // into `vectorized_equal_to_row_indices` and `vectorized_equal_to_group_indices`.
541                let list_offset = group_index_view.value() as usize;
542                let group_index_list = &self.group_index_lists[list_offset];
543                for &group_index in group_index_list {
544                    self.vectorized_operation_buffers
545                        .equal_to_row_indices
546                        .push(row);
547                    self.vectorized_operation_buffers
548                        .equal_to_group_indices
549                        .push(group_index);
550                }
551            } else {
552                let group_index = group_index_view.value() as usize;
553                self.vectorized_operation_buffers
554                    .equal_to_row_indices
555                    .push(row);
556                self.vectorized_operation_buffers
557                    .equal_to_group_indices
558                    .push(group_index);
559            }
560        }
561    }
562
563    /// Perform `vectorized_append` for `rows` in `vectorized_append_row_indices`
564    fn vectorized_append(&mut self, cols: &[ArrayRef]) -> Result<()> {
565        if self
566            .vectorized_operation_buffers
567            .append_row_indices
568            .is_empty()
569        {
570            return Ok(());
571        }
572
573        let iter = self.group_values.iter_mut().zip(cols.iter());
574        for (group_column, col) in iter {
575            group_column.vectorized_append(
576                col,
577                &self.vectorized_operation_buffers.append_row_indices,
578            )?;
579        }
580
581        Ok(())
582    }
583
584    /// Perform `vectorized_equal_to`
585    ///
586    /// 1. Perform `vectorized_equal_to` for the `rows` in `vectorized_equal_to_row_indices`
587    ///    and the `group_indices` in `vectorized_equal_to_group_indices`.
588    ///
589    /// 2. Check `equal_to_results`:
590    ///
591    ///    For `rows` found equal, set their `group_indices` in `groups`.
592    ///
593    ///    For `rows` found not equal, add them to `remaining_row_indices`,
594    ///    and perform `scalarized_intern_remaining` for them afterwards.
595    ///    Usually, such `rows`, having the same hash but different values than `exist rows`,
596    ///    are very few.
597    fn vectorized_equal_to(&mut self, cols: &[ArrayRef], groups: &mut [usize]) {
598        assert_eq!(
599            self.vectorized_operation_buffers
600                .equal_to_group_indices
601                .len(),
602            self.vectorized_operation_buffers.equal_to_row_indices.len()
603        );
604
605        self.vectorized_operation_buffers
606            .remaining_row_indices
607            .clear();
608
609        if self
610            .vectorized_operation_buffers
611            .equal_to_group_indices
612            .is_empty()
613        {
614            return;
615        }
616
617        // 1. Perform `vectorized_equal_to` for `rows` in `vectorized_equal_to_row_indices`
618        //    and `group_indices` in `vectorized_equal_to_group_indices`
619        let mut equal_to_results =
620            mem::take(&mut self.vectorized_operation_buffers.equal_to_results);
621        equal_to_results.clear();
622        equal_to_results.resize(
623            self.vectorized_operation_buffers
624                .equal_to_group_indices
625                .len(),
626            true,
627        );
628
629        for (col_idx, group_col) in self.group_values.iter().enumerate() {
630            group_col.vectorized_equal_to(
631                &self.vectorized_operation_buffers.equal_to_group_indices,
632                &cols[col_idx],
633                &self.vectorized_operation_buffers.equal_to_row_indices,
634                &mut equal_to_results,
635            );
636        }
637
638        // 2. Check `equal_to_results`; for `rows` found not equal, add them
639        //    to `remaining_row_indices`, and perform `scalarized_intern_remaining` for them after.
640        let mut current_row_equal_to_result = false;
641        for (idx, &row) in self
642            .vectorized_operation_buffers
643            .equal_to_row_indices
644            .iter()
645            .enumerate()
646        {
647            let equal_to_result = equal_to_results[idx];
648
649            // Equal to case, set the `group_indices` to `rows` in `groups`
650            if equal_to_result {
651                groups[row] =
652                    self.vectorized_operation_buffers.equal_to_group_indices[idx];
653            }
654            current_row_equal_to_result |= equal_to_result;
655
656            // Look ahead to the next row to check whether all results
657            // for the current row have been checked
658            let next_row = self
659                .vectorized_operation_buffers
660                .equal_to_row_indices
661                .get(idx + 1)
662                .unwrap_or(&usize::MAX);
663
664            // All results for the current row have been checked; check the combined result
665            if row != *next_row {
666                // Not equal to case, add `row` to `remaining_row_indices`
667                if !current_row_equal_to_result {
668                    self.vectorized_operation_buffers
669                        .remaining_row_indices
670                        .push(row);
671                }
672
673                // Reset the combined result before checking the next row
674                current_row_equal_to_result = false;
675            }
676        }
677
678        self.vectorized_operation_buffers.equal_to_results = equal_to_results;
679    }
680
681    /// It is possible that some `input rows` have the same
682    /// hash values as the `exist rows`, but different
683    /// actual values.
684    ///
685    /// We find them in `vectorized_equal_to`, and put them
686    /// into `remaining_row_indices`. For these `input rows`,
687    /// we then perform a scalarized intern similar to the one in
688    /// [`GroupValuesColumn`].
689    ///
690    /// This design keeps the process simple while still being efficient enough:
691    ///
692    /// # About making the process simple
693    ///
694    /// Some corner cases become really easy to solve, like the following:
695    ///
696    /// ```text
697    ///   input row1 (same hash value as exist rows, but different value)
698    ///   input row1
699    ///   ...
700    ///   input row1
701    /// ```
702    ///
703    /// After performing `vectorized_equal_to`, we will find multiple `input rows`
704    /// not equal to the `exist rows`. However, such `input rows` are repeated, and only
705    /// one new group should be created for them.
706    ///
707    /// If we didn't fall back to a scalarized intern, it would be really hard to
708    /// distinguish such `repeated rows` among the `input rows`. By just falling back,
709    /// it is easy to solve, and the performance is at least not worse than before.
710    ///
711    /// # About performance
712    ///
713    /// Hash collisions are infrequent, so the fallback should rarely happen.
714    /// In most situations, `remaining_row_indices` will be found to be empty after
715    /// performing `vectorized_equal_to`.
716    fn scalarized_intern_remaining(
717        &mut self,
718        cols: &[ArrayRef],
719        batch_hashes: &[u64],
720        groups: &mut [usize],
721    ) -> Result<()> {
722        if self
723            .vectorized_operation_buffers
724            .remaining_row_indices
725            .is_empty()
726        {
727            return Ok(());
728        }
729
730        let mut map = mem::take(&mut self.map);
731
732        for &row in &self.vectorized_operation_buffers.remaining_row_indices {
733            let target_hash = batch_hashes[row];
734            let entry = map.find_mut(target_hash, |(exist_hash, _)| {
735                // Somewhat surprisingly, this closure can be called even if the
736                // hash doesn't match, so check the hash first with an integer
737                // comparison to avoid the more expensive comparison with the
738                // group value. https://github.com/apache/datafusion/pull/11718
739                target_hash == *exist_hash
740            });
741
742            // Only `rows` having the same hash value as `exist rows` but a different value
743            // are processed in `scalarized_intern_remaining`,
744            // so the related `buckets` in `map` are ensured to be `Some`.
745            let Some((_, group_index_view)) = entry else {
746                unreachable!()
747            };
748
749            // Perform scalarized equal to
750            if self.scalarized_equal_to_remaining(group_index_view, cols, row, groups) {
751                // Found the row actually exists in group values,
752                // don't need to create new group for it.
753                continue;
754            }
755
756            // Insert the `row` to `group_values` before checking `next row`
757            let group_idx = self.group_values[0].len();
758            let mut checklen = 0;
759            for (i, group_value) in self.group_values.iter_mut().enumerate() {
760                group_value.append_val(&cols[i], row)?;
761                let len = group_value.len();
762                if i == 0 {
763                    checklen = len;
764                } else {
765                    debug_assert_eq!(checklen, len);
766                }
767            }
768
769            // Check if the `view` is `inlined` or `non-inlined`
770            if group_index_view.is_non_inlined() {
771                // Non-inlined case, get `group_index_list` from `group_index_lists`,
772                // then add the new `group` with the same hash values into it.
773                let list_offset = group_index_view.value() as usize;
774                let group_index_list = &mut self.group_index_lists[list_offset];
775                group_index_list.push(group_idx);
776            } else {
777                // Inlined case
778                let list_offset = self.group_index_lists.len();
779
780                // Create new `group_index_list` including
781                // `exist group index` + `new group index`.
782                // Add the new `group_index_list` into `group_index_lists`.
783                let exist_group_index = group_index_view.value() as usize;
784                let new_group_index_list = vec![exist_group_index, group_idx];
785                self.group_index_lists.push(new_group_index_list);
786
787                // Update the `group_index_view` to non-inlined
788                let new_group_index_view =
789                    GroupIndexView::new_non_inlined(list_offset as u64);
790                *group_index_view = new_group_index_view;
791            }
792
793            groups[row] = group_idx;
794        }
795
796        self.map = map;
797        Ok(())
798    }
799
800    fn scalarized_equal_to_remaining(
801        &self,
802        group_index_view: &GroupIndexView,
803        cols: &[ArrayRef],
804        row: usize,
805        groups: &mut [usize],
806    ) -> bool {
807        // Check if this row exists in `group_values`
808        fn check_row_equal(
809            array_row: &dyn GroupColumn,
810            lhs_row: usize,
811            array: &ArrayRef,
812            rhs_row: usize,
813        ) -> bool {
814            array_row.equal_to(lhs_row, array, rhs_row)
815        }
816
817        if group_index_view.is_non_inlined() {
818            let list_offset = group_index_view.value() as usize;
819            let group_index_list = &self.group_index_lists[list_offset];
820
821            for &group_idx in group_index_list {
822                let mut check_result = true;
823                for (i, group_val) in self.group_values.iter().enumerate() {
824                    if !check_row_equal(group_val.as_ref(), group_idx, &cols[i], row) {
825                        check_result = false;
826                        break;
827                    }
828                }
829
830                if check_result {
831                    groups[row] = group_idx;
832                    return true;
833                }
834            }
835
836            // No group matched, return false
837            false
838        } else {
839            let group_idx = group_index_view.value() as usize;
840            for (i, group_val) in self.group_values.iter().enumerate() {
841                if !check_row_equal(group_val.as_ref(), group_idx, &cols[i], row) {
842                    return false;
843                }
844            }
845
846            groups[row] = group_idx;
847            true
848        }
849    }
850
851    /// Return the group indices for `hash`, together with its `group_index_view`
852    #[cfg(test)]
853    fn get_indices_by_hash(&self, hash: u64) -> Option<(Vec<usize>, GroupIndexView)> {
854        let entry = self.map.find(hash, |(exist_hash, _)| hash == *exist_hash);
855
856        match entry {
857            Some((_, group_index_view)) => {
858                if group_index_view.is_non_inlined() {
859                    let list_offset = group_index_view.value() as usize;
860                    Some((
861                        self.group_index_lists[list_offset].clone(),
862                        *group_index_view,
863                    ))
864                } else {
865                    let group_index = group_index_view.value() as usize;
866                    Some((vec![group_index], *group_index_view))
867                }
868            }
869            None => None,
870        }
871    }
872}
873
874/// Instantiates a [`PrimitiveGroupValueBuilder`] and pushes it into `$v`
875///
876/// Arguments:
877/// `$v`: the vector to push the new builder into
878/// `$nullable`: whether the input can contain nulls
879/// `$t`: the primitive type of the builder
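///
/// For example (illustrative; this mirrors how the macro is used in `intern`
/// below), for a nullable `Int64` column:
///
/// ```ignore
/// instantiate_primitive!(v, true, Int64Type, data_type);
/// // pushes a Box<PrimitiveGroupValueBuilder<Int64Type, true>> into `v`
/// ```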
880macro_rules! instantiate_primitive {
881    ($v:expr, $nullable:expr, $t:ty, $data_type:ident) => {
882        if $nullable {
883            let b = PrimitiveGroupValueBuilder::<$t, true>::new($data_type.to_owned());
884            $v.push(Box::new(b) as _)
885        } else {
886            let b = PrimitiveGroupValueBuilder::<$t, false>::new($data_type.to_owned());
887            $v.push(Box::new(b) as _)
888        }
889    };
890}
891
892impl<const STREAMING: bool> GroupValues for GroupValuesColumn<STREAMING> {
893    fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> {
894        if self.group_values.is_empty() {
895            let mut v = Vec::with_capacity(cols.len());
896
897            for f in self.schema.fields().iter() {
898                let nullable = f.is_nullable();
899                let data_type = f.data_type();
900                match data_type {
901                    &DataType::Int8 => {
902                        instantiate_primitive!(v, nullable, Int8Type, data_type)
903                    }
904                    &DataType::Int16 => {
905                        instantiate_primitive!(v, nullable, Int16Type, data_type)
906                    }
907                    &DataType::Int32 => {
908                        instantiate_primitive!(v, nullable, Int32Type, data_type)
909                    }
910                    &DataType::Int64 => {
911                        instantiate_primitive!(v, nullable, Int64Type, data_type)
912                    }
913                    &DataType::UInt8 => {
914                        instantiate_primitive!(v, nullable, UInt8Type, data_type)
915                    }
916                    &DataType::UInt16 => {
917                        instantiate_primitive!(v, nullable, UInt16Type, data_type)
918                    }
919                    &DataType::UInt32 => {
920                        instantiate_primitive!(v, nullable, UInt32Type, data_type)
921                    }
922                    &DataType::UInt64 => {
923                        instantiate_primitive!(v, nullable, UInt64Type, data_type)
924                    }
925                    &DataType::Float32 => {
926                        instantiate_primitive!(v, nullable, Float32Type, data_type)
927                    }
928                    &DataType::Float64 => {
929                        instantiate_primitive!(v, nullable, Float64Type, data_type)
930                    }
931                    &DataType::Date32 => {
932                        instantiate_primitive!(v, nullable, Date32Type, data_type)
933                    }
934                    &DataType::Date64 => {
935                        instantiate_primitive!(v, nullable, Date64Type, data_type)
936                    }
937                    &DataType::Time32(t) => match t {
938                        TimeUnit::Second => {
939                            instantiate_primitive!(
940                                v,
941                                nullable,
942                                Time32SecondType,
943                                data_type
944                            )
945                        }
946                        TimeUnit::Millisecond => {
947                            instantiate_primitive!(
948                                v,
949                                nullable,
950                                Time32MillisecondType,
951                                data_type
952                            )
953                        }
954                        _ => {}
955                    },
956                    &DataType::Time64(t) => match t {
957                        TimeUnit::Microsecond => {
958                            instantiate_primitive!(
959                                v,
960                                nullable,
961                                Time64MicrosecondType,
962                                data_type
963                            )
964                        }
965                        TimeUnit::Nanosecond => {
966                            instantiate_primitive!(
967                                v,
968                                nullable,
969                                Time64NanosecondType,
970                                data_type
971                            )
972                        }
973                        _ => {}
974                    },
975                    &DataType::Timestamp(t, _) => match t {
976                        TimeUnit::Second => {
977                            instantiate_primitive!(
978                                v,
979                                nullable,
980                                TimestampSecondType,
981                                data_type
982                            )
983                        }
984                        TimeUnit::Millisecond => {
985                            instantiate_primitive!(
986                                v,
987                                nullable,
988                                TimestampMillisecondType,
989                                data_type
990                            )
991                        }
992                        TimeUnit::Microsecond => {
993                            instantiate_primitive!(
994                                v,
995                                nullable,
996                                TimestampMicrosecondType,
997                                data_type
998                            )
999                        }
1000                        TimeUnit::Nanosecond => {
1001                            instantiate_primitive!(
1002                                v,
1003                                nullable,
1004                                TimestampNanosecondType,
1005                                data_type
1006                            )
1007                        }
1008                    },
1009                    &DataType::Decimal128(_, _) => {
1010                        instantiate_primitive! {
1011                            v,
1012                            nullable,
1013                            Decimal128Type,
1014                            data_type
1015                        }
1016                    }
1017                    &DataType::Utf8 => {
1018                        let b = ByteGroupValueBuilder::<i32>::new(OutputType::Utf8);
1019                        v.push(Box::new(b) as _)
1020                    }
1021                    &DataType::LargeUtf8 => {
1022                        let b = ByteGroupValueBuilder::<i64>::new(OutputType::Utf8);
1023                        v.push(Box::new(b) as _)
1024                    }
1025                    &DataType::Binary => {
1026                        let b = ByteGroupValueBuilder::<i32>::new(OutputType::Binary);
1027                        v.push(Box::new(b) as _)
1028                    }
1029                    &DataType::LargeBinary => {
1030                        let b = ByteGroupValueBuilder::<i64>::new(OutputType::Binary);
1031                        v.push(Box::new(b) as _)
1032                    }
1033                    &DataType::Utf8View => {
1034                        let b = ByteViewGroupValueBuilder::<StringViewType>::new();
1035                        v.push(Box::new(b) as _)
1036                    }
1037                    &DataType::BinaryView => {
1038                        let b = ByteViewGroupValueBuilder::<BinaryViewType>::new();
1039                        v.push(Box::new(b) as _)
1040                    }
1041                    &DataType::Boolean => {
1042                        if nullable {
1043                            let b = BooleanGroupValueBuilder::<true>::new();
1044                            v.push(Box::new(b) as _)
1045                        } else {
1046                            let b = BooleanGroupValueBuilder::<false>::new();
1047                            v.push(Box::new(b) as _)
1048                        }
1049                    }
1050                    dt => {
1051                        return not_impl_err!("{dt} not supported in GroupValuesColumn")
1052                    }
1053                }
1054            }
1055            self.group_values = v;
1056        }
1057
1058        if !STREAMING {
1059            self.vectorized_intern(cols, groups)
1060        } else {
1061            self.scalarized_intern(cols, groups)
1062        }
1063    }
1064
1065    fn size(&self) -> usize {
1066        let group_values_size: usize = self.group_values.iter().map(|v| v.size()).sum();
1067        group_values_size + self.map_size + self.hashes_buffer.allocated_size()
1068    }
1069
1070    fn is_empty(&self) -> bool {
1071        self.len() == 0
1072    }
1073
1074    fn len(&self) -> usize {
1075        if self.group_values.is_empty() {
1076            return 0;
1077        }
1078
1079        self.group_values[0].len()
1080    }
1081
1082    fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
1083        let mut output = match emit_to {
1084            EmitTo::All => {
1085                let group_values = mem::take(&mut self.group_values);
1086                debug_assert!(self.group_values.is_empty());
1087
1088                group_values
1089                    .into_iter()
1090                    .map(|v| v.build())
1091                    .collect::<Vec<_>>()
1092            }
1093            EmitTo::First(n) => {
1094                let output = self
1095                    .group_values
1096                    .iter_mut()
1097                    .map(|v| v.take_n(n))
1098                    .collect::<Vec<_>>();
1099                let mut next_new_list_offset = 0;
1100
1101                self.map.retain(|(_exist_hash, group_idx_view)| {
1102                    // In non-streaming case, we need to check if the `group index view`
1103                    // is `inlined` or `non-inlined`
1104                    if !STREAMING && group_idx_view.is_non_inlined() {
1105                        // Non-inlined case
1106                        // We take the `group_index_list` from `group_index_lists`
1107
1108                        // New list offsets are assigned incrementally
1109                        self.emit_group_index_list_buffer.clear();
1110                        let list_offset = group_idx_view.value() as usize;
1111                        for group_index in self.group_index_lists[list_offset].iter() {
1112                            if let Some(remaining) = group_index.checked_sub(n) {
1113                                self.emit_group_index_list_buffer.push(remaining);
1114                            }
1115                        }
1116
1117                        // The possible results:
1118                        //   - `new_group_index_list` is empty, we should erase this bucket
1119                        //   - only one value in `new_group_index_list`, switch the `view` to `inlined`
1120                        //   - still multiple values in `new_group_index_list`, build and set the new `non-inlined view`
1121                        if self.emit_group_index_list_buffer.is_empty() {
1122                            false
1123                        } else if self.emit_group_index_list_buffer.len() == 1 {
1124                            let group_index =
1125                                self.emit_group_index_list_buffer.first().unwrap();
1126                            *group_idx_view =
1127                                GroupIndexView::new_inlined(*group_index as u64);
1128                            true
1129                        } else {
1130                            let group_index_list =
1131                                &mut self.group_index_lists[next_new_list_offset];
1132                            group_index_list.clear();
1133                            group_index_list
1134                                .extend(self.emit_group_index_list_buffer.iter());
1135                            *group_idx_view = GroupIndexView::new_non_inlined(
1136                                next_new_list_offset as u64,
1137                            );
1138                            next_new_list_offset += 1;
1139                            true
1140                        }
1141                    } else {
1142                        // In `streaming case`, the `group index view` is ensured to be `inlined`
1143                        debug_assert!(!group_idx_view.is_non_inlined());
1144
1145                        // Inlined case, we just decrement the group index by n
1146                        let group_index = group_idx_view.value() as usize;
1147                        match group_index.checked_sub(n) {
1148                            // Group index was >= n, shift value down
1149                            Some(sub) => {
1150                                *group_idx_view = GroupIndexView::new_inlined(sub as u64);
1151                                true
1152                            }
1153                            // Group index was < n, so remove from table
1154                            None => false,
1155                        }
1156                    }
1157                });
1158
1159                if !STREAMING {
1160                    self.group_index_lists.truncate(next_new_list_offset);
1161                }
1162
1163                output
1164            }
1165        };
1166
1167        // TODO: Materialize dictionaries in group keys (#7647)
1168        for (field, array) in self.schema.fields.iter().zip(&mut output) {
1169            let expected = field.data_type();
1170            if let DataType::Dictionary(_, v) = expected {
1171                let actual = array.data_type();
1172                if v.as_ref() != actual {
1173                    return Err(internal_datafusion_err!(
1174                        "Converted group rows expected dictionary of {v} got {actual}"
1175                    ));
1176                }
1177                *array = cast(array.as_ref(), expected)?;
1178            }
1179        }
1180
1181        Ok(output)
1182    }
1183
1184    fn clear_shrink(&mut self, batch: &RecordBatch) {
1185        let count = batch.num_rows();
1186        self.group_values.clear();
1187        self.map.clear();
1188        self.map.shrink_to(count, |_| 0); // hasher does not matter since the map is cleared
1189        self.map_size = self.map.capacity() * size_of::<(u64, usize)>();
1190        self.hashes_buffer.clear();
1191        self.hashes_buffer.shrink_to(count);
1192
1193        // These structures are only used in the `non-streaming` case
1194        if !STREAMING {
1195            self.group_index_lists.clear();
1196            self.emit_group_index_list_buffer.clear();
1197            self.vectorized_operation_buffers.clear();
1198        }
1199    }
1200}
1201
1202/// Returns true if [`GroupValuesColumn`] is supported for the specified schema
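///
/// For example (an illustrative check; the field names are made up):
///
/// ```ignore
/// let schema = Schema::new(vec![
///     Field::new("a", DataType::Utf8, true),
///     Field::new("b", DataType::Int64, true),
/// ]);
/// assert!(supported_schema(&schema));
/// ```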
1203pub fn supported_schema(schema: &Schema) -> bool {
1204    schema
1205        .fields()
1206        .iter()
1207        .map(|f| f.data_type())
1208        .all(supported_type)
1209}
1210
1211/// Returns true if the specified data type is supported by [`GroupValuesColumn`]
1212///
1213/// In order to be supported, there must be a specialized implementation of
1214/// [`GroupColumn`] for the data type, instantiated in [`GroupValuesColumn::intern`]
1215fn supported_type(data_type: &DataType) -> bool {
1216    matches!(
1217        *data_type,
1218        DataType::Int8
1219            | DataType::Int16
1220            | DataType::Int32
1221            | DataType::Int64
1222            | DataType::UInt8
1223            | DataType::UInt16
1224            | DataType::UInt32
1225            | DataType::UInt64
1226            | DataType::Float32
1227            | DataType::Float64
1228            | DataType::Decimal128(_, _)
1229            | DataType::Utf8
1230            | DataType::LargeUtf8
1231            | DataType::Binary
1232            | DataType::LargeBinary
1233            | DataType::Date32
1234            | DataType::Date64
1235            | DataType::Time32(_)
1236            | DataType::Timestamp(_, _)
1237            | DataType::Utf8View
1238            | DataType::BinaryView
1239            | DataType::Boolean
1240    )
1241}
1242
1243/// Shows how many `null`s there are in an array
1244enum Nulls {
1245    /// All array items are `null`s
1246    All,
1247    /// There are both `null`s and non-`null`s in the array items
1248    Some,
1249    /// There are no `null`s in the array items
1250    None,
1251}
1252
1253#[cfg(test)]
1254mod tests {
1255    use std::{collections::HashMap, sync::Arc};
1256
1257    use arrow::array::{ArrayRef, Int64Array, RecordBatch, StringArray, StringViewArray};
1258    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
1259    use arrow::{compute::concat_batches, util::pretty::pretty_format_batches};
1260    use datafusion_common::utils::proxy::HashTableAllocExt;
1261    use datafusion_expr::EmitTo;
1262
1263    use crate::aggregates::group_values::{
1264        multi_group_by::GroupValuesColumn, GroupValues,
1265    };
1266
1267    use super::GroupIndexView;
1268
1269    #[test]
1270    fn test_intern_for_vectorized_group_values() {
1271        let data_set = VectorizedTestDataSet::new();
1272        let mut group_values =
1273            GroupValuesColumn::<false>::try_new(data_set.schema()).unwrap();
1274
1275        data_set.load_to_group_values(&mut group_values);
1276        let actual_batch = group_values.emit(EmitTo::All).unwrap();
1277        let actual_batch = RecordBatch::try_new(data_set.schema(), actual_batch).unwrap();
1278
1279        check_result(&actual_batch, &data_set.expected_batch);
1280    }
1281
1282    #[test]
1283    fn test_emit_first_n_for_vectorized_group_values() {
1284        let data_set = VectorizedTestDataSet::new();
1285        let mut group_values =
1286            GroupValuesColumn::<false>::try_new(data_set.schema()).unwrap();
1287
1288        // Try emitting the groups in 1..=num_rows rounds
1289        let num_rows = data_set.expected_batch.num_rows();
1290        let schema = data_set.schema();
1291        for times_to_take in 1..=num_rows {
1292            // Write data after emitting
1293            data_set.load_to_group_values(&mut group_values);
1294
1295            // Emit `times_to_take` times, collect and concat the sub-results to total result,
1296            // then check it
1297            let suggest_num_emit = data_set.expected_batch.num_rows() / times_to_take;
1298            let mut num_remaining_rows = num_rows;
1299            let mut actual_sub_batches = Vec::new();
1300
1301            for nth_time in 0..times_to_take {
1302                let num_emit = if nth_time == times_to_take - 1 {
1303                    num_remaining_rows
1304                } else {
1305                    suggest_num_emit
1306                };
1307
1308                let sub_batch = group_values.emit(EmitTo::First(num_emit)).unwrap();
1309                let sub_batch =
1310                    RecordBatch::try_new(Arc::clone(&schema), sub_batch).unwrap();
1311                actual_sub_batches.push(sub_batch);
1312
1313                num_remaining_rows -= num_emit;
1314            }
1315            assert!(num_remaining_rows == 0);
1316
1317            let actual_batch = concat_batches(&schema, &actual_sub_batches).unwrap();
1318            check_result(&actual_batch, &data_set.expected_batch);
1319        }
1320    }
1321
1322    #[test]
1323    fn test_hashtable_modifying_in_emit_first_n() {
1324        // Situations that should be covered:
1325        //   1. Erase inlined group index view
1326        //   2. Erase whole non-inlined group index view
1327        //   3. Erase + decrease group indices in non-inlined group index view
1328        //      + view still non-inlined after decreasing
1329        //   4. Erase + decrease group indices in non-inlined group index view
1330        //      + view switch to inlined after decreasing
1331        //   5. Only decrease group index in inlined group index view
1332        //   6. Only decrease group indices in non-inlined group index view
1333        //   7. Erase all things
1334
1335        let field = Field::new_list_field(DataType::Int32, true);
1336        let schema = Arc::new(Schema::new_with_metadata(vec![field], HashMap::new()));
1337        let mut group_values = GroupValuesColumn::<false>::try_new(schema).unwrap();
1338
1339        // Insert group index views and check that they were inserted successfully
1340        insert_inline_group_index_view(&mut group_values, 0, 0);
1341        insert_non_inline_group_index_view(&mut group_values, 1, vec![1, 2]);
1342        insert_non_inline_group_index_view(&mut group_values, 2, vec![3, 4, 5]);
1343        insert_inline_group_index_view(&mut group_values, 3, 6);
1344        insert_non_inline_group_index_view(&mut group_values, 4, vec![7, 8]);
1345        insert_non_inline_group_index_view(&mut group_values, 5, vec![9, 10, 11]);
1346
1347        assert_eq!(
1348            group_values.get_indices_by_hash(0).unwrap(),
1349            (vec![0], GroupIndexView::new_inlined(0))
1350        );
1351        assert_eq!(
1352            group_values.get_indices_by_hash(1).unwrap(),
1353            (vec![1, 2], GroupIndexView::new_non_inlined(0))
1354        );
1355        assert_eq!(
1356            group_values.get_indices_by_hash(2).unwrap(),
1357            (vec![3, 4, 5], GroupIndexView::new_non_inlined(1))
1358        );
1359        assert_eq!(
1360            group_values.get_indices_by_hash(3).unwrap(),
1361            (vec![6], GroupIndexView::new_inlined(6))
1362        );
1363        assert_eq!(
1364            group_values.get_indices_by_hash(4).unwrap(),
1365            (vec![7, 8], GroupIndexView::new_non_inlined(2))
1366        );
1367        assert_eq!(
1368            group_values.get_indices_by_hash(5).unwrap(),
1369            (vec![9, 10, 11], GroupIndexView::new_non_inlined(3))
1370        );
1371        assert_eq!(group_values.map.len(), 6);
1372
1373        // Emit first 4 to test cases 1~3, 5~6
1374        let _ = group_values.emit(EmitTo::First(4)).unwrap();
1375        assert!(group_values.get_indices_by_hash(0).is_none());
1376        assert!(group_values.get_indices_by_hash(1).is_none());
1377        assert_eq!(
1378            group_values.get_indices_by_hash(2).unwrap(),
1379            (vec![0, 1], GroupIndexView::new_non_inlined(0))
1380        );
1381        assert_eq!(
1382            group_values.get_indices_by_hash(3).unwrap(),
1383            (vec![2], GroupIndexView::new_inlined(2))
1384        );
1385        assert_eq!(
1386            group_values.get_indices_by_hash(4).unwrap(),
1387            (vec![3, 4], GroupIndexView::new_non_inlined(1))
1388        );
1389        assert_eq!(
1390            group_values.get_indices_by_hash(5).unwrap(),
1391            (vec![5, 6, 7], GroupIndexView::new_non_inlined(2))
1392        );
1393        assert_eq!(group_values.map.len(), 4);
1394
1395        // Emit first 1 to test case 4, and cases 5~6 again
1396        let _ = group_values.emit(EmitTo::First(1)).unwrap();
1397        assert_eq!(
1398            group_values.get_indices_by_hash(2).unwrap(),
1399            (vec![0], GroupIndexView::new_inlined(0))
1400        );
1401        assert_eq!(
1402            group_values.get_indices_by_hash(3).unwrap(),
1403            (vec![1], GroupIndexView::new_inlined(1))
1404        );
1405        assert_eq!(
1406            group_values.get_indices_by_hash(4).unwrap(),
1407            (vec![2, 3], GroupIndexView::new_non_inlined(0))
1408        );
1409        assert_eq!(
1410            group_values.get_indices_by_hash(5).unwrap(),
1411            (vec![4, 5, 6], GroupIndexView::new_non_inlined(1))
1412        );
1413        assert_eq!(group_values.map.len(), 4);
1414
1415        // Emit first 5 to test cases 1~3 again
1416        let _ = group_values.emit(EmitTo::First(5)).unwrap();
1417        assert_eq!(
1418            group_values.get_indices_by_hash(5).unwrap(),
1419            (vec![0, 1], GroupIndexView::new_non_inlined(0))
1420        );
1421        assert_eq!(group_values.map.len(), 1);
1422
1423        // Emit first 1 to test case 4 again
1424        let _ = group_values.emit(EmitTo::First(1)).unwrap();
1425        assert_eq!(
1426            group_values.get_indices_by_hash(5).unwrap(),
1427            (vec![0], GroupIndexView::new_inlined(0))
1428        );
1429        assert_eq!(group_values.map.len(), 1);
1430
1431        // Emit first 1 to test case 7
1432        let _ = group_values.emit(EmitTo::First(1)).unwrap();
1433        assert!(group_values.map.is_empty());
1434    }
1435
1436    /// Test data set for [`GroupValuesColumn::vectorized_intern`]
1437    ///
1438    /// Defines the test data and supports loading it into a [`GroupValuesColumn`] under test to exercise [`GroupValuesColumn::vectorized_intern`]
1439    ///
1440    /// The covered situations:
1441    ///
1442    /// Array type:
1443    ///   - Primitive array
1444    ///   - String (byte) array
1445    ///   - String view (byte view) array
1446    ///
1447    /// Repetition and nullability in a single batch:
1448    ///   - All not null rows
1449    ///   - Mixed null + not null rows
1450    ///   - All null rows
1451    ///   - All not null rows (repeated)
1452    ///   - Mixed null + not null rows (repeated)
1453    ///   - All null rows (repeated)
1454    ///
1455    /// Whether the group already exists in `map`:
1456    ///   - Group exists in an inlined group view
1457    ///   - Group exists in a non-inlined group view
1458    ///   - Group does not exist + bucket not found in `map`
1459    ///   - Group does not exist + not equal to the inlined group view (tested via hash collision)
1460    ///   - Group does not exist + not equal to the non-inlined group view (tested via hash collision)
1461    struct VectorizedTestDataSet {
1462        test_batches: Vec<Vec<ArrayRef>>,
1463        expected_batch: RecordBatch,
1464    }
1465
1466    impl VectorizedTestDataSet {
1467        fn new() -> Self {
1468            // Intern batch 1
1469            let col1 = Int64Array::from(vec![
1470                // Repeated rows in batch
1471                Some(42),   // all not nulls + repeated rows + exist in map case
1472                None,       // mixed + repeated rows + exist in map case
1473                None,       // mixed + repeated rows + not exist in map case
1474                Some(1142), // mixed + repeated rows + not exist in map case
1475                None,       // all nulls + repeated rows + exist in map case
1476                Some(42),
1477                None,
1478                None,
1479                Some(1142),
1480                None,
1481                // Unique rows in batch
1482                Some(4211), // all not nulls + unique rows + exist in map case
1483                None,       // mixed + unique rows + exist in map case
1484                None,       // mixed + unique rows + not exist in map case
1485                Some(4212), // mixed + unique rows + not exist in map case
1486            ]);
1487
1488            let col2 = StringArray::from(vec![
1489                // Repeated rows in batch
1490                Some("string1"), // all not nulls + repeated rows + exist in map case
1491                None,            // mixed + repeated rows + exist in map case
1492                Some("string2"), // mixed + repeated rows + not exist in map case
1493                None,            // mixed + repeated rows + not exist in map case
1494                None,            // all nulls + repeated rows + exist in map case
1495                Some("string1"),
1496                None,
1497                Some("string2"),
1498                None,
1499                None,
1500                // Unique rows in batch
1501                Some("string3"), // all not nulls + unique rows + exist in map case
1502                None,            // mixed + unique rows + exist in map case
1503                Some("string4"), // mixed + unique rows + not exist in map case
1504                None,            // mixed + unique rows + not exist in map case
1505            ]);
1506
1507            let col3 = StringViewArray::from(vec![
1508                // Repeated rows in batch
1509                Some("stringview1"), // all not nulls + repeated rows + exist in map case
1510                Some("stringview2"), // mixed + repeated rows + exist in map case
1511                None,                // mixed + repeated rows + not exist in map case
1512                None,                // mixed + repeated rows + not exist in map case
1513                None,                // all nulls + repeated rows + exist in map case
1514                Some("stringview1"),
1515                Some("stringview2"),
1516                None,
1517                None,
1518                None,
1519                // Unique rows in batch
1520                Some("stringview3"), // all not nulls + unique rows + exist in map case
1521                Some("stringview4"), // mixed + unique rows + exist in map case
1522                None,                // mixed + unique rows + not exist in map case
1523                None,                // mixed + unique rows + not exist in map case
1524            ]);
1525            let batch1 = vec![
1526                Arc::new(col1) as _,
1527                Arc::new(col2) as _,
1528                Arc::new(col3) as _,
1529            ];
1530
1531            // Intern batch 2
1532            let col1 = Int64Array::from(vec![
1533                // Repeated rows in batch
1534                Some(42),    // all not nulls + repeated rows + exist in map case
1535                None,        // mixed + repeated rows + exist in map case
1536                None,        // mixed + repeated rows + not exist in map case
1537                Some(21142), // mixed + repeated rows + not exist in map case
1538                None,        // all nulls + repeated rows + exist in map case
1539                Some(42),
1540                None,
1541                None,
1542                Some(21142),
1543                None,
1544                // Unique rows in batch
1545                Some(4211),  // all not nulls + unique rows + exist in map case
1546                None,        // mixed + unique rows + exist in map case
1547                None,        // mixed + unique rows + not exist in map case
1548                Some(24212), // mixed + unique rows + not exist in map case
1549            ]);
1550
1551            let col2 = StringArray::from(vec![
1552                // Repeated rows in batch
1553                Some("string1"), // all not nulls + repeated rows + exist in map case
1554                None,            // mixed + repeated rows + exist in map case
1555                Some("2string2"), // mixed + repeated rows + not exist in map case
1556                None,            // mixed + repeated rows + not exist in map case
1557                None,            // all nulls + repeated rows + exist in map case
1558                Some("string1"),
1559                None,
1560                Some("2string2"),
1561                None,
1562                None,
1563                // Unique rows in batch
1564                Some("string3"), // all not nulls + unique rows + exist in map case
1565                None,            // mixed + unique rows + exist in map case
1566                Some("2string4"), // mixed + unique rows + not exist in map case
1567                None,            // mixed + unique rows + not exist in map case
1568            ]);
1569
1570            let col3 = StringViewArray::from(vec![
1571                // Repeated rows in batch
1572                Some("stringview1"), // all not nulls + repeated rows + exist in map case
1573                Some("stringview2"), // mixed + repeated rows + exist in map case
1574                None,                // mixed + repeated rows + not exist in map case
1575                None,                // mixed + repeated rows + not exist in map case
1576                None,                // all nulls + repeated rows + exist in map case
1577                Some("stringview1"),
1578                Some("stringview2"),
1579                None,
1580                None,
1581                None,
1582                // Unique rows in batch
1583                Some("stringview3"), // all not nulls + unique rows + exist in map case
1584                Some("stringview4"), // mixed + unique rows + exist in map case
1585                None,                // mixed + unique rows + not exist in map case
1586                None,                // mixed + unique rows + not exist in map case
1587            ]);
1588            let batch2 = vec![
1589                Arc::new(col1) as _,
1590                Arc::new(col2) as _,
1591                Arc::new(col3) as _,
1592            ];
1593
1594            // Intern batch 3
1595            let col1 = Int64Array::from(vec![
1596                // Repeated rows in batch
1597                Some(42),    // all not nulls + repeated rows + exist in map case
1598                None,        // mixed + repeated rows + exist in map case
1599                None,        // mixed + repeated rows + not exist in map case
1600                Some(31142), // mixed + repeated rows + not exist in map case
1601                None,        // all nulls + repeated rows + exist in map case
1602                Some(42),
1603                None,
1604                None,
1605                Some(31142),
1606                None,
1607                // Unique rows in batch
1608                Some(4211),  // all not nulls + unique rows + exist in map case
1609                None,        // mixed + unique rows + exist in map case
1610                None,        // mixed + unique rows + not exist in map case
1611                Some(34212), // mixed + unique rows + not exist in map case
1612            ]);
1613
1614            let col2 = StringArray::from(vec![
1615                // Repeated rows in batch
1616                Some("string1"), // all not nulls + repeated rows + exist in map case
1617                None,            // mixed + repeated rows + exist in map case
1618                Some("3string2"), // mixed + repeated rows + not exist in map case
1619                None,            // mixed + repeated rows + not exist in map case
1620                None,            // all nulls + repeated rows + exist in map case
1621                Some("string1"),
1622                None,
1623                Some("3string2"),
1624                None,
1625                None,
1626                // Unique rows in batch
1627                Some("string3"), // all not nulls + unique rows + exist in map case
1628                None,            // mixed + unique rows + exist in map case
1629                Some("3string4"), // mixed + unique rows + not exist in map case
1630                None,            // mixed + unique rows + not exist in map case
1631            ]);
1632
1633            let col3 = StringViewArray::from(vec![
1634                // Repeated rows in batch
1635                Some("stringview1"), // all not nulls + repeated rows + exist in map case
1636                Some("stringview2"), // mixed + repeated rows + exist in map case
1637                None,                // mixed + repeated rows + not exist in map case
1638                None,                // mixed + repeated rows + not exist in map case
1639                None,                // all nulls + repeated rows + exist in map case
1640                Some("stringview1"),
1641                Some("stringview2"),
1642                None,
1643                None,
1644                None,
1645                // Unique rows in batch
1646                Some("stringview3"), // all not nulls + unique rows + exist in map case
1647                Some("stringview4"), // mixed + unique rows + exist in map case
1648                None,                // mixed + unique rows + not exist in map case
1649                None,                // mixed + unique rows + not exist in map case
1650            ]);
1651            let batch3 = vec![
1652                Arc::new(col1) as _,
1653                Arc::new(col2) as _,
1654                Arc::new(col3) as _,
1655            ];
1656
1657            // Expected batch
1658            let schema = Arc::new(Schema::new(vec![
1659                Field::new("a", DataType::Int64, true),
1660                Field::new("b", DataType::Utf8, true),
1661                Field::new("c", DataType::Utf8View, true),
1662            ]));
1663
1664            let col1 = Int64Array::from(vec![
1665                // Repeated rows in batch
1666                Some(42),
1667                None,
1668                None,
1669                Some(1142),
1670                None,
1671                Some(21142),
1672                None,
1673                Some(31142),
1674                None,
1675                // Unique rows in batch
1676                Some(4211),
1677                None,
1678                None,
1679                Some(4212),
1680                None,
1681                Some(24212),
1682                None,
1683                Some(34212),
1684            ]);
1685
1686            let col2 = StringArray::from(vec![
1687                // Repeated rows in batch
1688                Some("string1"),
1689                None,
1690                Some("string2"),
1691                None,
1692                Some("2string2"),
1693                None,
1694                Some("3string2"),
1695                None,
1696                None,
1697                // Unique rows in batch
1698                Some("string3"),
1699                None,
1700                Some("string4"),
1701                None,
1702                Some("2string4"),
1703                None,
1704                Some("3string4"),
1705                None,
1706            ]);
1707
1708            let col3 = StringViewArray::from(vec![
1709                // Repeated rows in batch
1710                Some("stringview1"),
1711                Some("stringview2"),
1712                None,
1713                None,
1714                None,
1715                None,
1716                None,
1717                None,
1718                None,
1719                // Unique rows in batch
1720                Some("stringview3"),
1721                Some("stringview4"),
1722                None,
1723                None,
1724                None,
1725                None,
1726                None,
1727                None,
1728            ]);
1729            let expected_batch = vec![
1730                Arc::new(col1) as _,
1731                Arc::new(col2) as _,
1732                Arc::new(col3) as _,
1733            ];
1734            let expected_batch = RecordBatch::try_new(schema, expected_batch).unwrap();
1735
1736            Self {
1737                test_batches: vec![batch1, batch2, batch3],
1738                expected_batch,
1739            }
1740        }
1741
1742        fn load_to_group_values(&self, group_values: &mut impl GroupValues) {
1743            for batch in self.test_batches.iter() {
1744                group_values.intern(batch, &mut vec![]).unwrap();
1745            }
1746        }
1747
1748        fn schema(&self) -> SchemaRef {
1749            self.expected_batch.schema()
1750        }
1751    }
1752
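    /// Asserts that `actual_batch` contains the same rows as `expected_batch`.
    ///
    /// Group emission order is not guaranteed, so the pretty-printed batches are
    /// compared line by line after sorting, rather than positionally.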
1753    fn check_result(actual_batch: &RecordBatch, expected_batch: &RecordBatch) {
1754        let formatted_actual_batch =
1755            pretty_format_batches(std::slice::from_ref(actual_batch))
1756                .unwrap()
1757                .to_string();
1758        let mut formatted_actual_batch_sorted: Vec<&str> =
1759            formatted_actual_batch.trim().lines().collect();
1760        formatted_actual_batch_sorted.sort_unstable();
1761
1762        let formatted_expected_batch =
1763            pretty_format_batches(std::slice::from_ref(expected_batch))
1764                .unwrap()
1765                .to_string();
1766
1767        let mut formatted_expected_batch_sorted: Vec<&str> =
1768            formatted_expected_batch.trim().lines().collect();
1769        formatted_expected_batch_sorted.sort_unstable();
1770
1771        for (i, (actual_line, expected_line)) in formatted_actual_batch_sorted
1772            .iter()
1773            .zip(&formatted_expected_batch_sorted)
1774            .enumerate()
1775        {
1776            assert_eq!(
1777                (i, actual_line),
1778                (i, expected_line),
1779                "Inconsistent result\n\n\
1780                 Actual batch:\n{formatted_actual_batch}\n\
1781                 Expected batch:\n{formatted_expected_batch}\n\
1782                 ",
1783            );
1784        }
1785    }
1786
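    /// Test helper: inserts an inlined [`GroupIndexView`] holding a single group
    /// index into `group_values.map` under `hash_key`, bypassing `intern` so the
    /// hash table state can be set up directly.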
1787    fn insert_inline_group_index_view(
1788        group_values: &mut GroupValuesColumn<false>,
1789        hash_key: u64,
1790        group_index: u64,
1791    ) {
1792        let group_index_view = GroupIndexView::new_inlined(group_index);
1793        group_values.map.insert_accounted(
1794            (hash_key, group_index_view),
1795            |(hash, _)| *hash,
1796            &mut group_values.map_size,
1797        );
1798    }
1799
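    /// Test helper: appends `group_indices` to `group_values.group_index_lists`
    /// and inserts a non-inlined [`GroupIndexView`] pointing at that list into
    /// `group_values.map` under `hash_key`.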
1800    fn insert_non_inline_group_index_view(
1801        group_values: &mut GroupValuesColumn<false>,
1802        hash_key: u64,
1803        group_indices: Vec<usize>,
1804    ) {
1805        let list_offset = group_values.group_index_lists.len();
1806        let group_index_view = GroupIndexView::new_non_inlined(list_offset as u64);
1807        group_values.group_index_lists.push(group_indices);
1808        group_values.map.insert_accounted(
1809            (hash_key, group_index_view),
1810            |(hash, _)| *hash,
1811            &mut group_values.map_size,
1812        );
1813    }
1814}