// datafusion_physical_plan/sorts/streaming_merge.rs

use crate::metrics::BaselineMetrics;
use crate::sorts::multi_level_merge::MultiLevelMergeBuilder;
use crate::sorts::{
    merge::SortPreservingMergeStream,
    stream::{FieldCursorStream, RowCursorStream},
};
use crate::{SendableRecordBatchStream, SpillManager};
use arrow::array::*;
use arrow::datatypes::{DataType, SchemaRef};
use datafusion_common::{internal_err, Result};
use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::memory_pool::{
    human_readable_size, MemoryConsumer, MemoryPool, MemoryReservation,
    UnboundedMemoryPool,
};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use std::sync::Arc;

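// Helper that forwards a primitive Arrow type to `merge_helper!`, wrapping it
// in `PrimitiveArray<$t>`.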
macro_rules! primitive_merge_helper {
    ($t:ty, $($v:ident),+) => {
        merge_helper!(PrimitiveArray<$t>, $($v),+)
    };
}

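// Builds a typed `FieldCursorStream` over the input streams for the given
// array type and returns early with a `SortPreservingMergeStream` driven by it.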
macro_rules! merge_helper {
    ($t:ty, $sort:ident, $streams:ident, $schema:ident, $tracking_metrics:ident, $batch_size:ident, $fetch:ident, $reservation:ident, $enable_round_robin_tie_breaker:ident) => {{
        let streams =
            FieldCursorStream::<$t>::new($sort, $streams, $reservation.new_empty());
        return Ok(Box::pin(SortPreservingMergeStream::new(
            Box::new(streams),
            $schema,
            $tracking_metrics,
            $batch_size,
            $fetch,
            $reservation,
            $enable_round_robin_tie_breaker,
        )));
    }};
}

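/// A spill file that is already sorted, ready to be merged with other sorted
/// inputs by the streaming merge.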
pub struct SortedSpillFile {
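    /// The temporary file holding the spilled, sorted data; it is reference
    /// counted and removed once no longer needed.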
    pub file: RefCountedTempFile,

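    /// In-memory size, in bytes, of the largest record batch in this spill
    /// file.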
    pub max_record_batch_memory: usize,
}

impl std::fmt::Debug for SortedSpillFile {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "SortedSpillFile({:?}) takes {}",
            self.file.path(),
            human_readable_size(self.max_record_batch_memory)
        )
    }
}

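/// Builder for a streaming merge of multiple sorted inputs.
///
/// Depending on the inputs, [`build`](Self::build) produces either a
/// `SortPreservingMergeStream` over the in-memory streams or, when sorted
/// spill files are provided, a multi-level merge built via
/// `MultiLevelMergeBuilder` that also reads the spilled data back through the
/// supplied `SpillManager`.
///
/// # Example
///
/// An illustrative sketch only (not compiled); it assumes that `streams`,
/// `schema`, `ordering`, `metrics`, and `reservation` have already been
/// constructed by the caller:
///
/// ```ignore
/// let merged = StreamingMergeBuilder::new()
///     .with_streams(streams)
///     .with_schema(schema)
///     .with_expressions(&ordering)
///     .with_metrics(metrics)
///     .with_batch_size(8192)
///     .with_fetch(None)
///     .with_reservation(reservation)
///     .build()?;
/// ```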
#[derive(Default)]
pub struct StreamingMergeBuilder<'a> {
    streams: Vec<SendableRecordBatchStream>,
    sorted_spill_files: Vec<SortedSpillFile>,
    spill_manager: Option<SpillManager>,
    schema: Option<SchemaRef>,
    expressions: Option<&'a LexOrdering>,
    metrics: Option<BaselineMetrics>,
    batch_size: Option<usize>,
    fetch: Option<usize>,
    reservation: Option<MemoryReservation>,
    enable_round_robin_tie_breaker: bool,
}

impl<'a> StreamingMergeBuilder<'a> {
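    /// Create a new builder with round-robin tie breaking enabled by default.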
    pub fn new() -> Self {
        Self {
            enable_round_robin_tie_breaker: true,
            ..Default::default()
        }
    }

    pub fn with_streams(mut self, streams: Vec<SendableRecordBatchStream>) -> Self {
        self.streams = streams;
        self
    }

    pub fn with_sorted_spill_files(
        mut self,
        sorted_spill_files: Vec<SortedSpillFile>,
    ) -> Self {
        self.sorted_spill_files = sorted_spill_files;
        self
    }

    pub fn with_spill_manager(mut self, spill_manager: SpillManager) -> Self {
        self.spill_manager = Some(spill_manager);
        self
    }

    pub fn with_schema(mut self, schema: SchemaRef) -> Self {
        self.schema = Some(schema);
        self
    }

    pub fn with_expressions(mut self, expressions: &'a LexOrdering) -> Self {
        self.expressions = Some(expressions);
        self
    }

    pub fn with_metrics(mut self, metrics: BaselineMetrics) -> Self {
        self.metrics = Some(metrics);
        self
    }

    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = Some(batch_size);
        self
    }

    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
        self.fetch = fetch;
        self
    }

    pub fn with_reservation(mut self, reservation: MemoryReservation) -> Self {
        self.reservation = Some(reservation);
        self
    }

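    /// Enable or disable the round-robin tie breaker used when merge cursors
    /// compare equal (enabled by default, see [`Self::new`]).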
    pub fn with_round_robin_tie_breaker(
        mut self,
        enable_round_robin_tie_breaker: bool,
    ) -> Self {
        self.enable_round_robin_tie_breaker = enable_round_robin_tie_breaker;
        self
    }

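    /// Register the merge stream's memory reservation against an
    /// [`UnboundedMemoryPool`], so the merge is neither limited by nor
    /// accounted to the caller's memory pool.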
    pub(super) fn with_bypass_mempool(self) -> Self {
        let mem_pool: Arc<dyn MemoryPool> = Arc::new(UnboundedMemoryPool::default());

        self.with_reservation(
            MemoryConsumer::new("merge stream mock memory").register(&mem_pool),
        )
    }

    pub fn build(self) -> Result<SendableRecordBatchStream> {
        let Self {
            streams,
            sorted_spill_files,
            spill_manager,
            schema,
            metrics,
            batch_size,
            reservation,
            fetch,
            expressions,
            enable_round_robin_tie_breaker,
        } = self;

        let Some(expressions) = expressions else {
            return internal_err!("Sort expressions cannot be empty for streaming merge");
        };

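        // When sorted spill files are present, delegate to the multi-level
        // merge, which combines the in-memory streams with the data read back
        // from the spill files.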
        if !sorted_spill_files.is_empty() {
            let schema = schema.expect("Schema cannot be empty for streaming merge");
            let metrics = metrics.expect("Metrics cannot be empty for streaming merge");
            let batch_size =
                batch_size.expect("Batch size cannot be empty for streaming merge");
            let reservation =
                reservation.expect("Reservation cannot be empty for streaming merge");

            return Ok(MultiLevelMergeBuilder::new(
                spill_manager.expect("spill_manager should exist"),
                schema,
                sorted_spill_files,
                streams,
                expressions.clone(),
                metrics,
                batch_size,
                reservation,
                fetch,
                enable_round_robin_tie_breaker,
            )
            .create_spillable_merge_stream());
        }

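        // Without spill files there must be at least one in-memory stream to merge.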
        if streams.is_empty() {
            return internal_err!(
                "Streams/sorted spill files cannot be empty for streaming merge"
            );
        }

        let schema = schema.expect("Schema cannot be empty for streaming merge");
        let metrics = metrics.expect("Metrics cannot be empty for streaming merge");
        let batch_size =
            batch_size.expect("Batch size cannot be empty for streaming merge");
        let reservation =
            reservation.expect("Reservation cannot be empty for streaming merge");

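        // Special case: single-column sorts can use typed cursors instead of
        // the general row-format comparison below.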
        if expressions.len() == 1 {
            let sort = expressions[0].clone();
            let data_type = sort.expr.data_type(schema.as_ref())?;
            downcast_primitive! {
                data_type => (primitive_merge_helper, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker),
                DataType::Utf8 => merge_helper!(StringArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
                DataType::Utf8View => merge_helper!(StringViewArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
                DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
                DataType::Binary => merge_helper!(BinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
                DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
                _ => {}
            }
        }

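        // General case: compare sort keys using the Arrow row format via
        // `RowCursorStream`.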
        let streams = RowCursorStream::try_new(
            schema.as_ref(),
            expressions,
            streams,
            reservation.new_empty(),
        )?;
        Ok(Box::pin(SortPreservingMergeStream::new(
            Box::new(streams),
            schema,
            metrics,
            batch_size,
            fetch,
            reservation,
            enable_round_robin_tie_breaker,
        )))
    }
}