datafusion_physical_plan/aggregates/order/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::mem::size_of;
19
20use arrow::array::ArrayRef;
21use datafusion_common::Result;
22use datafusion_expr::EmitTo;
23
24mod full;
25mod partial;
26
27use crate::InputOrderMode;
28pub use full::GroupOrderingFull;
29pub use partial::GroupOrderingPartial;
30
31/// Ordering information for each group in the hash table
32#[derive(Debug)]
33pub enum GroupOrdering {
34    /// Groups are not ordered
35    None,
36    /// Groups are ordered by some pre-set of the group keys
37    Partial(GroupOrderingPartial),
38    /// Groups are entirely contiguous,
39    Full(GroupOrderingFull),
40}
41
42impl GroupOrdering {
43    /// Create a `GroupOrdering` for the specified ordering
44    pub fn try_new(mode: &InputOrderMode) -> Result<Self> {
45        match mode {
46            InputOrderMode::Linear => Ok(GroupOrdering::None),
47            InputOrderMode::PartiallySorted(order_indices) => {
48                GroupOrderingPartial::try_new(order_indices.clone())
49                    .map(GroupOrdering::Partial)
50            }
51            InputOrderMode::Sorted => Ok(GroupOrdering::Full(GroupOrderingFull::new())),
52        }
53    }
54
55    // How many groups be emitted, or None if no data can be emitted
56    pub fn emit_to(&self) -> Option<EmitTo> {
57        match self {
58            GroupOrdering::None => None,
59            GroupOrdering::Partial(partial) => partial.emit_to(),
60            GroupOrdering::Full(full) => full.emit_to(),
61        }
62    }
63
64    /// Updates the state the input is done
65    pub fn input_done(&mut self) {
66        match self {
67            GroupOrdering::None => {}
68            GroupOrdering::Partial(partial) => partial.input_done(),
69            GroupOrdering::Full(full) => full.input_done(),
70        }
71    }
72
73    /// remove the first n groups from the internal state, shifting
74    /// all existing indexes down by `n`
75    pub fn remove_groups(&mut self, n: usize) {
76        match self {
77            GroupOrdering::None => {}
78            GroupOrdering::Partial(partial) => partial.remove_groups(n),
79            GroupOrdering::Full(full) => full.remove_groups(n),
80        }
81    }
82
83    /// Called when new groups are added in a batch
84    ///
85    /// * `total_num_groups`: total number of groups (so max
86    ///   group_index is total_num_groups - 1).
87    ///
88    /// * `group_values`: group key values for *each row* in the batch
89    ///
90    /// * `group_indices`: indices for each row in the batch
91    ///
92    /// * `hashes`: hash values for each row in the batch
93    pub fn new_groups(
94        &mut self,
95        batch_group_values: &[ArrayRef],
96        group_indices: &[usize],
97        total_num_groups: usize,
98    ) -> Result<()> {
99        match self {
100            GroupOrdering::None => {}
101            GroupOrdering::Partial(partial) => {
102                partial.new_groups(
103                    batch_group_values,
104                    group_indices,
105                    total_num_groups,
106                )?;
107            }
108            GroupOrdering::Full(full) => {
109                full.new_groups(total_num_groups);
110            }
111        };
112        Ok(())
113    }
114
115    /// Return the size of memory used by the ordering state, in bytes
116    pub fn size(&self) -> usize {
117        size_of::<Self>()
118            + match self {
119                GroupOrdering::None => 0,
120                GroupOrdering::Partial(partial) => partial.size(),
121                GroupOrdering::Full(full) => full.size(),
122            }
123    }
124}