datafusion_physical_plan/joins/sort_merge_join/
metrics.rs1use crate::metrics::{
21 BaselineMetrics, Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, SpillMetrics,
22 Time,
23};
24
25#[allow(dead_code)]
27pub(super) struct SortMergeJoinMetrics {
28 join_time: Time,
30 input_batches: Count,
32 input_rows: Count,
34 output_batches: Count,
36 baseline_metrics: BaselineMetrics,
38 peak_mem_used: Gauge,
41 spill_metrics: SpillMetrics,
43}
44
45impl SortMergeJoinMetrics {
46 #[allow(dead_code)]
47 pub fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
48 let join_time = MetricBuilder::new(metrics).subset_time("join_time", partition);
49 let input_batches =
50 MetricBuilder::new(metrics).counter("input_batches", partition);
51 let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
52 let output_batches =
53 MetricBuilder::new(metrics).counter("output_batches", partition);
54 let peak_mem_used = MetricBuilder::new(metrics).gauge("peak_mem_used", partition);
55 let spill_metrics = SpillMetrics::new(metrics, partition);
56
57 let baseline_metrics = BaselineMetrics::new(metrics, partition);
58
59 Self {
60 join_time,
61 input_batches,
62 input_rows,
63 output_batches,
64 baseline_metrics,
65 peak_mem_used,
66 spill_metrics,
67 }
68 }
69
70 pub fn join_time(&self) -> Time {
71 self.join_time.clone()
72 }
73
74 pub fn baseline_metrics(&self) -> BaselineMetrics {
75 self.baseline_metrics.clone()
76 }
77
78 pub fn input_batches(&self) -> Count {
79 self.input_batches.clone()
80 }
81
82 pub fn input_rows(&self) -> Count {
83 self.input_rows.clone()
84 }
85 pub fn output_batches(&self) -> Count {
86 self.output_batches.clone()
87 }
88
89 pub fn peak_mem_used(&self) -> Gauge {
90 self.peak_mem_used.clone()
91 }
92
93 pub fn spill_metrics(&self) -> SpillMetrics {
94 self.spill_metrics.clone()
95 }
96}