datafusion_physical_plan/aggregates/group_values/
metrics.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Metrics for the various group-by implementations.
19
20use crate::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
21
/// Timing metrics shared by the group-by aggregation implementations.
///
/// Each field is a [`Time`] accumulator registered per partition against an
/// [`ExecutionPlanMetricsSet`] via [`GroupByMetrics::new`]; the metric names
/// match the field names.
pub(crate) struct GroupByMetrics {
    /// Time spent calculating the group IDs from the evaluated grouping columns.
    pub(crate) time_calculating_group_ids: Time,
    /// Time spent evaluating the inputs to the aggregate functions.
    pub(crate) aggregate_arguments_time: Time,
    /// Time spent evaluating the aggregate expressions themselves
    /// (e.g. summing all elements and counting number of elements for `avg` aggregate).
    pub(crate) aggregation_time: Time,
    /// Time spent emitting the final results and constructing the record batch,
    /// which includes finalizing the grouping expressions
    /// (e.g. emit from the hash table in case of hash aggregation) and the accumulators.
    pub(crate) emitting_time: Time,
}
35
36impl GroupByMetrics {
37    pub(crate) fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
38        Self {
39            time_calculating_group_ids: MetricBuilder::new(metrics)
40                .subset_time("time_calculating_group_ids", partition),
41            aggregate_arguments_time: MetricBuilder::new(metrics)
42                .subset_time("aggregate_arguments_time", partition),
43            aggregation_time: MetricBuilder::new(metrics)
44                .subset_time("aggregation_time", partition),
45            emitting_time: MetricBuilder::new(metrics)
46                .subset_time("emitting_time", partition),
47        }
48    }
49}
50
#[cfg(test)]
mod tests {
    use crate::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
    use crate::metrics::MetricsSet;
    use crate::test::TestMemoryExec;
    use crate::{collect, ExecutionPlan};
    use arrow::array::{Float64Array, UInt32Array};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use datafusion_common::Result;
    use datafusion_execution::TaskContext;
    use datafusion_functions_aggregate::count::count_udaf;
    use datafusion_functions_aggregate::sum::sum_udaf;
    use datafusion_physical_expr::aggregate::AggregateExprBuilder;
    use datafusion_physical_expr::expressions::col;
    use std::sync::Arc;

    /// Asserts that three of the `GroupByMetrics` timers
    /// (`aggregate_arguments_time`, `aggregation_time`, `emitting_time`)
    /// are present in `metrics` and accumulated a non-zero duration.
    ///
    /// NOTE(review): `GroupByMetrics` defines a fourth timer,
    /// `time_calculating_group_ids`, which is not checked here — confirm
    /// whether it can legitimately be zero before asserting on it.
    fn assert_groupby_metrics(metrics: &MetricsSet) {
        let agg_arguments_time = metrics.sum_by_name("aggregate_arguments_time");
        assert!(agg_arguments_time.is_some());
        assert!(agg_arguments_time.unwrap().as_usize() > 0);

        let aggregation_time = metrics.sum_by_name("aggregation_time");
        assert!(aggregation_time.is_some());
        assert!(aggregation_time.unwrap().as_usize() > 0);

        let emitting_time = metrics.sum_by_name("emitting_time");
        assert!(emitting_time.is_some());
        assert!(emitting_time.unwrap().as_usize() > 0);
    }

    /// A single `Partial`-mode hash aggregation (`SUM(b)`, `COUNT(b)`
    /// grouped by `a`) over several batches should record non-zero
    /// group-by timing metrics.
    #[tokio::test]
    async fn test_groupby_metrics_partial_mode() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::UInt32, false),
            Field::new("b", DataType::Float64, false),
        ]));

        // Create multiple batches to ensure metrics accumulate
        let batches = (0..5)
            .map(|i| {
                RecordBatch::try_new(
                    Arc::clone(&schema),
                    vec![
                        Arc::new(UInt32Array::from(vec![1, 2, 3, 4])),
                        Arc::new(Float64Array::from(vec![
                            i as f64,
                            (i + 1) as f64,
                            (i + 2) as f64,
                            (i + 3) as f64,
                        ])),
                    ],
                )
                .unwrap()
            })
            .collect::<Vec<_>>();

        let input = TestMemoryExec::try_new_exec(&[batches], Arc::clone(&schema), None)?;

        // Group on column "a"; aggregate column "b" twice (sum and count).
        let group_by =
            PhysicalGroupBy::new_single(vec![(col("a", &schema)?, "a".to_string())]);

        let aggregates = vec![
            Arc::new(
                AggregateExprBuilder::new(sum_udaf(), vec![col("b", &schema)?])
                    .schema(Arc::clone(&schema))
                    .alias("SUM(b)")
                    .build()?,
            ),
            Arc::new(
                AggregateExprBuilder::new(count_udaf(), vec![col("b", &schema)?])
                    .schema(Arc::clone(&schema))
                    .alias("COUNT(b)")
                    .build()?,
            ),
        ];

        let aggregate_exec = Arc::new(AggregateExec::try_new(
            AggregateMode::Partial,
            group_by,
            aggregates,
            vec![None, None], // no filter expressions for either aggregate
            input,
            schema,
        )?);

        // Execute to completion; only the metrics are inspected, not the output.
        let task_ctx = Arc::new(TaskContext::default());
        let _result =
            collect(Arc::clone(&aggregate_exec) as _, Arc::clone(&task_ctx)).await?;

        let metrics = aggregate_exec.metrics().unwrap();
        assert_groupby_metrics(&metrics);

        Ok(())
    }

    /// A two-stage `Partial` -> `Final` aggregation should record non-zero
    /// group-by timing metrics on the `Final` stage as well.
    #[tokio::test]
    async fn test_groupby_metrics_final_mode() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::UInt32, false),
            Field::new("b", DataType::Float64, false),
        ]));

        let batches = (0..3)
            .map(|i| {
                RecordBatch::try_new(
                    Arc::clone(&schema),
                    vec![
                        Arc::new(UInt32Array::from(vec![1, 2, 3])),
                        Arc::new(Float64Array::from(vec![
                            i as f64,
                            (i + 1) as f64,
                            (i + 2) as f64,
                        ])),
                    ],
                )
                .unwrap()
            })
            .collect::<Vec<_>>();

        let partial_input =
            TestMemoryExec::try_new_exec(&[batches], Arc::clone(&schema), None)?;

        let group_by =
            PhysicalGroupBy::new_single(vec![(col("a", &schema)?, "a".to_string())]);

        let aggregates = vec![Arc::new(
            AggregateExprBuilder::new(sum_udaf(), vec![col("b", &schema)?])
                .schema(Arc::clone(&schema))
                .alias("SUM(b)")
                .build()?,
        )];

        // Create partial aggregate
        let partial_aggregate = Arc::new(AggregateExec::try_new(
            AggregateMode::Partial,
            group_by.clone(),
            aggregates.clone(),
            vec![None],
            partial_input,
            Arc::clone(&schema),
        )?);

        // Create final aggregate fed by the partial stage; `as_final`
        // adapts the grouping expressions to the partial stage's output.
        let final_aggregate = Arc::new(AggregateExec::try_new(
            AggregateMode::Final,
            group_by.as_final(),
            aggregates,
            vec![None],
            partial_aggregate,
            schema,
        )?);

        let task_ctx = Arc::new(TaskContext::default());
        let _result =
            collect(Arc::clone(&final_aggregate) as _, Arc::clone(&task_ctx)).await?;

        // Metrics are read from the final stage only.
        let metrics = final_aggregate.metrics().unwrap();
        assert_groupby_metrics(&metrics);

        Ok(())
    }
}