datafusion_physical_plan/aggregates/group_values/single_group_by/bytes_view.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::aggregates::group_values::GroupValues;
19use arrow::array::{Array, ArrayRef, RecordBatch};
20use datafusion_expr::EmitTo;
21use datafusion_physical_expr::binary_map::OutputType;
22use datafusion_physical_expr_common::binary_view_map::ArrowBytesViewMap;
23use std::mem::size_of;
24
/// A [`GroupValues`] implementation that stores a single column of
/// Utf8View/BinaryView values.
///
/// This specialization is significantly faster than using the more general
/// purpose `Row`s format.
pub struct GroupValuesBytesView {
    /// Maps each distinct string/binary value to its group index
    map: ArrowBytesViewMap<usize>,
    /// The total number of groups so far (used to assign the next
    /// group index when a new value is interned)
    num_groups: usize,
}
35
36impl GroupValuesBytesView {
37 pub fn new(output_type: OutputType) -> Self {
38 Self {
39 map: ArrowBytesViewMap::new(output_type),
40 num_groups: 0,
41 }
42 }
43}
44
45impl GroupValues for GroupValuesBytesView {
46 fn intern(
47 &mut self,
48 cols: &[ArrayRef],
49 groups: &mut Vec<usize>,
50 ) -> datafusion_common::Result<()> {
51 assert_eq!(cols.len(), 1);
52
53 // look up / add entries in the table
54 let arr = &cols[0];
55
56 groups.clear();
57 self.map.insert_if_new(
58 arr,
59 // called for each new group
60 |_value| {
61 // assign new group index on each insert
62 let group_idx = self.num_groups;
63 self.num_groups += 1;
64 group_idx
65 },
66 // called for each group
67 |group_idx| {
68 groups.push(group_idx);
69 },
70 );
71
72 // ensure we assigned a group to for each row
73 assert_eq!(groups.len(), arr.len());
74 Ok(())
75 }
76
77 fn size(&self) -> usize {
78 self.map.size() + size_of::<Self>()
79 }
80
81 fn is_empty(&self) -> bool {
82 self.num_groups == 0
83 }
84
85 fn len(&self) -> usize {
86 self.num_groups
87 }
88
89 fn emit(&mut self, emit_to: EmitTo) -> datafusion_common::Result<Vec<ArrayRef>> {
90 // Reset the map to default, and convert it into a single array
91 let map_contents = self.map.take().into_state();
92
93 let group_values = match emit_to {
94 EmitTo::All => {
95 self.num_groups -= map_contents.len();
96 map_contents
97 }
98 EmitTo::First(n) if n == self.len() => {
99 self.num_groups -= map_contents.len();
100 map_contents
101 }
102 EmitTo::First(n) => {
103 // if we only wanted to take the first n, insert the rest back
104 // into the map we could potentially avoid this reallocation, at
105 // the expense of much more complex code.
106 // see https://github.com/apache/datafusion/issues/9195
107 let emit_group_values = map_contents.slice(0, n);
108 let remaining_group_values =
109 map_contents.slice(n, map_contents.len() - n);
110
111 self.num_groups = 0;
112 let mut group_indexes = vec![];
113 self.intern(&[remaining_group_values], &mut group_indexes)?;
114
115 // Verify that the group indexes were assigned in the correct order
116 assert_eq!(0, group_indexes[0]);
117
118 emit_group_values
119 }
120 };
121
122 Ok(vec![group_values])
123 }
124
125 fn clear_shrink(&mut self, _batch: &RecordBatch) {
126 // in theory we could potentially avoid this reallocation and clear the
127 // contents of the maps, but for now we just reset the map from the beginning
128 self.map.take();
129 }
130}