datafusion_physical_plan/aggregates/group_values/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`GroupValues`] trait for storing and interning group keys
19
20use arrow::array::types::{
21    Date32Type, Date64Type, Decimal128Type, Time32MillisecondType, Time32SecondType,
22    Time64MicrosecondType, Time64NanosecondType, TimestampMicrosecondType,
23    TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
24};
25use arrow::array::{downcast_primitive, ArrayRef, RecordBatch};
26use arrow::datatypes::{DataType, SchemaRef, TimeUnit};
27use datafusion_common::Result;
28
29use datafusion_expr::EmitTo;
30
31pub mod multi_group_by;
32
33mod row;
34mod single_group_by;
35use datafusion_physical_expr::binary_map::OutputType;
36use multi_group_by::GroupValuesColumn;
37use row::GroupValuesRows;
38
39pub(crate) use single_group_by::primitive::HashValue;
40
41use crate::aggregates::{
42    group_values::single_group_by::{
43        boolean::GroupValuesBoolean, bytes::GroupValuesBytes,
44        bytes_view::GroupValuesBytesView, primitive::GroupValuesPrimitive,
45    },
46    order::GroupOrdering,
47};
48
49mod metrics;
50mod null_builder;
51
52pub(crate) use metrics::GroupByMetrics;
53
/// Stores the group values during hash aggregation.
///
/// # Background
///
/// In a query such as `SELECT a, b, count(*) FROM t GROUP BY a, b`, the group values
/// identify each group, and correspond to all the distinct values of `(a,b)`.
///
/// ```sql
/// -- Input has 4 rows with 3 distinct combinations of (a,b) ("groups")
/// create table t(a int, b varchar)
/// as values (1, 'a'), (2, 'b'), (1, 'a'), (3, 'c');
///
/// select a, b, count(*) from t group by a, b;
/// ----
/// 1 a 2
/// 2 b 1
/// 3 c 1
/// ```
///
/// # Design
///
/// Managing group values is a performance-critical operation in hash
/// aggregation. The major operations are:
///
/// 1. Intern: Quickly finding existing and adding new group values
/// 2. Emit: Returning the group values as an array
///
/// There are multiple specialized implementations of this trait, each
/// optimized for these operations on particular data types and numbers of
/// columns. See [`new_group_values`] for how an implementation is chosen.
///
/// # Group Ids
///
/// Each distinct group in a hash aggregation is identified by a unique group id
/// (`usize`) which is assigned by instances of this trait. Group ids are
/// contiguous without gaps, starting from 0.
pub trait GroupValues: Send {
    /// Calculates the group id for each input row of `cols`, assigning new
    /// group ids as necessary.
    ///
    /// When the function returns, `groups` must contain the group id for each
    /// row in `cols`.
    ///
    /// If a row has the same value as a previous row, the same group id is
    /// assigned. If a row has a new value, the next available group id is
    /// assigned.
    fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()>;

    /// Returns the number of bytes of memory used by this [`GroupValues`]
    fn size(&self) -> usize;

    /// Returns `true` if this [`GroupValues`] is empty
    fn is_empty(&self) -> bool;

    /// The number of values (distinct group values) stored in this [`GroupValues`]
    fn len(&self) -> usize;

    /// Emits the group values as a vector of arrays.
    ///
    /// `emit_to` is expected to control which groups are emitted; see
    /// [`EmitTo`] for the exact semantics.
    fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>;

    /// Clears the contents and shrinks the capacity to the size of `batch`,
    /// freeing up memory.
    fn clear_shrink(&mut self, batch: &RecordBatch);
}
117
118/// Return a specialized implementation of [`GroupValues`] for the given schema.
119///
120/// [`GroupValues`] implementations choosing logic:
121///
122///   - If group by single column, and type of this column has
123///     the specific [`GroupValues`] implementation, such implementation
124///     will be chosen.
125///
126///   - If group by multiple columns, and all column types have the specific
127///     `GroupColumn` implementations, `GroupValuesColumn` will be chosen.
128///
129///   - Otherwise, the general implementation `GroupValuesRows` will be chosen.
130///
131/// `GroupColumn`:  crate::aggregates::group_values::multi_group_by::GroupColumn
132/// `GroupValuesColumn`: crate::aggregates::group_values::multi_group_by::GroupValuesColumn
133/// `GroupValuesRows`: crate::aggregates::group_values::row::GroupValuesRows
134pub fn new_group_values(
135    schema: SchemaRef,
136    group_ordering: &GroupOrdering,
137) -> Result<Box<dyn GroupValues>> {
138    if schema.fields.len() == 1 {
139        let d = schema.fields[0].data_type();
140
141        macro_rules! downcast_helper {
142            ($t:ty, $d:ident) => {
143                return Ok(Box::new(GroupValuesPrimitive::<$t>::new($d.clone())))
144            };
145        }
146
147        downcast_primitive! {
148            d => (downcast_helper, d),
149            _ => {}
150        }
151
152        match d {
153            DataType::Date32 => {
154                downcast_helper!(Date32Type, d);
155            }
156            DataType::Date64 => {
157                downcast_helper!(Date64Type, d);
158            }
159            DataType::Time32(t) => match t {
160                TimeUnit::Second => downcast_helper!(Time32SecondType, d),
161                TimeUnit::Millisecond => downcast_helper!(Time32MillisecondType, d),
162                _ => {}
163            },
164            DataType::Time64(t) => match t {
165                TimeUnit::Microsecond => downcast_helper!(Time64MicrosecondType, d),
166                TimeUnit::Nanosecond => downcast_helper!(Time64NanosecondType, d),
167                _ => {}
168            },
169            DataType::Timestamp(t, _tz) => match t {
170                TimeUnit::Second => downcast_helper!(TimestampSecondType, d),
171                TimeUnit::Millisecond => downcast_helper!(TimestampMillisecondType, d),
172                TimeUnit::Microsecond => downcast_helper!(TimestampMicrosecondType, d),
173                TimeUnit::Nanosecond => downcast_helper!(TimestampNanosecondType, d),
174            },
175            DataType::Decimal128(_, _) => {
176                downcast_helper!(Decimal128Type, d);
177            }
178            DataType::Utf8 => {
179                return Ok(Box::new(GroupValuesBytes::<i32>::new(OutputType::Utf8)));
180            }
181            DataType::LargeUtf8 => {
182                return Ok(Box::new(GroupValuesBytes::<i64>::new(OutputType::Utf8)));
183            }
184            DataType::Utf8View => {
185                return Ok(Box::new(GroupValuesBytesView::new(OutputType::Utf8View)));
186            }
187            DataType::Binary => {
188                return Ok(Box::new(GroupValuesBytes::<i32>::new(OutputType::Binary)));
189            }
190            DataType::LargeBinary => {
191                return Ok(Box::new(GroupValuesBytes::<i64>::new(OutputType::Binary)));
192            }
193            DataType::BinaryView => {
194                return Ok(Box::new(GroupValuesBytesView::new(OutputType::BinaryView)));
195            }
196            DataType::Boolean => {
197                return Ok(Box::new(GroupValuesBoolean::new()));
198            }
199            _ => {}
200        }
201    }
202
203    if multi_group_by::supported_schema(schema.as_ref()) {
204        if matches!(group_ordering, GroupOrdering::None) {
205            Ok(Box::new(GroupValuesColumn::<false>::try_new(schema)?))
206        } else {
207            Ok(Box::new(GroupValuesColumn::<true>::try_new(schema)?))
208        }
209    } else {
210        Ok(Box::new(GroupValuesRows::try_new(schema)?))
211    }
212}