datafusion_physical_plan/aggregates/group_values/single_group_by/
boolean.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::aggregates::group_values::GroupValues;
19
20use arrow::array::{
21    ArrayRef, AsArray as _, BooleanArray, BooleanBufferBuilder, NullBufferBuilder,
22    RecordBatch,
23};
24use datafusion_common::Result;
25use datafusion_expr::EmitTo;
26use std::{mem::size_of, sync::Arc};
27
28#[derive(Debug)]
29pub struct GroupValuesBoolean {
30    false_group: Option<usize>,
31    true_group: Option<usize>,
32    null_group: Option<usize>,
33}
34
35impl GroupValuesBoolean {
36    pub fn new() -> Self {
37        Self {
38            false_group: None,
39            true_group: None,
40            null_group: None,
41        }
42    }
43}
44
45impl GroupValues for GroupValuesBoolean {
46    fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> {
47        let array = cols[0].as_boolean();
48        groups.clear();
49
50        for value in array.iter() {
51            let index = match value {
52                Some(false) => {
53                    if let Some(index) = self.false_group {
54                        index
55                    } else {
56                        let index = self.len();
57                        self.false_group = Some(index);
58                        index
59                    }
60                }
61                Some(true) => {
62                    if let Some(index) = self.true_group {
63                        index
64                    } else {
65                        let index = self.len();
66                        self.true_group = Some(index);
67                        index
68                    }
69                }
70                None => {
71                    if let Some(index) = self.null_group {
72                        index
73                    } else {
74                        let index = self.len();
75                        self.null_group = Some(index);
76                        index
77                    }
78                }
79            };
80
81            groups.push(index);
82        }
83
84        Ok(())
85    }
86
87    fn size(&self) -> usize {
88        size_of::<Self>()
89    }
90
91    fn is_empty(&self) -> bool {
92        self.len() == 0
93    }
94
95    fn len(&self) -> usize {
96        self.false_group.is_some() as usize
97            + self.true_group.is_some() as usize
98            + self.null_group.is_some() as usize
99    }
100
101    fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
102        let len = self.len();
103        let mut builder = BooleanBufferBuilder::new(len);
104        let emit_count = match emit_to {
105            EmitTo::All => len,
106            EmitTo::First(n) => n,
107        };
108        builder.append_n(emit_count, false);
109        if let Some(idx) = self.true_group.as_mut() {
110            if *idx < emit_count {
111                builder.set_bit(*idx, true);
112                self.true_group = None;
113            } else {
114                *idx -= emit_count;
115            }
116        }
117
118        if let Some(idx) = self.false_group.as_mut() {
119            if *idx < emit_count {
120                // already false, no need to set
121                self.false_group = None;
122            } else {
123                *idx -= emit_count;
124            }
125        }
126
127        let values = builder.finish();
128
129        let nulls = if let Some(idx) = self.null_group.as_mut() {
130            if *idx < emit_count {
131                let mut buffer = NullBufferBuilder::new(len);
132                buffer.append_n_non_nulls(*idx);
133                buffer.append_null();
134                buffer.append_n_non_nulls(emit_count - *idx - 1);
135
136                self.null_group = None;
137                Some(buffer.finish().unwrap())
138            } else {
139                *idx -= emit_count;
140                None
141            }
142        } else {
143            None
144        };
145
146        Ok(vec![Arc::new(BooleanArray::new(values, nulls)) as _])
147    }
148
149    fn clear_shrink(&mut self, _batch: &RecordBatch) {
150        self.false_group = None;
151        self.true_group = None;
152        self.null_group = None;
153    }
154}