datafusion_physical_plan/aggregates/group_values/single_group_by/
bytes.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::mem::size_of;
19
20use crate::aggregates::group_values::GroupValues;
21
22use arrow::array::{Array, ArrayRef, OffsetSizeTrait, RecordBatch};
23use datafusion_common::Result;
24use datafusion_expr::EmitTo;
25use datafusion_physical_expr_common::binary_map::{ArrowBytesMap, OutputType};
26
27/// A [`GroupValues`] storing single column of Utf8/LargeUtf8/Binary/LargeBinary values
28///
29/// This specialization is significantly faster than using the more general
30/// purpose `Row`s format
31pub struct GroupValuesBytes<O: OffsetSizeTrait> {
32    /// Map string/binary values to group index
33    map: ArrowBytesMap<O, usize>,
34    /// The total number of groups so far (used to assign group_index)
35    num_groups: usize,
36}
37
38impl<O: OffsetSizeTrait> GroupValuesBytes<O> {
39    pub fn new(output_type: OutputType) -> Self {
40        Self {
41            map: ArrowBytesMap::new(output_type),
42            num_groups: 0,
43        }
44    }
45}
46
47impl<O: OffsetSizeTrait> GroupValues for GroupValuesBytes<O> {
48    fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> {
49        assert_eq!(cols.len(), 1);
50
51        // look up / add entries in the table
52        let arr = &cols[0];
53
54        groups.clear();
55        self.map.insert_if_new(
56            arr,
57            // called for each new group
58            |_value| {
59                // assign new group index on each insert
60                let group_idx = self.num_groups;
61                self.num_groups += 1;
62                group_idx
63            },
64            // called for each group
65            |group_idx| {
66                groups.push(group_idx);
67            },
68        );
69
70        // ensure we assigned a group to for each row
71        assert_eq!(groups.len(), arr.len());
72        Ok(())
73    }
74
75    fn size(&self) -> usize {
76        self.map.size() + size_of::<Self>()
77    }
78
79    fn is_empty(&self) -> bool {
80        self.num_groups == 0
81    }
82
83    fn len(&self) -> usize {
84        self.num_groups
85    }
86
87    fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
88        // Reset the map to default, and convert it into a single array
89        let map_contents = self.map.take().into_state();
90
91        let group_values = match emit_to {
92            EmitTo::All => {
93                self.num_groups -= map_contents.len();
94                map_contents
95            }
96            EmitTo::First(n) if n == self.len() => {
97                self.num_groups -= map_contents.len();
98                map_contents
99            }
100            EmitTo::First(n) => {
101                // if we only wanted to take the first n, insert the rest back
102                // into the map we could potentially avoid this reallocation, at
103                // the expense of much more complex code.
104                // see https://github.com/apache/datafusion/issues/9195
105                let emit_group_values = map_contents.slice(0, n);
106                let remaining_group_values =
107                    map_contents.slice(n, map_contents.len() - n);
108
109                self.num_groups = 0;
110                let mut group_indexes = vec![];
111                self.intern(&[remaining_group_values], &mut group_indexes)?;
112
113                // Verify that the group indexes were assigned in the correct order
114                assert_eq!(0, group_indexes[0]);
115
116                emit_group_values
117            }
118        };
119
120        Ok(vec![group_values])
121    }
122
123    fn clear_shrink(&mut self, _batch: &RecordBatch) {
124        // in theory we could potentially avoid this reallocation and clear the
125        // contents of the maps, but for now we just reset the map from the beginning
126        self.map.take();
127    }
128}