datafusion_physical_plan/aggregates/group_values/single_group_by/
bytes_view.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::aggregates::group_values::GroupValues;
19use arrow::array::{Array, ArrayRef, RecordBatch};
20use datafusion_expr::EmitTo;
21use datafusion_physical_expr::binary_map::OutputType;
22use datafusion_physical_expr_common::binary_view_map::ArrowBytesViewMap;
23use std::mem::size_of;
24
/// A [`GroupValues`] implementation that stores a single column of
/// Utf8View/BinaryView values.
///
/// This specialization is significantly faster than using the more general
/// purpose `Row`s format
pub struct GroupValuesBytesView {
    /// Maps each distinct string/binary value to its group index
    map: ArrowBytesViewMap<usize>,
    /// The total number of groups so far (used to assign group_index);
    /// incremented once for every new value inserted into `map`
    num_groups: usize,
}
35
36impl GroupValuesBytesView {
37    pub fn new(output_type: OutputType) -> Self {
38        Self {
39            map: ArrowBytesViewMap::new(output_type),
40            num_groups: 0,
41        }
42    }
43}
44
45impl GroupValues for GroupValuesBytesView {
46    fn intern(
47        &mut self,
48        cols: &[ArrayRef],
49        groups: &mut Vec<usize>,
50    ) -> datafusion_common::Result<()> {
51        assert_eq!(cols.len(), 1);
52
53        // look up / add entries in the table
54        let arr = &cols[0];
55
56        groups.clear();
57        self.map.insert_if_new(
58            arr,
59            // called for each new group
60            |_value| {
61                // assign new group index on each insert
62                let group_idx = self.num_groups;
63                self.num_groups += 1;
64                group_idx
65            },
66            // called for each group
67            |group_idx| {
68                groups.push(group_idx);
69            },
70        );
71
72        // ensure we assigned a group to for each row
73        assert_eq!(groups.len(), arr.len());
74        Ok(())
75    }
76
77    fn size(&self) -> usize {
78        self.map.size() + size_of::<Self>()
79    }
80
81    fn is_empty(&self) -> bool {
82        self.num_groups == 0
83    }
84
85    fn len(&self) -> usize {
86        self.num_groups
87    }
88
89    fn emit(&mut self, emit_to: EmitTo) -> datafusion_common::Result<Vec<ArrayRef>> {
90        // Reset the map to default, and convert it into a single array
91        let map_contents = self.map.take().into_state();
92
93        let group_values = match emit_to {
94            EmitTo::All => {
95                self.num_groups -= map_contents.len();
96                map_contents
97            }
98            EmitTo::First(n) if n == self.len() => {
99                self.num_groups -= map_contents.len();
100                map_contents
101            }
102            EmitTo::First(n) => {
103                // if we only wanted to take the first n, insert the rest back
104                // into the map we could potentially avoid this reallocation, at
105                // the expense of much more complex code.
106                // see https://github.com/apache/datafusion/issues/9195
107                let emit_group_values = map_contents.slice(0, n);
108                let remaining_group_values =
109                    map_contents.slice(n, map_contents.len() - n);
110
111                self.num_groups = 0;
112                let mut group_indexes = vec![];
113                self.intern(&[remaining_group_values], &mut group_indexes)?;
114
115                // Verify that the group indexes were assigned in the correct order
116                assert_eq!(0, group_indexes[0]);
117
118                emit_group_values
119            }
120        };
121
122        Ok(vec![group_values])
123    }
124
125    fn clear_shrink(&mut self, _batch: &RecordBatch) {
126        // in theory we could potentially avoid this reallocation and clear the
127        // contents of the maps, but for now we just reset the map from the beginning
128        self.map.take();
129    }
130}