// datafusion_physical_plan/aggregates/group_values/metrics.rs

use crate::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};

/// Timing metrics recorded while executing a grouped aggregation.
pub(crate) struct GroupByMetrics {
    /// Time spent computing group ids for the input rows
    pub(crate) time_calculating_group_ids: Time,
    /// Time spent evaluating the arguments of the aggregate expressions
    pub(crate) aggregate_arguments_time: Time,
    /// Time spent updating the accumulators with the evaluated arguments
    pub(crate) aggregation_time: Time,
    /// Time spent emitting the output batches
    pub(crate) emitting_time: Time,
}

impl GroupByMetrics {
    pub(crate) fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        Self {
            time_calculating_group_ids: MetricBuilder::new(metrics)
                .subset_time("time_calculating_group_ids", partition),
            aggregate_arguments_time: MetricBuilder::new(metrics)
                .subset_time("aggregate_arguments_time", partition),
            aggregation_time: MetricBuilder::new(metrics)
                .subset_time("aggregation_time", partition),
            emitting_time: MetricBuilder::new(metrics)
                .subset_time("emitting_time", partition),
        }
    }
}

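// Illustrative usage sketch (an assumption, not part of this module): each `Time`
// metric is driven with a scoped timer guard, so elapsed time is accumulated
// when the guard drops, e.g.
//
//     let group_metrics = GroupByMetrics::new(&metrics, partition);
//     {
//         let _timer = group_metrics.aggregation_time.timer();
//         // ... update accumulators while the guard is live ...
//     } // elapsed time is added to `aggregation_time` here
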
#[cfg(test)]
mod tests {
    use crate::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
    use crate::metrics::MetricsSet;
    use crate::test::TestMemoryExec;
    use crate::{collect, ExecutionPlan};
    use arrow::array::{Float64Array, UInt32Array};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use datafusion_common::Result;
    use datafusion_execution::TaskContext;
    use datafusion_functions_aggregate::count::count_udaf;
    use datafusion_functions_aggregate::sum::sum_udaf;
    use datafusion_physical_expr::aggregate::AggregateExprBuilder;
    use datafusion_physical_expr::expressions::col;
    use std::sync::Arc;

    /// Assert that the group-by timing metrics were reported and are non-zero
    fn assert_groupby_metrics(metrics: &MetricsSet) {
        let agg_arguments_time = metrics.sum_by_name("aggregate_arguments_time");
        assert!(agg_arguments_time.is_some());
        assert!(agg_arguments_time.unwrap().as_usize() > 0);

        let aggregation_time = metrics.sum_by_name("aggregation_time");
        assert!(aggregation_time.is_some());
        assert!(aggregation_time.unwrap().as_usize() > 0);

        let emitting_time = metrics.sum_by_name("emitting_time");
        assert!(emitting_time.is_some());
        assert!(emitting_time.unwrap().as_usize() > 0);
    }

    #[tokio::test]
    async fn test_groupby_metrics_partial_mode() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::UInt32, false),
            Field::new("b", DataType::Float64, false),
        ]));

        let batches = (0..5)
            .map(|i| {
                RecordBatch::try_new(
                    Arc::clone(&schema),
                    vec![
                        Arc::new(UInt32Array::from(vec![1, 2, 3, 4])),
                        Arc::new(Float64Array::from(vec![
                            i as f64,
                            (i + 1) as f64,
                            (i + 2) as f64,
                            (i + 3) as f64,
                        ])),
                    ],
                )
                .unwrap()
            })
            .collect::<Vec<_>>();

        let input = TestMemoryExec::try_new_exec(&[batches], Arc::clone(&schema), None)?;

        let group_by =
            PhysicalGroupBy::new_single(vec![(col("a", &schema)?, "a".to_string())]);

        let aggregates = vec![
            Arc::new(
                AggregateExprBuilder::new(sum_udaf(), vec![col("b", &schema)?])
                    .schema(Arc::clone(&schema))
                    .alias("SUM(b)")
                    .build()?,
            ),
            Arc::new(
                AggregateExprBuilder::new(count_udaf(), vec![col("b", &schema)?])
                    .schema(Arc::clone(&schema))
                    .alias("COUNT(b)")
                    .build()?,
            ),
        ];

        let aggregate_exec = Arc::new(AggregateExec::try_new(
            AggregateMode::Partial,
            group_by,
            aggregates,
            vec![None, None],
            input,
            schema,
        )?);

        let task_ctx = Arc::new(TaskContext::default());
        let _result =
            collect(Arc::clone(&aggregate_exec) as _, Arc::clone(&task_ctx)).await?;

        let metrics = aggregate_exec.metrics().unwrap();
        assert_groupby_metrics(&metrics);

        Ok(())
    }

    #[tokio::test]
    async fn test_groupby_metrics_final_mode() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::UInt32, false),
            Field::new("b", DataType::Float64, false),
        ]));

        let batches = (0..3)
            .map(|i| {
                RecordBatch::try_new(
                    Arc::clone(&schema),
                    vec![
                        Arc::new(UInt32Array::from(vec![1, 2, 3])),
                        Arc::new(Float64Array::from(vec![
                            i as f64,
                            (i + 1) as f64,
                            (i + 2) as f64,
                        ])),
                    ],
                )
                .unwrap()
            })
            .collect::<Vec<_>>();

        let partial_input =
            TestMemoryExec::try_new_exec(&[batches], Arc::clone(&schema), None)?;

        let group_by =
            PhysicalGroupBy::new_single(vec![(col("a", &schema)?, "a".to_string())]);

        let aggregates = vec![Arc::new(
            AggregateExprBuilder::new(sum_udaf(), vec![col("b", &schema)?])
                .schema(Arc::clone(&schema))
                .alias("SUM(b)")
                .build()?,
        )];

        let partial_aggregate = Arc::new(AggregateExec::try_new(
            AggregateMode::Partial,
            group_by.clone(),
            aggregates.clone(),
            vec![None],
            partial_input,
            Arc::clone(&schema),
        )?);

        let final_aggregate = Arc::new(AggregateExec::try_new(
            AggregateMode::Final,
            group_by.as_final(),
            aggregates,
            vec![None],
            partial_aggregate,
            schema,
        )?);

        let task_ctx = Arc::new(TaskContext::default());
        let _result =
            collect(Arc::clone(&final_aggregate) as _, Arc::clone(&task_ctx)).await?;

        let metrics = final_aggregate.metrics().unwrap();
        assert_groupby_metrics(&metrics);

        Ok(())
    }
}