1use crate::aggregates::group_values::multi_group_by::{
19 nulls_equal_to, GroupColumn, Nulls,
20};
21use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder;
22use arrow::array::{make_view, Array, ArrayRef, AsArray, ByteView, GenericByteViewArray};
23use arrow::buffer::{Buffer, ScalarBuffer};
24use arrow::datatypes::ByteViewType;
25use datafusion_common::Result;
26use itertools::izip;
27use std::marker::PhantomData;
28use std::mem::{replace, size_of};
29use std::sync::Arc;
30
/// Default limit, in bytes, on the size of one data block (2 * 1024 * 1024).
///
/// `ByteViewGroupValueBuilder::new` stores this value as the builder's
/// `max_block_size`.
const BYTE_VIEW_MAX_BLOCK_SIZE: usize = 2 * 1024 * 1024;
32
/// Accumulates group values taken from byte-view arrays of type `B` and
/// turns them into a `GenericByteViewArray<B>`.
///
/// Values of at most 12 bytes are stored entirely inside their view; longer
/// values have their bytes appended to `in_progress`, which is moved into
/// `completed` as a finished block when it would grow past `max_block_size`.
pub struct ByteViewGroupValueBuilder<B: ByteViewType> {
    /// One 128-bit view per appended row; null rows are stored as `0`.
    views: Vec<u128>,

    /// Bytes of the long (more than 12 bytes) values that have not been
    /// flushed into `completed` yet.
    in_progress: Vec<u8>,

    /// Blocks that were flushed out of `in_progress`, in flush order.
    completed: Vec<Buffer>,

    /// Size limit, in bytes, checked before appending a long value:
    /// `in_progress` is flushed when the append would exceed it.
    max_block_size: usize,

    /// Null flags of the appended rows.
    nulls: MaybeNullBufferBuilder,

    /// Binds the builder to the byte-view type `B` without storing a value.
    _phantom: PhantomData<B>,
}
75
76impl<B: ByteViewType> Default for ByteViewGroupValueBuilder<B> {
77 fn default() -> Self {
78 Self::new()
79 }
80}
81
82impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
83 pub fn new() -> Self {
84 Self {
85 views: Vec::new(),
86 in_progress: Vec::new(),
87 completed: Vec::new(),
88 max_block_size: BYTE_VIEW_MAX_BLOCK_SIZE,
89 nulls: MaybeNullBufferBuilder::new(),
90 _phantom: PhantomData {},
91 }
92 }
93
94 fn with_max_block_size(mut self, max_block_size: usize) -> Self {
96 self.max_block_size = max_block_size;
97 self
98 }
99
100 fn equal_to_inner(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool {
101 let array = array.as_byte_view::<B>();
102 self.do_equal_to_inner(lhs_row, array, rhs_row)
103 }
104
105 fn append_val_inner(&mut self, array: &ArrayRef, row: usize) {
106 let arr = array.as_byte_view::<B>();
107
108 if arr.is_null(row) {
110 self.nulls.append(true);
111 self.views.push(0);
112 return;
113 }
114
115 self.nulls.append(false);
117 self.do_append_val_inner(arr, row);
118 }
119
120 fn vectorized_equal_to_inner(
121 &self,
122 lhs_rows: &[usize],
123 array: &ArrayRef,
124 rhs_rows: &[usize],
125 equal_to_results: &mut [bool],
126 ) {
127 let array = array.as_byte_view::<B>();
128
129 let iter = izip!(
130 lhs_rows.iter(),
131 rhs_rows.iter(),
132 equal_to_results.iter_mut(),
133 );
134
135 for (&lhs_row, &rhs_row, equal_to_result) in iter {
136 if !*equal_to_result {
138 continue;
139 }
140
141 *equal_to_result = self.do_equal_to_inner(lhs_row, array, rhs_row);
142 }
143 }
144
145 fn vectorized_append_inner(&mut self, array: &ArrayRef, rows: &[usize]) {
146 let arr = array.as_byte_view::<B>();
147 let null_count = array.null_count();
148 let num_rows = array.len();
149 let all_null_or_non_null = if null_count == 0 {
150 Nulls::None
151 } else if null_count == num_rows {
152 Nulls::All
153 } else {
154 Nulls::Some
155 };
156
157 match all_null_or_non_null {
158 Nulls::Some => {
159 for &row in rows {
160 self.append_val_inner(array, row);
161 }
162 }
163
164 Nulls::None => {
165 self.nulls.append_n(rows.len(), false);
166 for &row in rows {
167 self.do_append_val_inner(arr, row);
168 }
169 }
170
171 Nulls::All => {
172 self.nulls.append_n(rows.len(), true);
173 let new_len = self.views.len() + rows.len();
174 self.views.resize(new_len, 0);
175 }
176 }
177 }
178
179 fn do_append_val_inner(&mut self, array: &GenericByteViewArray<B>, row: usize)
180 where
181 B: ByteViewType,
182 {
183 let value: &[u8] = array.value(row).as_ref();
184
185 let value_len = value.len();
186 let view = if value_len <= 12 {
187 make_view(value, 0, 0)
188 } else {
189 self.ensure_in_progress_big_enough(value_len);
191
192 let buffer_index = self.completed.len();
194 let offset = self.in_progress.len();
195 self.in_progress.extend_from_slice(value);
196
197 make_view(value, buffer_index as u32, offset as u32)
198 };
199
200 self.views.push(view);
202 }
203
204 fn ensure_in_progress_big_enough(&mut self, value_len: usize) {
205 debug_assert!(value_len > 12);
206 let require_cap = self.in_progress.len() + value_len;
207
208 if require_cap > self.max_block_size {
210 let flushed_block = replace(
211 &mut self.in_progress,
212 Vec::with_capacity(self.max_block_size),
213 );
214 let buffer = Buffer::from_vec(flushed_block);
215 self.completed.push(buffer);
216 }
217 }
218
219 fn do_equal_to_inner(
220 &self,
221 lhs_row: usize,
222 array: &GenericByteViewArray<B>,
223 rhs_row: usize,
224 ) -> bool {
225 let exist_null = self.nulls.is_null(lhs_row);
227 let input_null = array.is_null(rhs_row);
228 if let Some(result) = nulls_equal_to(exist_null, input_null) {
229 return result;
230 }
231
232 let exist_view = self.views[lhs_row];
234 let exist_view_len = exist_view as u32;
235
236 let input_view = array.views()[rhs_row];
237 let input_view_len = input_view as u32;
238
239 if exist_view_len != input_view_len {
245 return false;
246 }
247
248 if exist_view_len <= 12 {
249 let exist_inline = unsafe {
250 GenericByteViewArray::<B>::inline_value(
251 &exist_view,
252 exist_view_len as usize,
253 )
254 };
255 let input_inline = unsafe {
256 GenericByteViewArray::<B>::inline_value(
257 &input_view,
258 input_view_len as usize,
259 )
260 };
261 exist_inline == input_inline
262 } else {
263 let exist_prefix =
264 unsafe { GenericByteViewArray::<B>::inline_value(&exist_view, 4) };
265 let input_prefix =
266 unsafe { GenericByteViewArray::<B>::inline_value(&input_view, 4) };
267
268 if exist_prefix != input_prefix {
269 return false;
270 }
271
272 let exist_full = {
273 let byte_view = ByteView::from(exist_view);
274 self.value(
275 byte_view.buffer_index as usize,
276 byte_view.offset as usize,
277 byte_view.length as usize,
278 )
279 };
280 let input_full: &[u8] = unsafe { array.value_unchecked(rhs_row).as_ref() };
281 exist_full == input_full
282 }
283 }
284
285 fn value(&self, buffer_index: usize, offset: usize, length: usize) -> &[u8] {
286 debug_assert!(buffer_index <= self.completed.len());
287
288 if buffer_index < self.completed.len() {
289 let block = &self.completed[buffer_index];
290 &block[offset..offset + length]
291 } else {
292 &self.in_progress[offset..offset + length]
293 }
294 }
295
296 fn build_inner(self) -> ArrayRef {
297 let Self {
298 views,
299 in_progress,
300 mut completed,
301 nulls,
302 ..
303 } = self;
304
305 let null_buffer = nulls.build();
307
308 if !in_progress.is_empty() {
311 let buffer = Buffer::from(in_progress);
312 completed.push(buffer);
313 }
314
315 let views = ScalarBuffer::from(views);
316
317 unsafe {
322 Arc::new(GenericByteViewArray::<B>::new_unchecked(
323 views,
324 completed,
325 null_buffer,
326 ))
327 }
328 }
329
330 fn take_n_inner(&mut self, n: usize) -> ArrayRef {
331 debug_assert!(self.len() >= n);
332
333 if self.len() == n {
335 let new_builder = Self::new().with_max_block_size(self.max_block_size);
336 let cur_builder = replace(self, new_builder);
337 return cur_builder.build_inner();
338 }
339
340 let null_buffer = self.nulls.take_n(n);
343
344 let first_n_views = self.views.drain(0..n).collect::<Vec<_>>();
362
363 let last_non_inlined_view = first_n_views
364 .iter()
365 .rev()
366 .find(|view| ((**view) as u32) > 12);
367
368 let Some(view) = last_non_inlined_view else {
370 let views = ScalarBuffer::from(first_n_views);
371
372 unsafe {
377 return Arc::new(GenericByteViewArray::<B>::new_unchecked(
378 views,
379 Vec::new(),
380 null_buffer,
381 ));
382 }
383 };
384
385 let view = ByteView::from(*view);
387 let last_remaining_buffer_index = view.buffer_index as usize;
388
389 let take_whole_last_buffer = self.should_take_whole_buffer(
391 last_remaining_buffer_index,
392 (view.offset + view.length) as usize,
393 );
394
395 let buffers = if take_whole_last_buffer {
397 self.take_buffers_with_whole_last(last_remaining_buffer_index)
398 } else {
399 self.take_buffers_with_partial_last(
400 last_remaining_buffer_index,
401 (view.offset + view.length) as usize,
402 )
403 };
404
405 let shifts = if take_whole_last_buffer {
407 last_remaining_buffer_index + 1
408 } else {
409 last_remaining_buffer_index
410 };
411
412 self.views.iter_mut().for_each(|view| {
413 if (*view as u32) > 12 {
414 let mut byte_view = ByteView::from(*view);
415 byte_view.buffer_index -= shifts as u32;
416 *view = byte_view.as_u128();
417 }
418 });
419
420 let views = ScalarBuffer::from(first_n_views);
422
423 unsafe {
428 Arc::new(GenericByteViewArray::<B>::new_unchecked(
429 views,
430 buffers,
431 null_buffer,
432 ))
433 }
434 }
435
436 fn take_buffers_with_whole_last(
437 &mut self,
438 last_remaining_buffer_index: usize,
439 ) -> Vec<Buffer> {
440 if last_remaining_buffer_index == self.completed.len() {
441 self.flush_in_progress();
442 }
443 self.completed
444 .drain(0..last_remaining_buffer_index + 1)
445 .collect()
446 }
447
448 fn take_buffers_with_partial_last(
449 &mut self,
450 last_remaining_buffer_index: usize,
451 last_take_len: usize,
452 ) -> Vec<Buffer> {
453 let mut take_buffers = Vec::with_capacity(last_remaining_buffer_index + 1);
454
455 if !self.completed.is_empty() || last_remaining_buffer_index == 0 {
457 take_buffers.extend(self.completed.drain(0..last_remaining_buffer_index));
458 }
459
460 let last_buffer = if last_remaining_buffer_index < self.completed.len() {
462 self.completed[last_remaining_buffer_index].clone()
464 } else {
465 let taken_last_buffer = self.in_progress[0..last_take_len].to_vec();
467 Buffer::from_vec(taken_last_buffer)
468 };
469 take_buffers.push(last_buffer);
470
471 take_buffers
472 }
473
474 #[inline]
475 fn should_take_whole_buffer(&self, buffer_index: usize, take_len: usize) -> bool {
476 if buffer_index < self.completed.len() {
477 take_len == self.completed[buffer_index].len()
478 } else {
479 take_len == self.in_progress.len()
480 }
481 }
482
483 fn flush_in_progress(&mut self) {
484 let flushed_block = replace(
485 &mut self.in_progress,
486 Vec::with_capacity(self.max_block_size),
487 );
488 let buffer = Buffer::from_vec(flushed_block);
489 self.completed.push(buffer);
490 }
491}
492
493impl<B: ByteViewType> GroupColumn for ByteViewGroupValueBuilder<B> {
494 fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool {
495 self.equal_to_inner(lhs_row, array, rhs_row)
496 }
497
498 fn append_val(&mut self, array: &ArrayRef, row: usize) -> Result<()> {
499 self.append_val_inner(array, row);
500 Ok(())
501 }
502
503 fn vectorized_equal_to(
504 &self,
505 group_indices: &[usize],
506 array: &ArrayRef,
507 rows: &[usize],
508 equal_to_results: &mut [bool],
509 ) {
510 self.vectorized_equal_to_inner(group_indices, array, rows, equal_to_results);
511 }
512
513 fn vectorized_append(&mut self, array: &ArrayRef, rows: &[usize]) -> Result<()> {
514 self.vectorized_append_inner(array, rows);
515 Ok(())
516 }
517
518 fn len(&self) -> usize {
519 self.views.len()
520 }
521
522 fn size(&self) -> usize {
523 let buffers_size = self
524 .completed
525 .iter()
526 .map(|buf| buf.capacity() * size_of::<u8>())
527 .sum::<usize>();
528
529 self.nulls.allocated_size()
530 + self.views.capacity() * size_of::<u128>()
531 + self.in_progress.capacity() * size_of::<u8>()
532 + buffers_size
533 + size_of::<Self>()
534 }
535
536 fn build(self: Box<Self>) -> ArrayRef {
537 Self::build_inner(*self)
538 }
539
540 fn take_n(&mut self, n: usize) -> ArrayRef {
541 self.take_n_inner(n)
542 }
543}
544
545#[cfg(test)]
546mod tests {
547 use std::sync::Arc;
548
549 use crate::aggregates::group_values::multi_group_by::bytes_view::ByteViewGroupValueBuilder;
550 use arrow::array::{ArrayRef, AsArray, NullBufferBuilder, StringViewArray};
551 use arrow::datatypes::StringViewType;
552
553 use super::GroupColumn;
554
// Appends every row one by one and checks that building reproduces the input.
#[test]
fn test_byte_view_append_val() {
    let values = vec![
        Some("this string is quite long"),
        Some("foo"),
        None,
        Some("bar"),
        Some("this string is also quite long"),
        Some("this string is quite long"),
        Some("bar"),
    ];
    let expected: ArrayRef = Arc::new(StringViewArray::from(values));

    let mut builder =
        ByteViewGroupValueBuilder::<StringViewType>::new().with_max_block_size(60);
    (0..expected.len()).for_each(|row| builder.append_val(&expected, row).unwrap());

    let output = Box::new(builder).build();

    // With a 60-byte limit the first two long values (25 + 30 bytes) share a
    // block and the third (25 bytes) starts a second one.
    assert_eq!(output.as_string_view().data_buffers().len(), 2);
    assert_eq!(&output, &expected)
}
578
// Runs the shared equality scenario through the row-at-a-time API.
#[test]
fn test_byte_view_equal_to() {
    let append = |builder: &mut ByteViewGroupValueBuilder<StringViewType>,
                  builder_array: &ArrayRef,
                  append_rows: &[usize]| {
        append_rows
            .iter()
            .for_each(|&row| builder.append_val(builder_array, row).unwrap());
    };

    let equal_to = |builder: &ByteViewGroupValueBuilder<StringViewType>,
                    lhs_rows: &[usize],
                    input_array: &ArrayRef,
                    rhs_rows: &[usize],
                    equal_to_results: &mut Vec<bool>| {
        // Walk the pairs up to the shorter of the two row lists.
        for (idx, &lhs_row) in lhs_rows.iter().enumerate().take(rhs_rows.len()) {
            equal_to_results[idx] =
                builder.equal_to(lhs_row, input_array, rhs_rows[idx]);
        }
    };

    test_byte_view_equal_to_internal(append, equal_to);
}
602
// Runs the shared equality scenario through the vectorized API.
#[test]
fn test_byte_view_vectorized_equal_to() {
    let append = |builder: &mut ByteViewGroupValueBuilder<StringViewType>,
                  builder_array: &ArrayRef,
                  append_rows: &[usize]| {
        let appended = builder.vectorized_append(builder_array, append_rows);
        appended.unwrap();
    };

    let equal_to = |builder: &ByteViewGroupValueBuilder<StringViewType>,
                    lhs_rows: &[usize],
                    input_array: &ArrayRef,
                    rhs_rows: &[usize],
                    equal_to_results: &mut Vec<bool>| {
        builder.vectorized_equal_to(lhs_rows, input_array, rhs_rows, equal_to_results)
    };

    test_byte_view_equal_to_internal(append, equal_to);
}
628
// Exercises the bulk paths of `vectorized_append`: an input that is entirely
// null, followed by an input that contains no nulls at all.
#[test]
fn test_byte_view_vectorized_operation_special_case() {
    let mut builder =
        ByteViewGroupValueBuilder::<StringViewType>::new().with_max_block_size(60);
    let input_rows = [0, 1, 2, 3, 4];

    // All-null input: appended as builder rows 0..=4.
    let all_nulls_input_array: ArrayRef =
        Arc::new(StringViewArray::from(vec![None::<&str>; 5]));
    builder
        .vectorized_append(&all_nulls_input_array, &input_rows)
        .unwrap();

    let mut equal_to_results = vec![true; all_nulls_input_array.len()];
    builder.vectorized_equal_to(
        &[0, 1, 2, 3, 4],
        &all_nulls_input_array,
        &input_rows,
        &mut equal_to_results,
    );
    assert_eq!(equal_to_results.len(), 5);
    assert!(equal_to_results.iter().all(|&equal| equal));

    // Null-free input: appended as builder rows 5..=9.
    let all_not_nulls_input_array: ArrayRef = Arc::new(StringViewArray::from(vec![
        Some("stringview1"),
        Some("stringview2"),
        Some("stringview3"),
        Some("stringview4"),
        Some("stringview5"),
    ]));
    builder
        .vectorized_append(&all_not_nulls_input_array, &input_rows)
        .unwrap();

    let mut equal_to_results = vec![true; all_not_nulls_input_array.len()];
    builder.vectorized_equal_to(
        &[5, 6, 7, 8, 9],
        &all_not_nulls_input_array,
        &input_rows,
        &mut equal_to_results,
    );
    assert_eq!(equal_to_results.len(), 5);
    assert!(equal_to_results.iter().all(|&equal| equal));
}
689
690 fn test_byte_view_equal_to_internal<A, E>(mut append: A, mut equal_to: E)
691 where
692 A: FnMut(&mut ByteViewGroupValueBuilder<StringViewType>, &ArrayRef, &[usize]),
693 E: FnMut(
694 &ByteViewGroupValueBuilder<StringViewType>,
695 &[usize],
696 &ArrayRef,
697 &[usize],
698 &mut Vec<bool>,
699 ),
700 {
701 let mut builder =
728 ByteViewGroupValueBuilder::<StringViewType>::new().with_max_block_size(60);
729 let builder_array = Arc::new(StringViewArray::from(vec![
730 None,
731 None,
732 None,
733 Some("foo"),
734 Some("bazz"),
735 Some("foo"),
736 Some("bar"),
737 Some("I am a long string for test eq in completed"),
738 Some("I am a long string for test eq in progress"),
739 ])) as ArrayRef;
740 append(&mut builder, &builder_array, &[0, 1, 2, 3, 4, 5, 6, 7, 8]);
741
742 let (views, buffer, _nulls) = StringViewArray::from(vec![
744 Some("foo"),
745 Some("bar"), None,
747 None,
748 Some("baz"),
749 Some("oof"),
750 Some("bar"),
751 Some("i am a long string for test eq in completed"),
752 Some("I am a long string for test eq in COMPLETED"),
753 Some("I am a long string for test eq in completed"),
754 Some("I am a long string for test eq in PROGRESS"),
755 Some("I am a long string for test eq in progress"),
756 ])
757 .into_parts();
758
759 let mut nulls = NullBufferBuilder::new(9);
761 nulls.append_non_null();
762 nulls.append_null(); nulls.append_null();
764 nulls.append_null();
765 nulls.append_non_null();
766 nulls.append_non_null();
767 nulls.append_non_null();
768 nulls.append_non_null();
769 nulls.append_non_null();
770 nulls.append_non_null();
771 nulls.append_non_null();
772 nulls.append_non_null();
773 let input_array =
774 Arc::new(StringViewArray::new(views, buffer, nulls.finish())) as ArrayRef;
775
776 let mut equal_to_results = vec![true; input_array.len()];
778 equal_to(
779 &builder,
780 &[0, 1, 2, 3, 4, 5, 6, 7, 7, 7, 8, 8],
781 &input_array,
782 &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11],
783 &mut equal_to_results,
784 );
785
786 assert!(!equal_to_results[0]);
787 assert!(equal_to_results[1]);
788 assert!(equal_to_results[2]);
789 assert!(!equal_to_results[3]);
790 assert!(!equal_to_results[4]);
791 assert!(!equal_to_results[5]);
792 assert!(equal_to_results[6]);
793 assert!(!equal_to_results[7]);
794 assert!(!equal_to_results[8]);
795 assert!(equal_to_results[9]);
796 assert!(!equal_to_results[10]);
797 assert!(equal_to_results[11]);
798 }
799
// Checks `take_n` across: only nulls, only inline values, a partially taken
// completed block, wholly taken completed blocks, a partially taken
// `in_progress`, taking everything, and reuse of the builder afterwards.
#[test]
fn test_byte_view_take_n() {
    let input_array: ArrayRef = Arc::new(StringViewArray::from(vec![
        None,
        None,
        None,
        Some("foo"),
        Some("bar"),
        None,
        Some("foo"),
        Some("this string is quite long"),
        Some("this string is also quite long"),
        None,
        Some("bar"),
        Some("this string is quite long"),
        None,
        Some("foo"),
        Some("another string that is is quite long"),
        Some("this string not so long"),
        None,
        Some("bar"),
        Some("this string is quite long"),
        None,
    ]));

    let first_ones_to_append = 16;
    let second_ones_to_append = 4;
    let final_ones_to_append = input_array.len();

    let mut builder =
        ByteViewGroupValueBuilder::<StringViewType>::new().with_max_block_size(60);
    (0..first_ones_to_append)
        .for_each(|row| builder.append_val(&input_array, row).unwrap());

    // Blocks after 16 rows: [25 + 30], [25] completed; 36 + 23 in progress.
    assert_eq!(builder.completed.len(), 2);
    assert_eq!(builder.in_progress.len(), 59);

    // Consecutive takes that together consume exactly the first 16 rows.
    let mut offset = 0;
    for n in [2, 3, 3, 1, 3, 3, 1] {
        let taken_array = builder.take_n(n);
        assert_eq!(&taken_array, &input_array.slice(offset, n));
        offset += n;
    }
    assert_eq!(offset, first_ones_to_append);

    // Everything was taken, so the builder is empty again.
    assert!(builder.completed.is_empty());
    assert!(builder.in_progress.is_empty());
    assert!(builder.views.is_empty());

    // The emptied builder accepts new rows.
    let second_end = first_ones_to_append + second_ones_to_append;
    (first_ones_to_append..second_end)
        .for_each(|row| builder.append_val(&input_array, row).unwrap());

    assert!(builder.completed.is_empty());
    assert_eq!(builder.in_progress.len(), 25);

    let taken_array = builder.take_n(3);
    assert_eq!(&taken_array, &input_array.slice(16, 3));

    // A fresh builder holding all rows, taken in one call.
    let mut builder =
        ByteViewGroupValueBuilder::<StringViewType>::new().with_max_block_size(60);
    (0..final_ones_to_append)
        .for_each(|row| builder.append_val(&input_array, row).unwrap());

    assert_eq!(builder.completed.len(), 3);
    assert_eq!(builder.in_progress.len(), 25);

    let taken_array = builder.take_n(final_ones_to_append);
    assert_eq!(&taken_array, &input_array);
}
916}