datafusion_physical_plan/aggregates/group_values/single_group_by/
boolean.rs1use crate::aggregates::group_values::GroupValues;
19
20use arrow::array::{
21 ArrayRef, AsArray as _, BooleanArray, BooleanBufferBuilder, NullBufferBuilder,
22 RecordBatch,
23};
24use datafusion_common::Result;
25use datafusion_expr::EmitTo;
26use std::{mem::size_of, sync::Arc};
27
/// Group values for a single `Boolean` grouping column.
///
/// A boolean column has at most three distinct keys — `false`, `true` and
/// null — so each key's group index is kept in a dedicated field. A field is
/// `None` while that key has no live group (not yet seen, or already emitted).
///
/// `Default` yields the same empty state as `GroupValuesBoolean::new()`.
#[derive(Debug, Default)]
pub struct GroupValuesBoolean {
    /// Group index assigned to `false` values, if such a group currently exists.
    false_group: Option<usize>,
    /// Group index assigned to `true` values, if such a group currently exists.
    true_group: Option<usize>,
    /// Group index assigned to null values, if such a group currently exists.
    null_group: Option<usize>,
}
34
35impl GroupValuesBoolean {
36 pub fn new() -> Self {
37 Self {
38 false_group: None,
39 true_group: None,
40 null_group: None,
41 }
42 }
43}
44
45impl GroupValues for GroupValuesBoolean {
46 fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> {
47 let array = cols[0].as_boolean();
48 groups.clear();
49
50 for value in array.iter() {
51 let index = match value {
52 Some(false) => {
53 if let Some(index) = self.false_group {
54 index
55 } else {
56 let index = self.len();
57 self.false_group = Some(index);
58 index
59 }
60 }
61 Some(true) => {
62 if let Some(index) = self.true_group {
63 index
64 } else {
65 let index = self.len();
66 self.true_group = Some(index);
67 index
68 }
69 }
70 None => {
71 if let Some(index) = self.null_group {
72 index
73 } else {
74 let index = self.len();
75 self.null_group = Some(index);
76 index
77 }
78 }
79 };
80
81 groups.push(index);
82 }
83
84 Ok(())
85 }
86
87 fn size(&self) -> usize {
88 size_of::<Self>()
89 }
90
91 fn is_empty(&self) -> bool {
92 self.len() == 0
93 }
94
95 fn len(&self) -> usize {
96 self.false_group.is_some() as usize
97 + self.true_group.is_some() as usize
98 + self.null_group.is_some() as usize
99 }
100
101 fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
102 let len = self.len();
103 let mut builder = BooleanBufferBuilder::new(len);
104 let emit_count = match emit_to {
105 EmitTo::All => len,
106 EmitTo::First(n) => n,
107 };
108 builder.append_n(emit_count, false);
109 if let Some(idx) = self.true_group.as_mut() {
110 if *idx < emit_count {
111 builder.set_bit(*idx, true);
112 self.true_group = None;
113 } else {
114 *idx -= emit_count;
115 }
116 }
117
118 if let Some(idx) = self.false_group.as_mut() {
119 if *idx < emit_count {
120 self.false_group = None;
122 } else {
123 *idx -= emit_count;
124 }
125 }
126
127 let values = builder.finish();
128
129 let nulls = if let Some(idx) = self.null_group.as_mut() {
130 if *idx < emit_count {
131 let mut buffer = NullBufferBuilder::new(len);
132 buffer.append_n_non_nulls(*idx);
133 buffer.append_null();
134 buffer.append_n_non_nulls(emit_count - *idx - 1);
135
136 self.null_group = None;
137 Some(buffer.finish().unwrap())
138 } else {
139 *idx -= emit_count;
140 None
141 }
142 } else {
143 None
144 };
145
146 Ok(vec![Arc::new(BooleanArray::new(values, nulls)) as _])
147 }
148
149 fn clear_shrink(&mut self, _batch: &RecordBatch) {
150 self.false_group = None;
151 self.true_group = None;
152 self.null_group = None;
153 }
154}