datafusion_physical_plan/aggregates/group_values/mod.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`GroupValues`] trait for storing and interning group keys
19
20use arrow::array::types::{
21 Date32Type, Date64Type, Decimal128Type, Time32MillisecondType, Time32SecondType,
22 Time64MicrosecondType, Time64NanosecondType, TimestampMicrosecondType,
23 TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
24};
25use arrow::array::{downcast_primitive, ArrayRef, RecordBatch};
26use arrow::datatypes::{DataType, SchemaRef, TimeUnit};
27use datafusion_common::Result;
28
29use datafusion_expr::EmitTo;
30
31pub mod multi_group_by;
32
33mod row;
34mod single_group_by;
35use datafusion_physical_expr::binary_map::OutputType;
36use multi_group_by::GroupValuesColumn;
37use row::GroupValuesRows;
38
39pub(crate) use single_group_by::primitive::HashValue;
40
41use crate::aggregates::{
42 group_values::single_group_by::{
43 boolean::GroupValuesBoolean, bytes::GroupValuesBytes,
44 bytes_view::GroupValuesBytesView, primitive::GroupValuesPrimitive,
45 },
46 order::GroupOrdering,
47};
48
49mod metrics;
50mod null_builder;
51
52pub(crate) use metrics::GroupByMetrics;
53
/// Stores the group values during hash aggregation.
///
/// # Background
///
/// In a query such as `SELECT a, b, count(*) FROM t GROUP BY a, b`, the group values
/// identify each group, and correspond to all the distinct values of `(a,b)`.
///
/// ```sql
/// -- Input has 4 rows with 3 distinct combinations of (a,b) ("groups")
/// create table t(a int, b varchar)
/// as values (1, 'a'), (2, 'b'), (1, 'a'), (3, 'c');
///
/// select a, b, count(*) from t group by a, b;
/// ----
/// 1 a 2
/// 2 b 1
/// 3 c 1
/// ```
///
/// # Design
///
/// Managing group values is a performance critical operation in hash
/// aggregation. The major operations are:
///
/// 1. Intern: Quickly finding existing and adding new group values
/// 2. Emit: Returning the group values as an array
///
/// There are multiple implementations of this trait, each specialized for
/// particular data types and numbers of group columns so that these
/// operations are fast. See [`new_group_values`] for how one is selected.
///
/// # Group Ids
///
/// Each distinct group in a hash aggregation is identified by a unique group id
/// (usize) which is assigned by instances of this trait. Group ids are
/// contiguous (no gaps), starting from 0.
///
/// # Thread safety
///
/// Implementations must be [`Send`] (supertrait bound).
pub trait GroupValues: Send {
    /// Calculates the group id for each input row of `cols`, assigning new
    /// group ids as necessary.
    ///
    /// When the function returns, `groups` must contain the group id for each
    /// row in `cols`.
    ///
    /// If a row has the same value as a previous row, the same group id is
    /// assigned. If a row has a new value, the next available group id is
    /// assigned.
    ///
    /// # Errors
    ///
    /// Error conditions are implementation-specific.
    fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()>;

    /// Returns the number of bytes of memory used by this [`GroupValues`]
    fn size(&self) -> usize;

    /// Returns true if this [`GroupValues`] is empty
    ///
    /// NOTE(review): presumably equivalent to `self.len() == 0`; the trait
    /// itself does not enforce this, so confirm per implementation.
    fn is_empty(&self) -> bool;

    /// The number of values (distinct group values) stored in this [`GroupValues`]
    fn len(&self) -> usize;

    /// Emits the group values as a vector of arrays.
    ///
    /// `emit_to` specifies what to emit; see [`EmitTo`]. This method takes
    /// `&mut self`, so emitting may change the stored state.
    ///
    /// # Errors
    ///
    /// Error conditions are implementation-specific.
    fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>;

    /// Clears the contents and shrinks the capacity to the size of `batch`,
    /// freeing up memory.
    fn clear_shrink(&mut self, batch: &RecordBatch);
}
117
118/// Return a specialized implementation of [`GroupValues`] for the given schema.
119///
120/// [`GroupValues`] implementations choosing logic:
121///
122/// - If group by single column, and type of this column has
123/// the specific [`GroupValues`] implementation, such implementation
124/// will be chosen.
125///
126/// - If group by multiple columns, and all column types have the specific
127/// `GroupColumn` implementations, `GroupValuesColumn` will be chosen.
128///
129/// - Otherwise, the general implementation `GroupValuesRows` will be chosen.
130///
131/// `GroupColumn`: crate::aggregates::group_values::multi_group_by::GroupColumn
132/// `GroupValuesColumn`: crate::aggregates::group_values::multi_group_by::GroupValuesColumn
133/// `GroupValuesRows`: crate::aggregates::group_values::row::GroupValuesRows
134pub fn new_group_values(
135 schema: SchemaRef,
136 group_ordering: &GroupOrdering,
137) -> Result<Box<dyn GroupValues>> {
138 if schema.fields.len() == 1 {
139 let d = schema.fields[0].data_type();
140
141 macro_rules! downcast_helper {
142 ($t:ty, $d:ident) => {
143 return Ok(Box::new(GroupValuesPrimitive::<$t>::new($d.clone())))
144 };
145 }
146
147 downcast_primitive! {
148 d => (downcast_helper, d),
149 _ => {}
150 }
151
152 match d {
153 DataType::Date32 => {
154 downcast_helper!(Date32Type, d);
155 }
156 DataType::Date64 => {
157 downcast_helper!(Date64Type, d);
158 }
159 DataType::Time32(t) => match t {
160 TimeUnit::Second => downcast_helper!(Time32SecondType, d),
161 TimeUnit::Millisecond => downcast_helper!(Time32MillisecondType, d),
162 _ => {}
163 },
164 DataType::Time64(t) => match t {
165 TimeUnit::Microsecond => downcast_helper!(Time64MicrosecondType, d),
166 TimeUnit::Nanosecond => downcast_helper!(Time64NanosecondType, d),
167 _ => {}
168 },
169 DataType::Timestamp(t, _tz) => match t {
170 TimeUnit::Second => downcast_helper!(TimestampSecondType, d),
171 TimeUnit::Millisecond => downcast_helper!(TimestampMillisecondType, d),
172 TimeUnit::Microsecond => downcast_helper!(TimestampMicrosecondType, d),
173 TimeUnit::Nanosecond => downcast_helper!(TimestampNanosecondType, d),
174 },
175 DataType::Decimal128(_, _) => {
176 downcast_helper!(Decimal128Type, d);
177 }
178 DataType::Utf8 => {
179 return Ok(Box::new(GroupValuesBytes::<i32>::new(OutputType::Utf8)));
180 }
181 DataType::LargeUtf8 => {
182 return Ok(Box::new(GroupValuesBytes::<i64>::new(OutputType::Utf8)));
183 }
184 DataType::Utf8View => {
185 return Ok(Box::new(GroupValuesBytesView::new(OutputType::Utf8View)));
186 }
187 DataType::Binary => {
188 return Ok(Box::new(GroupValuesBytes::<i32>::new(OutputType::Binary)));
189 }
190 DataType::LargeBinary => {
191 return Ok(Box::new(GroupValuesBytes::<i64>::new(OutputType::Binary)));
192 }
193 DataType::BinaryView => {
194 return Ok(Box::new(GroupValuesBytesView::new(OutputType::BinaryView)));
195 }
196 DataType::Boolean => {
197 return Ok(Box::new(GroupValuesBoolean::new()));
198 }
199 _ => {}
200 }
201 }
202
203 if multi_group_by::supported_schema(schema.as_ref()) {
204 if matches!(group_ordering, GroupOrdering::None) {
205 Ok(Box::new(GroupValuesColumn::<false>::try_new(schema)?))
206 } else {
207 Ok(Box::new(GroupValuesColumn::<true>::try_new(schema)?))
208 }
209 } else {
210 Ok(Box::new(GroupValuesRows::try_new(schema)?))
211 }
212}