datafusion_functions_aggregate/min_max/
min_max_bytes.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow::array::{
19    Array, ArrayRef, AsArray, BinaryBuilder, BinaryViewBuilder, BooleanArray,
20    LargeBinaryBuilder, LargeStringBuilder, StringBuilder, StringViewBuilder,
21};
22use arrow::datatypes::DataType;
23use datafusion_common::hash_map::Entry;
24use datafusion_common::{internal_err, HashMap, Result};
25use datafusion_expr::{EmitTo, GroupsAccumulator};
26use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls::apply_filter_as_nulls;
27use std::mem::size_of;
28use std::sync::Arc;
29
30/// Implements fast Min/Max [`GroupsAccumulator`] for "bytes" types ([`StringArray`],
31/// [`BinaryArray`], [`StringViewArray`], etc)
32///
33/// This implementation dispatches to the appropriate specialized code in
34/// [`MinMaxBytesState`] based on data type and comparison function
35///
36/// [`StringArray`]: arrow::array::StringArray
37/// [`BinaryArray`]: arrow::array::BinaryArray
38/// [`StringViewArray`]: arrow::array::StringViewArray
39#[derive(Debug)]
40pub(crate) struct MinMaxBytesAccumulator {
41    /// Inner data storage.
42    inner: MinMaxBytesState,
43    /// if true, is `MIN` otherwise is `MAX`
44    is_min: bool,
45}
46
47impl MinMaxBytesAccumulator {
48    /// Create a new accumulator for computing `min(val)`
49    pub fn new_min(data_type: DataType) -> Self {
50        Self {
51            inner: MinMaxBytesState::new(data_type),
52            is_min: true,
53        }
54    }
55
56    /// Create a new accumulator fo computing `max(val)`
57    pub fn new_max(data_type: DataType) -> Self {
58        Self {
59            inner: MinMaxBytesState::new(data_type),
60            is_min: false,
61        }
62    }
63}
64
65impl GroupsAccumulator for MinMaxBytesAccumulator {
66    fn update_batch(
67        &mut self,
68        values: &[ArrayRef],
69        group_indices: &[usize],
70        opt_filter: Option<&BooleanArray>,
71        total_num_groups: usize,
72    ) -> Result<()> {
73        let array = &values[0];
74        assert_eq!(array.len(), group_indices.len());
75        assert_eq!(array.data_type(), &self.inner.data_type);
76
77        // apply filter if needed
78        let array = apply_filter_as_nulls(array, opt_filter)?;
79
80        // dispatch to appropriate kernel / specialized implementation
81        fn string_min(a: &[u8], b: &[u8]) -> bool {
82            // safety: only called from this function, which ensures a and b come
83            // from an array with valid utf8 data
84            unsafe {
85                let a = std::str::from_utf8_unchecked(a);
86                let b = std::str::from_utf8_unchecked(b);
87                a < b
88            }
89        }
90        fn string_max(a: &[u8], b: &[u8]) -> bool {
91            // safety: only called from this function, which ensures a and b come
92            // from an array with valid utf8 data
93            unsafe {
94                let a = std::str::from_utf8_unchecked(a);
95                let b = std::str::from_utf8_unchecked(b);
96                a > b
97            }
98        }
99        fn binary_min(a: &[u8], b: &[u8]) -> bool {
100            a < b
101        }
102
103        fn binary_max(a: &[u8], b: &[u8]) -> bool {
104            a > b
105        }
106
107        fn str_to_bytes<'a>(
108            it: impl Iterator<Item = Option<&'a str>>,
109        ) -> impl Iterator<Item = Option<&'a [u8]>> {
110            it.map(|s| s.map(|s| s.as_bytes()))
111        }
112
113        match (self.is_min, &self.inner.data_type) {
114            // Utf8/LargeUtf8/Utf8View Min
115            (true, &DataType::Utf8) => self.inner.update_batch(
116                str_to_bytes(array.as_string::<i32>().iter()),
117                group_indices,
118                total_num_groups,
119                string_min,
120            ),
121            (true, &DataType::LargeUtf8) => self.inner.update_batch(
122                str_to_bytes(array.as_string::<i64>().iter()),
123                group_indices,
124                total_num_groups,
125                string_min,
126            ),
127            (true, &DataType::Utf8View) => self.inner.update_batch(
128                str_to_bytes(array.as_string_view().iter()),
129                group_indices,
130                total_num_groups,
131                string_min,
132            ),
133
134            // Utf8/LargeUtf8/Utf8View Max
135            (false, &DataType::Utf8) => self.inner.update_batch(
136                str_to_bytes(array.as_string::<i32>().iter()),
137                group_indices,
138                total_num_groups,
139                string_max,
140            ),
141            (false, &DataType::LargeUtf8) => self.inner.update_batch(
142                str_to_bytes(array.as_string::<i64>().iter()),
143                group_indices,
144                total_num_groups,
145                string_max,
146            ),
147            (false, &DataType::Utf8View) => self.inner.update_batch(
148                str_to_bytes(array.as_string_view().iter()),
149                group_indices,
150                total_num_groups,
151                string_max,
152            ),
153
154            // Binary/LargeBinary/BinaryView Min
155            (true, &DataType::Binary) => self.inner.update_batch(
156                array.as_binary::<i32>().iter(),
157                group_indices,
158                total_num_groups,
159                binary_min,
160            ),
161            (true, &DataType::LargeBinary) => self.inner.update_batch(
162                array.as_binary::<i64>().iter(),
163                group_indices,
164                total_num_groups,
165                binary_min,
166            ),
167            (true, &DataType::BinaryView) => self.inner.update_batch(
168                array.as_binary_view().iter(),
169                group_indices,
170                total_num_groups,
171                binary_min,
172            ),
173
174            // Binary/LargeBinary/BinaryView Max
175            (false, &DataType::Binary) => self.inner.update_batch(
176                array.as_binary::<i32>().iter(),
177                group_indices,
178                total_num_groups,
179                binary_max,
180            ),
181            (false, &DataType::LargeBinary) => self.inner.update_batch(
182                array.as_binary::<i64>().iter(),
183                group_indices,
184                total_num_groups,
185                binary_max,
186            ),
187            (false, &DataType::BinaryView) => self.inner.update_batch(
188                array.as_binary_view().iter(),
189                group_indices,
190                total_num_groups,
191                binary_max,
192            ),
193
194            _ => internal_err!(
195                "Unexpected combination for MinMaxBytesAccumulator: ({:?}, {:?})",
196                self.is_min,
197                self.inner.data_type
198            ),
199        }
200    }
201
202    fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
203        let (data_capacity, min_maxes) = self.inner.emit_to(emit_to);
204
205        // Convert the Vec of bytes to a vec of Strings (at no cost)
206        fn bytes_to_str(
207            min_maxes: Vec<Option<Vec<u8>>>,
208        ) -> impl Iterator<Item = Option<String>> {
209            min_maxes.into_iter().map(|opt| {
210                opt.map(|bytes| {
211                    // Safety: only called on data added from update_batch which ensures
212                    // the input type matched the output type
213                    unsafe { String::from_utf8_unchecked(bytes) }
214                })
215            })
216        }
217
218        let result: ArrayRef = match self.inner.data_type {
219            DataType::Utf8 => {
220                let mut builder =
221                    StringBuilder::with_capacity(min_maxes.len(), data_capacity);
222                for opt in bytes_to_str(min_maxes) {
223                    match opt {
224                        None => builder.append_null(),
225                        Some(s) => builder.append_value(s.as_str()),
226                    }
227                }
228                Arc::new(builder.finish())
229            }
230            DataType::LargeUtf8 => {
231                let mut builder =
232                    LargeStringBuilder::with_capacity(min_maxes.len(), data_capacity);
233                for opt in bytes_to_str(min_maxes) {
234                    match opt {
235                        None => builder.append_null(),
236                        Some(s) => builder.append_value(s.as_str()),
237                    }
238                }
239                Arc::new(builder.finish())
240            }
241            DataType::Utf8View => {
242                let block_size = capacity_to_view_block_size(data_capacity);
243
244                let mut builder = StringViewBuilder::with_capacity(min_maxes.len())
245                    .with_fixed_block_size(block_size);
246                for opt in bytes_to_str(min_maxes) {
247                    match opt {
248                        None => builder.append_null(),
249                        Some(s) => builder.append_value(s.as_str()),
250                    }
251                }
252                Arc::new(builder.finish())
253            }
254            DataType::Binary => {
255                let mut builder =
256                    BinaryBuilder::with_capacity(min_maxes.len(), data_capacity);
257                for opt in min_maxes {
258                    match opt {
259                        None => builder.append_null(),
260                        Some(s) => builder.append_value(s.as_ref() as &[u8]),
261                    }
262                }
263                Arc::new(builder.finish())
264            }
265            DataType::LargeBinary => {
266                let mut builder =
267                    LargeBinaryBuilder::with_capacity(min_maxes.len(), data_capacity);
268                for opt in min_maxes {
269                    match opt {
270                        None => builder.append_null(),
271                        Some(s) => builder.append_value(s.as_ref() as &[u8]),
272                    }
273                }
274                Arc::new(builder.finish())
275            }
276            DataType::BinaryView => {
277                let block_size = capacity_to_view_block_size(data_capacity);
278
279                let mut builder = BinaryViewBuilder::with_capacity(min_maxes.len())
280                    .with_fixed_block_size(block_size);
281                for opt in min_maxes {
282                    match opt {
283                        None => builder.append_null(),
284                        Some(s) => builder.append_value(s.as_ref() as &[u8]),
285                    }
286                }
287                Arc::new(builder.finish())
288            }
289            _ => {
290                return internal_err!(
291                    "Unexpected data type for MinMaxBytesAccumulator: {:?}",
292                    self.inner.data_type
293                );
294            }
295        };
296
297        assert_eq!(&self.inner.data_type, result.data_type());
298        Ok(result)
299    }
300
301    fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
302        // min/max are their own states (no transition needed)
303        self.evaluate(emit_to).map(|arr| vec![arr])
304    }
305
306    fn merge_batch(
307        &mut self,
308        values: &[ArrayRef],
309        group_indices: &[usize],
310        opt_filter: Option<&BooleanArray>,
311        total_num_groups: usize,
312    ) -> Result<()> {
313        // min/max are their own states (no transition needed)
314        self.update_batch(values, group_indices, opt_filter, total_num_groups)
315    }
316
317    fn convert_to_state(
318        &self,
319        values: &[ArrayRef],
320        opt_filter: Option<&BooleanArray>,
321    ) -> Result<Vec<ArrayRef>> {
322        // Min/max do not change the values as they are their own states
323        // apply the filter by combining with the null mask, if any
324        let output = apply_filter_as_nulls(&values[0], opt_filter)?;
325        Ok(vec![output])
326    }
327
328    fn supports_convert_to_state(&self) -> bool {
329        true
330    }
331
332    fn size(&self) -> usize {
333        self.inner.size()
334    }
335}
336
/// Chooses the block size (the size of each contiguous data buffer) to use
/// when building a view array for `data_capacity` bytes of string data.
///
/// The result is `data_capacity` limited to the range `1..=2 MiB`. This is a
/// heuristic to avoid allocating too many small buffers.
fn capacity_to_view_block_size(data_capacity: usize) -> u32 {
    const MAX_BLOCK_SIZE: u32 = 2 * 1024 * 1024;

    // Capacities that do not fit in a `u32` are above the maximum anyway.
    // The lower bound of 1 avoids a block size equal to zero when calling
    // `with_fixed_block_size()`.
    u32::try_from(data_capacity)
        .unwrap_or(MAX_BLOCK_SIZE)
        .clamp(1, MAX_BLOCK_SIZE)
}
353
354/// Stores internal Min/Max state for "bytes" types.
355///
356/// This implementation is general and stores the minimum/maximum for each
357/// groups in an individual byte array, which balances allocations and memory
358/// fragmentation (aka garbage).
359///
360/// ```text
361///                    ┌─────────────────────────────────┐
362///   ┌─────┐    ┌────▶│Option<Vec<u8>> (["A"])          │───────────▶   "A"
363///   │  0  │────┘     └─────────────────────────────────┘
364///   ├─────┤          ┌─────────────────────────────────┐
365///   │  1  │─────────▶│Option<Vec<u8>> (["Z"])          │───────────▶   "Z"
366///   └─────┘          └─────────────────────────────────┘               ...
367///     ...               ...
368///   ┌─────┐          ┌────────────────────────────────┐
369///   │ N-2 │─────────▶│Option<Vec<u8>> (["A"])         │────────────▶   "A"
370///   ├─────┤          └────────────────────────────────┘
371///   │ N-1 │────┐     ┌────────────────────────────────┐
372///   └─────┘    └────▶│Option<Vec<u8>> (["Q"])         │────────────▶   "Q"
373///                    └────────────────────────────────┘
374///
375///                      min_max: Vec<Option<Vec<u8>>
376/// ```
377///
378/// Note that for `StringViewArray` and `BinaryViewArray`, there are potentially
379/// more efficient implementations (e.g. by managing a string data buffer
380/// directly), but then garbage collection, memory management, and final array
381/// construction becomes more complex.
382///
383/// See discussion on <https://github.com/apache/datafusion/issues/6906>
384#[derive(Debug)]
385struct MinMaxBytesState {
386    /// The minimum/maximum value for each group
387    min_max: Vec<Option<Vec<u8>>>,
388    /// The data type of the array
389    data_type: DataType,
390    /// The total bytes of the string data (for pre-allocating the final array,
391    /// and tracking memory usage)
392    total_data_bytes: usize,
393}
394
395/// Implement the MinMaxBytesAccumulator with a comparison function
396/// for comparing strings
397impl MinMaxBytesState {
398    /// Create a new MinMaxBytesAccumulator
399    ///
400    /// # Arguments:
401    /// * `data_type`: The data type of the arrays that will be passed to this accumulator
402    fn new(data_type: DataType) -> Self {
403        Self {
404            min_max: vec![],
405            data_type,
406            total_data_bytes: 0,
407        }
408    }
409
410    /// Set the specified group to the given value, updating memory usage appropriately
411    fn set_value(&mut self, group_index: usize, new_val: &[u8]) {
412        match self.min_max[group_index].as_mut() {
413            None => {
414                self.min_max[group_index] = Some(new_val.to_vec());
415                self.total_data_bytes += new_val.len();
416            }
417            Some(existing_val) => {
418                // Copy data over to avoid re-allocating
419                self.total_data_bytes -= existing_val.len();
420                self.total_data_bytes += new_val.len();
421                existing_val.clear();
422                existing_val.extend_from_slice(new_val);
423            }
424        }
425    }
426
427    /// Updates the min/max values for the given string values
428    ///
429    /// `cmp` is the  comparison function to use, called like `cmp(new_val, existing_val)`
430    /// returns true if the `new_val` should replace `existing_val`
431    fn update_batch<'a, F, I>(
432        &mut self,
433        iter: I,
434        group_indices: &[usize],
435        total_num_groups: usize,
436        mut cmp: F,
437    ) -> Result<()>
438    where
439        F: FnMut(&[u8], &[u8]) -> bool + Send + Sync,
440        I: IntoIterator<Item = Option<&'a [u8]>>,
441    {
442        self.min_max.resize(total_num_groups, None);
443        // Minimize value copies by calculating the new min/maxes for each group
444        // in this batch (either the existing min/max or the new input value)
445        // and updating the owned values in `self.min_maxes` at most once
446        let mut locations = HashMap::<usize, &[u8]>::with_capacity(group_indices.len());
447
448        // Figure out the new min value for each group
449        for (new_val, group_index) in iter.into_iter().zip(group_indices.iter()) {
450            let group_index = *group_index;
451            let Some(new_val) = new_val else {
452                continue; // skip nulls
453            };
454
455            match locations.entry(group_index) {
456                Entry::Occupied(mut occupied_entry) => {
457                    if cmp(new_val, occupied_entry.get()) {
458                        occupied_entry.insert(new_val);
459                    }
460                }
461                Entry::Vacant(vacant_entry) => {
462                    if let Some(old_val) = self.min_max[group_index].as_ref() {
463                        if cmp(new_val, old_val) {
464                            vacant_entry.insert(new_val);
465                        }
466                    } else {
467                        vacant_entry.insert(new_val);
468                    }
469                }
470            };
471        }
472
473        // Update self.min_max with any new min/max values we found in the input
474        for (group_index, location) in locations.iter() {
475            self.set_value(*group_index, location);
476        }
477
478        Ok(())
479    }
480
481    /// Emits the specified min_max values
482    ///
483    /// Returns (data_capacity, min_maxes), updating the current value of total_data_bytes
484    ///
485    /// - `data_capacity`: the total length of all strings and their contents,
486    /// - `min_maxes`: the actual min/max values for each group
487    fn emit_to(&mut self, emit_to: EmitTo) -> (usize, Vec<Option<Vec<u8>>>) {
488        match emit_to {
489            EmitTo::All => {
490                (
491                    std::mem::take(&mut self.total_data_bytes), // reset total bytes and min_max
492                    std::mem::take(&mut self.min_max),
493                )
494            }
495            EmitTo::First(n) => {
496                let first_min_maxes: Vec<_> = self.min_max.drain(..n).collect();
497                let first_data_capacity: usize = first_min_maxes
498                    .iter()
499                    .map(|opt| opt.as_ref().map(|s| s.len()).unwrap_or(0))
500                    .sum();
501                self.total_data_bytes -= first_data_capacity;
502                (first_data_capacity, first_min_maxes)
503            }
504        }
505    }
506
507    fn size(&self) -> usize {
508        self.total_data_bytes + self.min_max.len() * size_of::<Option<Vec<u8>>>()
509    }
510}