1use arrow::array::{
19 Array, ArrayRef, AsArray, BinaryBuilder, BinaryViewBuilder, BooleanArray,
20 LargeBinaryBuilder, LargeStringBuilder, StringBuilder, StringViewBuilder,
21};
22use arrow::datatypes::DataType;
23use datafusion_common::hash_map::Entry;
24use datafusion_common::{internal_err, HashMap, Result};
25use datafusion_expr::{EmitTo, GroupsAccumulator};
26use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls::apply_filter_as_nulls;
27use std::mem::size_of;
28use std::sync::Arc;
29
/// Groups accumulator computing `min` or `max` for byte-like columns.
///
/// Supported data types are `Utf8`, `LargeUtf8`, `Utf8View`, `Binary`,
/// `LargeBinary` and `BinaryView`; any other type is reported as an
/// internal error when a batch is processed.
#[derive(Debug)]
pub(crate) struct MinMaxBytesAccumulator {
    /// Per-group running min/max values plus the accumulated data type.
    inner: MinMaxBytesState,
    /// `true` when computing the minimum, `false` when computing the maximum.
    is_min: bool,
}
46
47impl MinMaxBytesAccumulator {
48 pub fn new_min(data_type: DataType) -> Self {
50 Self {
51 inner: MinMaxBytesState::new(data_type),
52 is_min: true,
53 }
54 }
55
56 pub fn new_max(data_type: DataType) -> Self {
58 Self {
59 inner: MinMaxBytesState::new(data_type),
60 is_min: false,
61 }
62 }
63}
64
65impl GroupsAccumulator for MinMaxBytesAccumulator {
66 fn update_batch(
67 &mut self,
68 values: &[ArrayRef],
69 group_indices: &[usize],
70 opt_filter: Option<&BooleanArray>,
71 total_num_groups: usize,
72 ) -> Result<()> {
73 let array = &values[0];
74 assert_eq!(array.len(), group_indices.len());
75 assert_eq!(array.data_type(), &self.inner.data_type);
76
77 let array = apply_filter_as_nulls(array, opt_filter)?;
79
80 fn string_min(a: &[u8], b: &[u8]) -> bool {
82 unsafe {
85 let a = std::str::from_utf8_unchecked(a);
86 let b = std::str::from_utf8_unchecked(b);
87 a < b
88 }
89 }
90 fn string_max(a: &[u8], b: &[u8]) -> bool {
91 unsafe {
94 let a = std::str::from_utf8_unchecked(a);
95 let b = std::str::from_utf8_unchecked(b);
96 a > b
97 }
98 }
99 fn binary_min(a: &[u8], b: &[u8]) -> bool {
100 a < b
101 }
102
103 fn binary_max(a: &[u8], b: &[u8]) -> bool {
104 a > b
105 }
106
107 fn str_to_bytes<'a>(
108 it: impl Iterator<Item = Option<&'a str>>,
109 ) -> impl Iterator<Item = Option<&'a [u8]>> {
110 it.map(|s| s.map(|s| s.as_bytes()))
111 }
112
113 match (self.is_min, &self.inner.data_type) {
114 (true, &DataType::Utf8) => self.inner.update_batch(
116 str_to_bytes(array.as_string::<i32>().iter()),
117 group_indices,
118 total_num_groups,
119 string_min,
120 ),
121 (true, &DataType::LargeUtf8) => self.inner.update_batch(
122 str_to_bytes(array.as_string::<i64>().iter()),
123 group_indices,
124 total_num_groups,
125 string_min,
126 ),
127 (true, &DataType::Utf8View) => self.inner.update_batch(
128 str_to_bytes(array.as_string_view().iter()),
129 group_indices,
130 total_num_groups,
131 string_min,
132 ),
133
134 (false, &DataType::Utf8) => self.inner.update_batch(
136 str_to_bytes(array.as_string::<i32>().iter()),
137 group_indices,
138 total_num_groups,
139 string_max,
140 ),
141 (false, &DataType::LargeUtf8) => self.inner.update_batch(
142 str_to_bytes(array.as_string::<i64>().iter()),
143 group_indices,
144 total_num_groups,
145 string_max,
146 ),
147 (false, &DataType::Utf8View) => self.inner.update_batch(
148 str_to_bytes(array.as_string_view().iter()),
149 group_indices,
150 total_num_groups,
151 string_max,
152 ),
153
154 (true, &DataType::Binary) => self.inner.update_batch(
156 array.as_binary::<i32>().iter(),
157 group_indices,
158 total_num_groups,
159 binary_min,
160 ),
161 (true, &DataType::LargeBinary) => self.inner.update_batch(
162 array.as_binary::<i64>().iter(),
163 group_indices,
164 total_num_groups,
165 binary_min,
166 ),
167 (true, &DataType::BinaryView) => self.inner.update_batch(
168 array.as_binary_view().iter(),
169 group_indices,
170 total_num_groups,
171 binary_min,
172 ),
173
174 (false, &DataType::Binary) => self.inner.update_batch(
176 array.as_binary::<i32>().iter(),
177 group_indices,
178 total_num_groups,
179 binary_max,
180 ),
181 (false, &DataType::LargeBinary) => self.inner.update_batch(
182 array.as_binary::<i64>().iter(),
183 group_indices,
184 total_num_groups,
185 binary_max,
186 ),
187 (false, &DataType::BinaryView) => self.inner.update_batch(
188 array.as_binary_view().iter(),
189 group_indices,
190 total_num_groups,
191 binary_max,
192 ),
193
194 _ => internal_err!(
195 "Unexpected combination for MinMaxBytesAccumulator: ({:?}, {:?})",
196 self.is_min,
197 self.inner.data_type
198 ),
199 }
200 }
201
202 fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
203 let (data_capacity, min_maxes) = self.inner.emit_to(emit_to);
204
205 fn bytes_to_str(
207 min_maxes: Vec<Option<Vec<u8>>>,
208 ) -> impl Iterator<Item = Option<String>> {
209 min_maxes.into_iter().map(|opt| {
210 opt.map(|bytes| {
211 unsafe { String::from_utf8_unchecked(bytes) }
214 })
215 })
216 }
217
218 let result: ArrayRef = match self.inner.data_type {
219 DataType::Utf8 => {
220 let mut builder =
221 StringBuilder::with_capacity(min_maxes.len(), data_capacity);
222 for opt in bytes_to_str(min_maxes) {
223 match opt {
224 None => builder.append_null(),
225 Some(s) => builder.append_value(s.as_str()),
226 }
227 }
228 Arc::new(builder.finish())
229 }
230 DataType::LargeUtf8 => {
231 let mut builder =
232 LargeStringBuilder::with_capacity(min_maxes.len(), data_capacity);
233 for opt in bytes_to_str(min_maxes) {
234 match opt {
235 None => builder.append_null(),
236 Some(s) => builder.append_value(s.as_str()),
237 }
238 }
239 Arc::new(builder.finish())
240 }
241 DataType::Utf8View => {
242 let block_size = capacity_to_view_block_size(data_capacity);
243
244 let mut builder = StringViewBuilder::with_capacity(min_maxes.len())
245 .with_fixed_block_size(block_size);
246 for opt in bytes_to_str(min_maxes) {
247 match opt {
248 None => builder.append_null(),
249 Some(s) => builder.append_value(s.as_str()),
250 }
251 }
252 Arc::new(builder.finish())
253 }
254 DataType::Binary => {
255 let mut builder =
256 BinaryBuilder::with_capacity(min_maxes.len(), data_capacity);
257 for opt in min_maxes {
258 match opt {
259 None => builder.append_null(),
260 Some(s) => builder.append_value(s.as_ref() as &[u8]),
261 }
262 }
263 Arc::new(builder.finish())
264 }
265 DataType::LargeBinary => {
266 let mut builder =
267 LargeBinaryBuilder::with_capacity(min_maxes.len(), data_capacity);
268 for opt in min_maxes {
269 match opt {
270 None => builder.append_null(),
271 Some(s) => builder.append_value(s.as_ref() as &[u8]),
272 }
273 }
274 Arc::new(builder.finish())
275 }
276 DataType::BinaryView => {
277 let block_size = capacity_to_view_block_size(data_capacity);
278
279 let mut builder = BinaryViewBuilder::with_capacity(min_maxes.len())
280 .with_fixed_block_size(block_size);
281 for opt in min_maxes {
282 match opt {
283 None => builder.append_null(),
284 Some(s) => builder.append_value(s.as_ref() as &[u8]),
285 }
286 }
287 Arc::new(builder.finish())
288 }
289 _ => {
290 return internal_err!(
291 "Unexpected data type for MinMaxBytesAccumulator: {:?}",
292 self.inner.data_type
293 );
294 }
295 };
296
297 assert_eq!(&self.inner.data_type, result.data_type());
298 Ok(result)
299 }
300
301 fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
302 self.evaluate(emit_to).map(|arr| vec![arr])
304 }
305
306 fn merge_batch(
307 &mut self,
308 values: &[ArrayRef],
309 group_indices: &[usize],
310 opt_filter: Option<&BooleanArray>,
311 total_num_groups: usize,
312 ) -> Result<()> {
313 self.update_batch(values, group_indices, opt_filter, total_num_groups)
315 }
316
317 fn convert_to_state(
318 &self,
319 values: &[ArrayRef],
320 opt_filter: Option<&BooleanArray>,
321 ) -> Result<Vec<ArrayRef>> {
322 let output = apply_filter_as_nulls(&values[0], opt_filter)?;
325 Ok(vec![output])
326 }
327
328 fn supports_convert_to_state(&self) -> bool {
329 true
330 }
331
332 fn size(&self) -> usize {
333 self.inner.size()
334 }
335}
336
/// Picks the fixed block size for the view-array builders used in `evaluate`.
///
/// The result is the total data size clamped to the range `1..=2 MiB`. Zero
/// maps to 1 so the builder is never configured with an empty block.
fn capacity_to_view_block_size(data_capacity: usize) -> u32 {
    const MAX_BLOCK_SIZE: u32 = 2 * 1024 * 1024;
    match u32::try_from(data_capacity) {
        Ok(0) => 1,
        Ok(size) => size.min(MAX_BLOCK_SIZE),
        // Larger than `u32::MAX`, which is certainly above the cap.
        Err(_) => MAX_BLOCK_SIZE,
    }
}
353
/// Per-group storage of the current min/max value as owned bytes.
///
/// Used by [`MinMaxBytesAccumulator`] for both string and binary types;
/// string values are kept as their UTF-8 bytes.
#[derive(Debug)]
struct MinMaxBytesState {
    /// Current min/max for each group index; `None` until the group has seen
    /// a non-null value.
    min_max: Vec<Option<Vec<u8>>>,
    /// Arrow type of the values accepted by `update_batch` and produced by
    /// `evaluate`.
    data_type: DataType,
    /// Sum of the lengths of all values currently stored in `min_max`.
    total_data_bytes: usize,
}
394
395impl MinMaxBytesState {
398 fn new(data_type: DataType) -> Self {
403 Self {
404 min_max: vec![],
405 data_type,
406 total_data_bytes: 0,
407 }
408 }
409
410 fn set_value(&mut self, group_index: usize, new_val: &[u8]) {
412 match self.min_max[group_index].as_mut() {
413 None => {
414 self.min_max[group_index] = Some(new_val.to_vec());
415 self.total_data_bytes += new_val.len();
416 }
417 Some(existing_val) => {
418 self.total_data_bytes -= existing_val.len();
420 self.total_data_bytes += new_val.len();
421 existing_val.clear();
422 existing_val.extend_from_slice(new_val);
423 }
424 }
425 }
426
427 fn update_batch<'a, F, I>(
432 &mut self,
433 iter: I,
434 group_indices: &[usize],
435 total_num_groups: usize,
436 mut cmp: F,
437 ) -> Result<()>
438 where
439 F: FnMut(&[u8], &[u8]) -> bool + Send + Sync,
440 I: IntoIterator<Item = Option<&'a [u8]>>,
441 {
442 self.min_max.resize(total_num_groups, None);
443 let mut locations = HashMap::<usize, &[u8]>::with_capacity(group_indices.len());
447
448 for (new_val, group_index) in iter.into_iter().zip(group_indices.iter()) {
450 let group_index = *group_index;
451 let Some(new_val) = new_val else {
452 continue; };
454
455 match locations.entry(group_index) {
456 Entry::Occupied(mut occupied_entry) => {
457 if cmp(new_val, occupied_entry.get()) {
458 occupied_entry.insert(new_val);
459 }
460 }
461 Entry::Vacant(vacant_entry) => {
462 if let Some(old_val) = self.min_max[group_index].as_ref() {
463 if cmp(new_val, old_val) {
464 vacant_entry.insert(new_val);
465 }
466 } else {
467 vacant_entry.insert(new_val);
468 }
469 }
470 };
471 }
472
473 for (group_index, location) in locations.iter() {
475 self.set_value(*group_index, location);
476 }
477
478 Ok(())
479 }
480
481 fn emit_to(&mut self, emit_to: EmitTo) -> (usize, Vec<Option<Vec<u8>>>) {
488 match emit_to {
489 EmitTo::All => {
490 (
491 std::mem::take(&mut self.total_data_bytes), std::mem::take(&mut self.min_max),
493 )
494 }
495 EmitTo::First(n) => {
496 let first_min_maxes: Vec<_> = self.min_max.drain(..n).collect();
497 let first_data_capacity: usize = first_min_maxes
498 .iter()
499 .map(|opt| opt.as_ref().map(|s| s.len()).unwrap_or(0))
500 .sum();
501 self.total_data_bytes -= first_data_capacity;
502 (first_data_capacity, first_min_maxes)
503 }
504 }
505 }
506
507 fn size(&self) -> usize {
508 self.total_data_bytes + self.min_max.len() * size_of::<Option<Vec<u8>>>()
509 }
510}