datafusion_physical_plan/aggregates/group_values/multi_group_by/
bytes_view.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use crate::aggregates::group_values::multi_group_by::{
19    nulls_equal_to, GroupColumn, Nulls,
20};
21use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder;
22use arrow::array::{make_view, Array, ArrayRef, AsArray, ByteView, GenericByteViewArray};
23use arrow::buffer::{Buffer, ScalarBuffer};
24use arrow::datatypes::ByteViewType;
25use datafusion_common::Result;
26use itertools::izip;
27use std::marker::PhantomData;
28use std::mem::{replace, size_of};
29use std::sync::Arc;
30
/// Default upper bound, in bytes, for a single data block (2 MiB).
const BYTE_VIEW_MAX_BLOCK_SIZE: usize = 2 << 20;
32
33/// An implementation of [`GroupColumn`] for binary view and utf8 view types.
34///
35/// Stores a collection of binary view or utf8 view group values in a buffer
36/// whose structure is similar to `GenericByteViewArray`, and we can get benefits:
37///
38/// 1. Efficient comparison of incoming rows to existing rows
39/// 2. Efficient construction of the final output array
40/// 3. Efficient to perform `take_n` comparing to use `GenericByteViewBuilder`
41pub struct ByteViewGroupValueBuilder<B: ByteViewType> {
42    /// The views of string values
43    ///
44    /// If string len <= 12, the view's format will be:
45    ///   string(12B) | len(4B)
46    ///
47    /// If string len > 12, its format will be:
48    ///     offset(4B) | buffer_index(4B) | prefix(4B) | len(4B)
49    views: Vec<u128>,
50
51    /// The progressing block
52    ///
53    /// New values will be inserted into it until its capacity
54    /// is not enough(detail can see `max_block_size`).
55    in_progress: Vec<u8>,
56
57    /// The completed blocks
58    completed: Vec<Buffer>,
59
60    /// The max size of `in_progress`
61    ///
62    /// `in_progress` will be flushed into `completed`, and create new `in_progress`
63    /// when found its remaining capacity(`max_block_size` - `len(in_progress)`),
64    /// is no enough to store the appended value.
65    ///
66    /// Currently it is fixed at 2MB.
67    max_block_size: usize,
68
69    /// Nulls
70    nulls: MaybeNullBufferBuilder,
71
72    /// phantom data so the type requires `<B>`
73    _phantom: PhantomData<B>,
74}
75
76impl<B: ByteViewType> Default for ByteViewGroupValueBuilder<B> {
77    fn default() -> Self {
78        Self::new()
79    }
80}
81
82impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
83    pub fn new() -> Self {
84        Self {
85            views: Vec::new(),
86            in_progress: Vec::new(),
87            completed: Vec::new(),
88            max_block_size: BYTE_VIEW_MAX_BLOCK_SIZE,
89            nulls: MaybeNullBufferBuilder::new(),
90            _phantom: PhantomData {},
91        }
92    }
93
94    /// Set the max block size
95    fn with_max_block_size(mut self, max_block_size: usize) -> Self {
96        self.max_block_size = max_block_size;
97        self
98    }
99
100    fn equal_to_inner(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool {
101        let array = array.as_byte_view::<B>();
102        self.do_equal_to_inner(lhs_row, array, rhs_row)
103    }
104
105    fn append_val_inner(&mut self, array: &ArrayRef, row: usize) {
106        let arr = array.as_byte_view::<B>();
107
108        // Null row case, set and return
109        if arr.is_null(row) {
110            self.nulls.append(true);
111            self.views.push(0);
112            return;
113        }
114
115        // Not null row case
116        self.nulls.append(false);
117        self.do_append_val_inner(arr, row);
118    }
119
120    fn vectorized_equal_to_inner(
121        &self,
122        lhs_rows: &[usize],
123        array: &ArrayRef,
124        rhs_rows: &[usize],
125        equal_to_results: &mut [bool],
126    ) {
127        let array = array.as_byte_view::<B>();
128
129        let iter = izip!(
130            lhs_rows.iter(),
131            rhs_rows.iter(),
132            equal_to_results.iter_mut(),
133        );
134
135        for (&lhs_row, &rhs_row, equal_to_result) in iter {
136            // Has found not equal to, don't need to check
137            if !*equal_to_result {
138                continue;
139            }
140
141            *equal_to_result = self.do_equal_to_inner(lhs_row, array, rhs_row);
142        }
143    }
144
145    fn vectorized_append_inner(&mut self, array: &ArrayRef, rows: &[usize]) {
146        let arr = array.as_byte_view::<B>();
147        let null_count = array.null_count();
148        let num_rows = array.len();
149        let all_null_or_non_null = if null_count == 0 {
150            Nulls::None
151        } else if null_count == num_rows {
152            Nulls::All
153        } else {
154            Nulls::Some
155        };
156
157        match all_null_or_non_null {
158            Nulls::Some => {
159                for &row in rows {
160                    self.append_val_inner(array, row);
161                }
162            }
163
164            Nulls::None => {
165                self.nulls.append_n(rows.len(), false);
166                for &row in rows {
167                    self.do_append_val_inner(arr, row);
168                }
169            }
170
171            Nulls::All => {
172                self.nulls.append_n(rows.len(), true);
173                let new_len = self.views.len() + rows.len();
174                self.views.resize(new_len, 0);
175            }
176        }
177    }
178
179    fn do_append_val_inner(&mut self, array: &GenericByteViewArray<B>, row: usize)
180    where
181        B: ByteViewType,
182    {
183        let value: &[u8] = array.value(row).as_ref();
184
185        let value_len = value.len();
186        let view = if value_len <= 12 {
187            make_view(value, 0, 0)
188        } else {
189            // Ensure big enough block to hold the value firstly
190            self.ensure_in_progress_big_enough(value_len);
191
192            // Append value
193            let buffer_index = self.completed.len();
194            let offset = self.in_progress.len();
195            self.in_progress.extend_from_slice(value);
196
197            make_view(value, buffer_index as u32, offset as u32)
198        };
199
200        // Append view
201        self.views.push(view);
202    }
203
204    fn ensure_in_progress_big_enough(&mut self, value_len: usize) {
205        debug_assert!(value_len > 12);
206        let require_cap = self.in_progress.len() + value_len;
207
208        // If current block isn't big enough, flush it and create a new in progress block
209        if require_cap > self.max_block_size {
210            let flushed_block = replace(
211                &mut self.in_progress,
212                Vec::with_capacity(self.max_block_size),
213            );
214            let buffer = Buffer::from_vec(flushed_block);
215            self.completed.push(buffer);
216        }
217    }
218
219    fn do_equal_to_inner(
220        &self,
221        lhs_row: usize,
222        array: &GenericByteViewArray<B>,
223        rhs_row: usize,
224    ) -> bool {
225        // Check if nulls equal firstly
226        let exist_null = self.nulls.is_null(lhs_row);
227        let input_null = array.is_null(rhs_row);
228        if let Some(result) = nulls_equal_to(exist_null, input_null) {
229            return result;
230        }
231
232        // Otherwise, we need to check their values
233        let exist_view = self.views[lhs_row];
234        let exist_view_len = exist_view as u32;
235
236        let input_view = array.views()[rhs_row];
237        let input_view_len = input_view as u32;
238
239        // The check logic
240        //   - Check len equality
241        //   - If inlined, check inlined value
242        //   - If non-inlined, check prefix and then check value in buffer
243        //     when needed
244        if exist_view_len != input_view_len {
245            return false;
246        }
247
248        if exist_view_len <= 12 {
249            let exist_inline = unsafe {
250                GenericByteViewArray::<B>::inline_value(
251                    &exist_view,
252                    exist_view_len as usize,
253                )
254            };
255            let input_inline = unsafe {
256                GenericByteViewArray::<B>::inline_value(
257                    &input_view,
258                    input_view_len as usize,
259                )
260            };
261            exist_inline == input_inline
262        } else {
263            let exist_prefix =
264                unsafe { GenericByteViewArray::<B>::inline_value(&exist_view, 4) };
265            let input_prefix =
266                unsafe { GenericByteViewArray::<B>::inline_value(&input_view, 4) };
267
268            if exist_prefix != input_prefix {
269                return false;
270            }
271
272            let exist_full = {
273                let byte_view = ByteView::from(exist_view);
274                self.value(
275                    byte_view.buffer_index as usize,
276                    byte_view.offset as usize,
277                    byte_view.length as usize,
278                )
279            };
280            let input_full: &[u8] = unsafe { array.value_unchecked(rhs_row).as_ref() };
281            exist_full == input_full
282        }
283    }
284
285    fn value(&self, buffer_index: usize, offset: usize, length: usize) -> &[u8] {
286        debug_assert!(buffer_index <= self.completed.len());
287
288        if buffer_index < self.completed.len() {
289            let block = &self.completed[buffer_index];
290            &block[offset..offset + length]
291        } else {
292            &self.in_progress[offset..offset + length]
293        }
294    }
295
296    fn build_inner(self) -> ArrayRef {
297        let Self {
298            views,
299            in_progress,
300            mut completed,
301            nulls,
302            ..
303        } = self;
304
305        // Build nulls
306        let null_buffer = nulls.build();
307
308        // Build values
309        // Flush `in_process` firstly
310        if !in_progress.is_empty() {
311            let buffer = Buffer::from(in_progress);
312            completed.push(buffer);
313        }
314
315        let views = ScalarBuffer::from(views);
316
317        // Safety:
318        // * all views were correctly made
319        // * (if utf8): Input was valid Utf8 so buffer contents are
320        // valid utf8 as well
321        unsafe {
322            Arc::new(GenericByteViewArray::<B>::new_unchecked(
323                views,
324                completed,
325                null_buffer,
326            ))
327        }
328    }
329
330    fn take_n_inner(&mut self, n: usize) -> ArrayRef {
331        debug_assert!(self.len() >= n);
332
333        // The `n == len` case, we need to take all
334        if self.len() == n {
335            let new_builder = Self::new().with_max_block_size(self.max_block_size);
336            let cur_builder = replace(self, new_builder);
337            return cur_builder.build_inner();
338        }
339
340        // The `n < len` case
341        // Take n for nulls
342        let null_buffer = self.nulls.take_n(n);
343
344        // Take n for values:
345        //   - Take first n `view`s from `views`
346        //
347        //   - Find the last non-inlined `view`, if all inlined,
348        //     we can build array and return happily, otherwise we
349        //     we need to continue to process related buffers
350        //
351        //   - Get the last related `buffer index`(let's name it `buffer index n`)
352        //     from last non-inlined `view`
353        //
354        //   - Take buffers, the key is that we need to know if we need to take
355        //     the whole last related buffer. The logic is a bit complex, you can
356        //     detail in `take_buffers_with_whole_last`, `take_buffers_with_partial_last`
357        //     and other related steps in following
358        //
359        //   - Shift the `buffer index` of remaining non-inlined `views`
360        //
361        let first_n_views = self.views.drain(0..n).collect::<Vec<_>>();
362
363        let last_non_inlined_view = first_n_views
364            .iter()
365            .rev()
366            .find(|view| ((**view) as u32) > 12);
367
368        // All taken views inlined
369        let Some(view) = last_non_inlined_view else {
370            let views = ScalarBuffer::from(first_n_views);
371
372            // Safety:
373            // * all views were correctly made
374            // * (if utf8): Input was valid Utf8 so buffer contents are
375            // valid utf8 as well
376            unsafe {
377                return Arc::new(GenericByteViewArray::<B>::new_unchecked(
378                    views,
379                    Vec::new(),
380                    null_buffer,
381                ));
382            }
383        };
384
385        // Unfortunately, some taken views non-inlined
386        let view = ByteView::from(*view);
387        let last_remaining_buffer_index = view.buffer_index as usize;
388
389        // Check should we take the whole `last_remaining_buffer_index` buffer
390        let take_whole_last_buffer = self.should_take_whole_buffer(
391            last_remaining_buffer_index,
392            (view.offset + view.length) as usize,
393        );
394
395        // Take related buffers
396        let buffers = if take_whole_last_buffer {
397            self.take_buffers_with_whole_last(last_remaining_buffer_index)
398        } else {
399            self.take_buffers_with_partial_last(
400                last_remaining_buffer_index,
401                (view.offset + view.length) as usize,
402            )
403        };
404
405        // Shift `buffer index`s finally
406        let shifts = if take_whole_last_buffer {
407            last_remaining_buffer_index + 1
408        } else {
409            last_remaining_buffer_index
410        };
411
412        self.views.iter_mut().for_each(|view| {
413            if (*view as u32) > 12 {
414                let mut byte_view = ByteView::from(*view);
415                byte_view.buffer_index -= shifts as u32;
416                *view = byte_view.as_u128();
417            }
418        });
419
420        // Build array and return
421        let views = ScalarBuffer::from(first_n_views);
422
423        // Safety:
424        // * all views were correctly made
425        // * (if utf8): Input was valid Utf8 so buffer contents are
426        // valid utf8 as well
427        unsafe {
428            Arc::new(GenericByteViewArray::<B>::new_unchecked(
429                views,
430                buffers,
431                null_buffer,
432            ))
433        }
434    }
435
436    fn take_buffers_with_whole_last(
437        &mut self,
438        last_remaining_buffer_index: usize,
439    ) -> Vec<Buffer> {
440        if last_remaining_buffer_index == self.completed.len() {
441            self.flush_in_progress();
442        }
443        self.completed
444            .drain(0..last_remaining_buffer_index + 1)
445            .collect()
446    }
447
448    fn take_buffers_with_partial_last(
449        &mut self,
450        last_remaining_buffer_index: usize,
451        last_take_len: usize,
452    ) -> Vec<Buffer> {
453        let mut take_buffers = Vec::with_capacity(last_remaining_buffer_index + 1);
454
455        // Take `0 ~ last_remaining_buffer_index - 1` buffers
456        if !self.completed.is_empty() || last_remaining_buffer_index == 0 {
457            take_buffers.extend(self.completed.drain(0..last_remaining_buffer_index));
458        }
459
460        // Process the `last_remaining_buffer_index` buffers
461        let last_buffer = if last_remaining_buffer_index < self.completed.len() {
462            // If it is in `completed`, simply clone
463            self.completed[last_remaining_buffer_index].clone()
464        } else {
465            // If it is `in_progress`, copied `0 ~ offset` part
466            let taken_last_buffer = self.in_progress[0..last_take_len].to_vec();
467            Buffer::from_vec(taken_last_buffer)
468        };
469        take_buffers.push(last_buffer);
470
471        take_buffers
472    }
473
474    #[inline]
475    fn should_take_whole_buffer(&self, buffer_index: usize, take_len: usize) -> bool {
476        if buffer_index < self.completed.len() {
477            take_len == self.completed[buffer_index].len()
478        } else {
479            take_len == self.in_progress.len()
480        }
481    }
482
483    fn flush_in_progress(&mut self) {
484        let flushed_block = replace(
485            &mut self.in_progress,
486            Vec::with_capacity(self.max_block_size),
487        );
488        let buffer = Buffer::from_vec(flushed_block);
489        self.completed.push(buffer);
490    }
491}
492
493impl<B: ByteViewType> GroupColumn for ByteViewGroupValueBuilder<B> {
494    fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool {
495        self.equal_to_inner(lhs_row, array, rhs_row)
496    }
497
498    fn append_val(&mut self, array: &ArrayRef, row: usize) -> Result<()> {
499        self.append_val_inner(array, row);
500        Ok(())
501    }
502
503    fn vectorized_equal_to(
504        &self,
505        group_indices: &[usize],
506        array: &ArrayRef,
507        rows: &[usize],
508        equal_to_results: &mut [bool],
509    ) {
510        self.vectorized_equal_to_inner(group_indices, array, rows, equal_to_results);
511    }
512
513    fn vectorized_append(&mut self, array: &ArrayRef, rows: &[usize]) -> Result<()> {
514        self.vectorized_append_inner(array, rows);
515        Ok(())
516    }
517
518    fn len(&self) -> usize {
519        self.views.len()
520    }
521
522    fn size(&self) -> usize {
523        let buffers_size = self
524            .completed
525            .iter()
526            .map(|buf| buf.capacity() * size_of::<u8>())
527            .sum::<usize>();
528
529        self.nulls.allocated_size()
530            + self.views.capacity() * size_of::<u128>()
531            + self.in_progress.capacity() * size_of::<u8>()
532            + buffers_size
533            + size_of::<Self>()
534    }
535
536    fn build(self: Box<Self>) -> ArrayRef {
537        Self::build_inner(*self)
538    }
539
540    fn take_n(&mut self, n: usize) -> ArrayRef {
541        self.take_n_inner(n)
542    }
543}
544
545#[cfg(test)]
546mod tests {
547    use std::sync::Arc;
548
549    use crate::aggregates::group_values::multi_group_by::bytes_view::ByteViewGroupValueBuilder;
550    use arrow::array::{ArrayRef, AsArray, NullBufferBuilder, StringViewArray};
551    use arrow::datatypes::StringViewType;
552
553    use super::GroupColumn;
554
555    #[test]
556    fn test_byte_view_append_val() {
557        let mut builder =
558            ByteViewGroupValueBuilder::<StringViewType>::new().with_max_block_size(60);
559        let builder_array = StringViewArray::from(vec![
560            Some("this string is quite long"), // in buffer 0
561            Some("foo"),
562            None,
563            Some("bar"),
564            Some("this string is also quite long"), // buffer 0
565            Some("this string is quite long"),      // buffer 1
566            Some("bar"),
567        ]);
568        let builder_array: ArrayRef = Arc::new(builder_array);
569        for row in 0..builder_array.len() {
570            builder.append_val(&builder_array, row).unwrap();
571        }
572
573        let output = Box::new(builder).build();
574        // should be 2 output buffers to hold all the data
575        assert_eq!(output.as_string_view().data_buffers().len(), 2);
576        assert_eq!(&output, &builder_array)
577    }
578
579    #[test]
580    fn test_byte_view_equal_to() {
581        let append = |builder: &mut ByteViewGroupValueBuilder<StringViewType>,
582                      builder_array: &ArrayRef,
583                      append_rows: &[usize]| {
584            for &index in append_rows {
585                builder.append_val(builder_array, index).unwrap();
586            }
587        };
588
589        let equal_to = |builder: &ByteViewGroupValueBuilder<StringViewType>,
590                        lhs_rows: &[usize],
591                        input_array: &ArrayRef,
592                        rhs_rows: &[usize],
593                        equal_to_results: &mut Vec<bool>| {
594            let iter = lhs_rows.iter().zip(rhs_rows.iter());
595            for (idx, (&lhs_row, &rhs_row)) in iter.enumerate() {
596                equal_to_results[idx] = builder.equal_to(lhs_row, input_array, rhs_row);
597            }
598        };
599
600        test_byte_view_equal_to_internal(append, equal_to);
601    }
602
603    #[test]
604    fn test_byte_view_vectorized_equal_to() {
605        let append = |builder: &mut ByteViewGroupValueBuilder<StringViewType>,
606                      builder_array: &ArrayRef,
607                      append_rows: &[usize]| {
608            builder
609                .vectorized_append(builder_array, append_rows)
610                .unwrap();
611        };
612
613        let equal_to = |builder: &ByteViewGroupValueBuilder<StringViewType>,
614                        lhs_rows: &[usize],
615                        input_array: &ArrayRef,
616                        rhs_rows: &[usize],
617                        equal_to_results: &mut Vec<bool>| {
618            builder.vectorized_equal_to(
619                lhs_rows,
620                input_array,
621                rhs_rows,
622                equal_to_results,
623            );
624        };
625
626        test_byte_view_equal_to_internal(append, equal_to);
627    }
628
629    #[test]
630    fn test_byte_view_vectorized_operation_special_case() {
631        // Test the special `all nulls` or `not nulls` input array case
632        // for vectorized append and equal to
633
634        let mut builder =
635            ByteViewGroupValueBuilder::<StringViewType>::new().with_max_block_size(60);
636
637        // All nulls input array
638        let all_nulls_input_array = Arc::new(StringViewArray::from(vec![
639            Option::<&str>::None,
640            None,
641            None,
642            None,
643            None,
644        ])) as _;
645        builder
646            .vectorized_append(&all_nulls_input_array, &[0, 1, 2, 3, 4])
647            .unwrap();
648
649        let mut equal_to_results = vec![true; all_nulls_input_array.len()];
650        builder.vectorized_equal_to(
651            &[0, 1, 2, 3, 4],
652            &all_nulls_input_array,
653            &[0, 1, 2, 3, 4],
654            &mut equal_to_results,
655        );
656
657        assert!(equal_to_results[0]);
658        assert!(equal_to_results[1]);
659        assert!(equal_to_results[2]);
660        assert!(equal_to_results[3]);
661        assert!(equal_to_results[4]);
662
663        // All not nulls input array
664        let all_not_nulls_input_array = Arc::new(StringViewArray::from(vec![
665            Some("stringview1"),
666            Some("stringview2"),
667            Some("stringview3"),
668            Some("stringview4"),
669            Some("stringview5"),
670        ])) as _;
671        builder
672            .vectorized_append(&all_not_nulls_input_array, &[0, 1, 2, 3, 4])
673            .unwrap();
674
675        let mut equal_to_results = vec![true; all_not_nulls_input_array.len()];
676        builder.vectorized_equal_to(
677            &[5, 6, 7, 8, 9],
678            &all_not_nulls_input_array,
679            &[0, 1, 2, 3, 4],
680            &mut equal_to_results,
681        );
682
683        assert!(equal_to_results[0]);
684        assert!(equal_to_results[1]);
685        assert!(equal_to_results[2]);
686        assert!(equal_to_results[3]);
687        assert!(equal_to_results[4]);
688    }
689
690    fn test_byte_view_equal_to_internal<A, E>(mut append: A, mut equal_to: E)
691    where
692        A: FnMut(&mut ByteViewGroupValueBuilder<StringViewType>, &ArrayRef, &[usize]),
693        E: FnMut(
694            &ByteViewGroupValueBuilder<StringViewType>,
695            &[usize],
696            &ArrayRef,
697            &[usize],
698            &mut Vec<bool>,
699        ),
700    {
701        // Will cover such cases:
702        //   - exist null, input not null
703        //   - exist null, input null; values not equal
704        //   - exist null, input null; values equal
705        //   - exist not null, input null
706        //   - exist not null, input not null; value lens not equal
707        //   - exist not null, input not null; value not equal(inlined case)
708        //   - exist not null, input not null; value equal(inlined case)
709        //
710        //   - exist not null, input not null; value not equal
711        //     (non-inlined case + prefix not equal)
712        //
713        //   - exist not null, input not null; value not equal
714        //     (non-inlined case + value in `completed`)
715        //
716        //   - exist not null, input not null; value equal
717        //     (non-inlined case + value in `completed`)
718        //
719        //   - exist not null, input not null; value not equal
720        //     (non-inlined case + value in `in_progress`)
721        //
722        //   - exist not null, input not null; value equal
723        //     (non-inlined case + value in `in_progress`)
724
725        // Set the block size to 40 for ensuring some unlined values are in `in_progress`,
726        // and some are in `completed`, so both two branches in `value` function can be covered.
727        let mut builder =
728            ByteViewGroupValueBuilder::<StringViewType>::new().with_max_block_size(60);
729        let builder_array = Arc::new(StringViewArray::from(vec![
730            None,
731            None,
732            None,
733            Some("foo"),
734            Some("bazz"),
735            Some("foo"),
736            Some("bar"),
737            Some("I am a long string for test eq in completed"),
738            Some("I am a long string for test eq in progress"),
739        ])) as ArrayRef;
740        append(&mut builder, &builder_array, &[0, 1, 2, 3, 4, 5, 6, 7, 8]);
741
742        // Define input array
743        let (views, buffer, _nulls) = StringViewArray::from(vec![
744            Some("foo"),
745            Some("bar"), // set to null
746            None,
747            None,
748            Some("baz"),
749            Some("oof"),
750            Some("bar"),
751            Some("i am a long string for test eq in completed"),
752            Some("I am a long string for test eq in COMPLETED"),
753            Some("I am a long string for test eq in completed"),
754            Some("I am a long string for test eq in PROGRESS"),
755            Some("I am a long string for test eq in progress"),
756        ])
757        .into_parts();
758
759        // explicitly build a boolean buffer where one of the null values also happens to match
760        let mut nulls = NullBufferBuilder::new(9);
761        nulls.append_non_null();
762        nulls.append_null(); // this sets Some("bar") to null above
763        nulls.append_null();
764        nulls.append_null();
765        nulls.append_non_null();
766        nulls.append_non_null();
767        nulls.append_non_null();
768        nulls.append_non_null();
769        nulls.append_non_null();
770        nulls.append_non_null();
771        nulls.append_non_null();
772        nulls.append_non_null();
773        let input_array =
774            Arc::new(StringViewArray::new(views, buffer, nulls.finish())) as ArrayRef;
775
776        // Check
777        let mut equal_to_results = vec![true; input_array.len()];
778        equal_to(
779            &builder,
780            &[0, 1, 2, 3, 4, 5, 6, 7, 7, 7, 8, 8],
781            &input_array,
782            &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11],
783            &mut equal_to_results,
784        );
785
786        assert!(!equal_to_results[0]);
787        assert!(equal_to_results[1]);
788        assert!(equal_to_results[2]);
789        assert!(!equal_to_results[3]);
790        assert!(!equal_to_results[4]);
791        assert!(!equal_to_results[5]);
792        assert!(equal_to_results[6]);
793        assert!(!equal_to_results[7]);
794        assert!(!equal_to_results[8]);
795        assert!(equal_to_results[9]);
796        assert!(!equal_to_results[10]);
797        assert!(equal_to_results[11]);
798    }
799
800    #[test]
801    fn test_byte_view_take_n() {
802        // ####### Define cases and init #######
803
804        // `take_n` is really complex, we should consider and test following situations:
805        //   1. Take nulls
806        //   2. Take all `inlined`s
807        //   3. Take non-inlined + partial last buffer in `completed`
808        //   4. Take non-inlined + whole last buffer in `completed`
809        //   5. Take non-inlined + partial last `in_progress`
810        //   6. Take non-inlined + whole last buffer in `in_progress`
811        //   7. Take all views at once
812
813        let mut builder =
814            ByteViewGroupValueBuilder::<StringViewType>::new().with_max_block_size(60);
815        let input_array = StringViewArray::from(vec![
816            //  Test situation 1
817            None,
818            None,
819            // Test situation 2 (also test take null together)
820            None,
821            Some("foo"),
822            Some("bar"),
823            // Test situation 3 (also test take null + inlined)
824            None,
825            Some("foo"),
826            Some("this string is quite long"),
827            Some("this string is also quite long"),
828            // Test situation 4 (also test take null + inlined)
829            None,
830            Some("bar"),
831            Some("this string is quite long"),
832            // Test situation 5 (also test take null + inlined)
833            None,
834            Some("foo"),
835            Some("another string that is is quite long"),
836            Some("this string not so long"),
837            // Test situation 6 (also test take null + inlined + insert again after taking)
838            None,
839            Some("bar"),
840            Some("this string is quite long"),
841            // Insert 4 and just take 3 to ensure it will go the path of situation 6
842            None,
843            // Finally, we create a new builder,  insert the whole array and then
844            // take whole at once for testing situation 7
845        ]);
846
847        let input_array: ArrayRef = Arc::new(input_array);
848        let first_ones_to_append = 16; // For testing situation 1~5
849        let second_ones_to_append = 4; // For testing situation 6
850        let final_ones_to_append = input_array.len(); // For testing situation 7
851
852        // ####### Test situation 1~5 #######
853        for row in 0..first_ones_to_append {
854            builder.append_val(&input_array, row).unwrap();
855        }
856
857        assert_eq!(builder.completed.len(), 2);
858        assert_eq!(builder.in_progress.len(), 59);
859
860        // Situation 1
861        let taken_array = builder.take_n(2);
862        assert_eq!(&taken_array, &input_array.slice(0, 2));
863
864        // Situation 2
865        let taken_array = builder.take_n(3);
866        assert_eq!(&taken_array, &input_array.slice(2, 3));
867
868        // Situation 3
869        let taken_array = builder.take_n(3);
870        assert_eq!(&taken_array, &input_array.slice(5, 3));
871
872        let taken_array = builder.take_n(1);
873        assert_eq!(&taken_array, &input_array.slice(8, 1));
874
875        // Situation 4
876        let taken_array = builder.take_n(3);
877        assert_eq!(&taken_array, &input_array.slice(9, 3));
878
879        // Situation 5
880        let taken_array = builder.take_n(3);
881        assert_eq!(&taken_array, &input_array.slice(12, 3));
882
883        let taken_array = builder.take_n(1);
884        assert_eq!(&taken_array, &input_array.slice(15, 1));
885
886        // ####### Test situation 6 #######
887        assert!(builder.completed.is_empty());
888        assert!(builder.in_progress.is_empty());
889        assert!(builder.views.is_empty());
890
891        for row in first_ones_to_append..first_ones_to_append + second_ones_to_append {
892            builder.append_val(&input_array, row).unwrap();
893        }
894
895        assert!(builder.completed.is_empty());
896        assert_eq!(builder.in_progress.len(), 25);
897
898        let taken_array = builder.take_n(3);
899        assert_eq!(&taken_array, &input_array.slice(16, 3));
900
901        // ####### Test situation 7 #######
902        // Create a new builder
903        let mut builder =
904            ByteViewGroupValueBuilder::<StringViewType>::new().with_max_block_size(60);
905
906        for row in 0..final_ones_to_append {
907            builder.append_val(&input_array, row).unwrap();
908        }
909
910        assert_eq!(builder.completed.len(), 3);
911        assert_eq!(builder.in_progress.len(), 25);
912
913        let taken_array = builder.take_n(final_ones_to_append);
914        assert_eq!(&taken_array, &input_array);
915    }
916}