datafusion_physical_plan/sorts/multi_level_merge.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Create a stream that performs a multi-level merge of sorted streams and sorted spill files
19
20use crate::metrics::BaselineMetrics;
21use crate::{EmptyRecordBatchStream, SpillManager};
22use arrow::array::RecordBatch;
23use std::fmt::{Debug, Formatter};
24use std::mem;
25use std::pin::Pin;
26use std::sync::Arc;
27use std::task::{Context, Poll};
28
29use arrow::datatypes::SchemaRef;
30use datafusion_common::Result;
31use datafusion_execution::memory_pool::MemoryReservation;
32
33use crate::sorts::sort::get_reserved_byte_for_record_batch_size;
34use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
35use crate::stream::RecordBatchStreamAdapter;
36use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
37use datafusion_physical_expr_common::sort_expr::LexOrdering;
38use futures::TryStreamExt;
39use futures::{Stream, StreamExt};
40
/// Merges sorted in-memory streams and sorted spill files into a single sorted stream.
///
/// This is a wrapper around [`SortPreservingMergeStream`](crate::sorts::merge::SortPreservingMergeStream)
/// that feeds it the sorted streams/files to merge while making sure the merge fits in memory.
/// If they cannot all be merged in a single pass, the intermediate results are spilled to disk
/// and the process is repeated.
47///
/// ## High level Algorithm
/// 1. Take the largest number of sorted in-memory streams and spill files that can be merged
///    with the available memory
/// 2. Merge them into a single sorted stream
/// 3. Are there more spill files left to merge?
///    - Yes: write that sorted stream to a spill file,
///      add that spill file back to the spill files to merge and
///      repeat the process
///
///    - No: return that sorted stream as the final output stream
57///
58/// ```text
59/// Initial State: Multiple sorted streams + spill files
60/// ┌───────────┐
61/// │ Phase 1 │
62/// └───────────┘
63/// ┌──Can hold in memory─┐
64/// │ ┌──────────────┐ │
65/// │ │ In-memory │
66/// │ │sorted stream │──┼────────┐
67/// │ │ 1 │ │ │
68/// └──────────────┘ │ │
69/// │ ┌──────────────┐ │ │
70/// │ │ In-memory │ │
71/// │ │sorted stream │──┼────────┤
72/// │ │ 2 │ │ │
73/// └──────────────┘ │ │
74/// │ ┌──────────────┐ │ │
75/// │ │ In-memory │ │
76/// │ │sorted stream │──┼────────┤
77/// │ │ 3 │ │ │
78/// └──────────────┘ │ │
79/// │ ┌──────────────┐ │ │ ┌───────────┐
80/// │ │ Sorted Spill │ │ │ Phase 2 │
81/// │ │ file 1 │──┼────────┤ └───────────┘
82/// │ └──────────────┘ │ │
83/// ──── ──── ──── ──── ─┘ │ ┌──Can hold in memory─┐
84/// │ │ │
85/// ┌──────────────┐ │ │ ┌──────────────┐
86/// │ Sorted Spill │ │ │ │ Sorted Spill │ │
87/// │ file 2 │──────────────────────▶│ file 2 │──┼─────┐
88/// └──────────────┘ │ └──────────────┘ │ │
89/// ┌──────────────┐ │ │ ┌──────────────┐ │ │
90/// │ Sorted Spill │ │ │ │ Sorted Spill │ │
91/// │ file 3 │──────────────────────▶│ file 3 │──┼─────┤
92/// └──────────────┘ │ │ └──────────────┘ │ │
93/// ┌──────────────┐ │ ┌──────────────┐ │ │
94/// │ Sorted Spill │ │ │ │ Sorted Spill │ │ │
95/// │ file 4 │──────────────────────▶│ file 4 │────────┤ ┌───────────┐
96/// └──────────────┘ │ │ └──────────────┘ │ │ │ Phase 3 │
97/// │ │ │ │ └───────────┘
98/// │ ──── ──── ──── ──── ─┘ │ ┌──Can hold in memory─┐
99/// │ │ │ │
100/// ┌──────────────┐ │ ┌──────────────┐ │ │ ┌──────────────┐
101/// │ Sorted Spill │ │ │ Sorted Spill │ │ │ │ Sorted Spill │ │
102/// │ file 5 │──────────────────────▶│ file 5 │────────────────▶│ file 5 │───┼───┐
103/// └──────────────┘ │ └──────────────┘ │ │ └──────────────┘ │ │
104/// │ │ │ │ │
105/// │ ┌──────────────┐ │ │ ┌──────────────┐ │
106/// │ │ Sorted Spill │ │ │ │ Sorted Spill │ │ │ ┌── ─── ─── ─── ─── ─── ─── ──┐
107/// └──────────▶│ file 6 │────────────────▶│ file 6 │───┼───┼──────▶ Output Stream
108/// └──────────────┘ │ │ └──────────────┘ │ │ └── ─── ─── ─── ─── ─── ─── ──┘
109/// │ │ │ │
110/// │ │ ┌──────────────┐ │
111/// │ │ │ Sorted Spill │ │ │
112/// └───────▶│ file 7 │───┼───┘
113/// │ └──────────────┘ │
114/// │ │
115/// └─ ──── ──── ──── ────
116/// ```
117///
118/// ## Memory Management Strategy
119///
/// This multi-level merge makes sure that any amount of data can be sorted as long as
/// there is enough memory to merge at least 2 streams at a time.
///
/// 1. **Worst-Case Memory Reservation**: Reserves memory based on the largest
///    batch size encountered in each spill file to merge, ensuring sufficient memory is always
///    available during merge operations.
/// 2. **Adaptive Buffer Sizing**: Reduces the read buffer size for spill files when memory is
///    constrained.
/// 3. **Spill-to-Disk**: Spills intermediate results to disk when not all files can be merged
///    in memory.
128pub(crate) struct MultiLevelMergeBuilder {
129 spill_manager: SpillManager,
130 schema: SchemaRef,
131 sorted_spill_files: Vec<SortedSpillFile>,
132 sorted_streams: Vec<SendableRecordBatchStream>,
133 expr: LexOrdering,
134 metrics: BaselineMetrics,
135 batch_size: usize,
136 reservation: MemoryReservation,
137 fetch: Option<usize>,
138 enable_round_robin_tie_breaker: bool,
139}
140
141impl Debug for MultiLevelMergeBuilder {
142 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
143 write!(f, "MultiLevelMergeBuilder")
144 }
145}
146
147impl MultiLevelMergeBuilder {
148 #[allow(clippy::too_many_arguments)]
149 pub(crate) fn new(
150 spill_manager: SpillManager,
151 schema: SchemaRef,
152 sorted_spill_files: Vec<SortedSpillFile>,
153 sorted_streams: Vec<SendableRecordBatchStream>,
154 expr: LexOrdering,
155 metrics: BaselineMetrics,
156 batch_size: usize,
157 reservation: MemoryReservation,
158 fetch: Option<usize>,
159 enable_round_robin_tie_breaker: bool,
160 ) -> Self {
161 Self {
162 spill_manager,
163 schema,
164 sorted_spill_files,
165 sorted_streams,
166 expr,
167 metrics,
168 batch_size,
169 reservation,
170 enable_round_robin_tie_breaker,
171 fetch,
172 }
173 }
174
175 pub(crate) fn create_spillable_merge_stream(self) -> SendableRecordBatchStream {
176 Box::pin(RecordBatchStreamAdapter::new(
177 Arc::clone(&self.schema),
178 futures::stream::once(self.create_stream()).try_flatten(),
179 ))
180 }
181
182 async fn create_stream(mut self) -> Result<SendableRecordBatchStream> {
183 loop {
184 let mut stream = self.merge_sorted_runs_within_mem_limit()?;
185
186 // TODO - add a threshold for number of files to disk even if empty and reading from disk so
187 // we can avoid the memory reservation
188
189 // If no spill files are left, we can return the stream as this is the last sorted run
190 // TODO - We can write to disk before reading it back to avoid having multiple streams in memory
191 if self.sorted_spill_files.is_empty() {
192 assert!(
193 self.sorted_streams.is_empty(),
194 "We should not have any sorted streams left"
195 );
196
197 return Ok(stream);
198 }
199
200 // Need to sort to a spill file
201 let Some((spill_file, max_record_batch_memory)) = self
202 .spill_manager
203 .spill_record_batch_stream_and_return_max_batch_memory(
204 &mut stream,
205 "MultiLevelMergeBuilder intermediate spill",
206 )
207 .await?
208 else {
209 continue;
210 };
211
212 // Add the spill file
213 self.sorted_spill_files.push(SortedSpillFile {
214 file: spill_file,
215 max_record_batch_memory,
216 });
217 }
218 }
219
220 /// This tries to create a stream that merges the most sorted streams and sorted spill files
221 /// as possible within the memory limit.
222 fn merge_sorted_runs_within_mem_limit(
223 &mut self,
224 ) -> Result<SendableRecordBatchStream> {
225 match (self.sorted_spill_files.len(), self.sorted_streams.len()) {
226 // No data so empty batch
227 (0, 0) => Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(
228 &self.schema,
229 )))),
230
231 // Only in-memory stream, return that
232 (0, 1) => Ok(self.sorted_streams.remove(0)),
233
234 // Only single sorted spill file so return it
235 (1, 0) => {
236 let spill_file = self.sorted_spill_files.remove(0);
237
238 // Not reserving any memory for this disk as we are not holding it in memory
239 self.spill_manager
240 .read_spill_as_stream(spill_file.file, None)
241 }
242
243 // Only in memory streams, so merge them all in a single pass
244 (0, _) => {
245 let sorted_stream = mem::take(&mut self.sorted_streams);
246 self.create_new_merge_sort(
247 sorted_stream,
248 // If we have no sorted spill files left, this is the last run
249 true,
250 true,
251 )
252 }
253
254 // Need to merge multiple streams
255 (_, _) => {
256 let mut memory_reservation = self.reservation.new_empty();
257
258 // Don't account for existing streams memory
259 // as we are not holding the memory for them
260 let mut sorted_streams = mem::take(&mut self.sorted_streams);
261
262 let (sorted_spill_files, buffer_size) = self
263 .get_sorted_spill_files_to_merge(
264 2,
265 // we must have at least 2 streams to merge
266 2_usize.saturating_sub(sorted_streams.len()),
267 &mut memory_reservation,
268 )?;
269
270 let is_only_merging_memory_streams = sorted_spill_files.is_empty();
271
272 for spill in sorted_spill_files {
273 let stream = self
274 .spill_manager
275 .clone()
276 .with_batch_read_buffer_capacity(buffer_size)
277 .read_spill_as_stream(
278 spill.file,
279 Some(spill.max_record_batch_memory),
280 )?;
281 sorted_streams.push(stream);
282 }
283 let merge_sort_stream = self.create_new_merge_sort(
284 sorted_streams,
285 // If we have no sorted spill files left, this is the last run
286 self.sorted_spill_files.is_empty(),
287 is_only_merging_memory_streams,
288 )?;
289
290 // If we're only merging memory streams, we don't need to attach the memory reservation
291 // as it's empty
292 if is_only_merging_memory_streams {
293 assert_eq!(memory_reservation.size(), 0, "when only merging memory streams, we should not have any memory reservation and let the merge sort handle the memory");
294
295 Ok(merge_sort_stream)
296 } else {
297 // Attach the memory reservation to the stream to make sure we have enough memory
298 // throughout the merge process as we bypassed the memory pool for the merge sort stream
299 Ok(Box::pin(StreamAttachedReservation::new(
300 merge_sort_stream,
301 memory_reservation,
302 )))
303 }
304 }
305 }
306 }
307
308 fn create_new_merge_sort(
309 &mut self,
310 streams: Vec<SendableRecordBatchStream>,
311 is_output: bool,
312 all_in_memory: bool,
313 ) -> Result<SendableRecordBatchStream> {
314 let mut builder = StreamingMergeBuilder::new()
315 .with_schema(Arc::clone(&self.schema))
316 .with_expressions(&self.expr)
317 .with_batch_size(self.batch_size)
318 .with_fetch(self.fetch)
319 .with_metrics(if is_output {
320 // Only add the metrics to the last run
321 self.metrics.clone()
322 } else {
323 self.metrics.intermediate()
324 })
325 .with_round_robin_tie_breaker(self.enable_round_robin_tie_breaker)
326 .with_streams(streams);
327
328 if !all_in_memory {
329 // Don't track memory used by this stream as we reserve that memory by worst case sceneries
330 // (reserving memory for the biggest batch in each stream)
331 // TODO - avoid this hack as this can be broken easily when `SortPreservingMergeStream`
332 // changes the implementation to use more/less memory
333 builder = builder.with_bypass_mempool();
334 } else {
335 // If we are only merging in-memory streams, we need to use the memory reservation
336 // because we don't know the maximum size of the batches in the streams
337 builder = builder.with_reservation(self.reservation.new_empty());
338 }
339
340 builder.build()
341 }
342
343 /// Return the sorted spill files to use for the next phase, and the buffer size
344 /// This will try to get as many spill files as possible to merge, and if we don't have enough streams
345 /// it will try to reduce the buffer size until we have enough streams to merge
346 /// otherwise it will return an error
347 fn get_sorted_spill_files_to_merge(
348 &mut self,
349 buffer_len: usize,
350 minimum_number_of_required_streams: usize,
351 reservation: &mut MemoryReservation,
352 ) -> Result<(Vec<SortedSpillFile>, usize)> {
353 assert_ne!(buffer_len, 0, "Buffer length must be greater than 0");
354 let mut number_of_spills_to_read_for_current_phase = 0;
355
356 for spill in &self.sorted_spill_files {
357 // For memory pools that are not shared this is good, for other this is not
358 // and there should be some upper limit to memory reservation so we won't starve the system
359 match reservation.try_grow(get_reserved_byte_for_record_batch_size(
360 spill.max_record_batch_memory * buffer_len,
361 )) {
362 Ok(_) => {
363 number_of_spills_to_read_for_current_phase += 1;
364 }
365 // If we can't grow the reservation, we need to stop
366 Err(err) => {
367 // We must have at least 2 streams to merge, so if we don't have enough memory
368 // fail
369 if minimum_number_of_required_streams
370 > number_of_spills_to_read_for_current_phase
371 {
372 // Free the memory we reserved for this merge as we either try again or fail
373 reservation.free();
374 if buffer_len > 1 {
375 // Try again with smaller buffer size, it will be slower but at least we can merge
376 return self.get_sorted_spill_files_to_merge(
377 buffer_len - 1,
378 minimum_number_of_required_streams,
379 reservation,
380 );
381 }
382
383 return Err(err);
384 }
385
386 // We reached the maximum amount of memory we can use
387 // for this merge
388 break;
389 }
390 }
391 }
392
393 let spills = self
394 .sorted_spill_files
395 .drain(..number_of_spills_to_read_for_current_phase)
396 .collect::<Vec<_>>();
397
398 Ok((spills, buffer_len))
399 }
400}
401
402struct StreamAttachedReservation {
403 stream: SendableRecordBatchStream,
404 reservation: MemoryReservation,
405}
406
407impl StreamAttachedReservation {
408 fn new(stream: SendableRecordBatchStream, reservation: MemoryReservation) -> Self {
409 Self {
410 stream,
411 reservation,
412 }
413 }
414}
415
416impl Stream for StreamAttachedReservation {
417 type Item = Result<RecordBatch>;
418
419 fn poll_next(
420 mut self: Pin<&mut Self>,
421 cx: &mut Context<'_>,
422 ) -> Poll<Option<Self::Item>> {
423 let res = self.stream.poll_next_unpin(cx);
424
425 match res {
426 Poll::Ready(res) => {
427 match res {
428 Some(Ok(batch)) => Poll::Ready(Some(Ok(batch))),
429 Some(Err(err)) => {
430 // Had an error so drop the data
431 self.reservation.free();
432 Poll::Ready(Some(Err(err)))
433 }
434 None => {
435 // Stream is done so free the memory
436 self.reservation.free();
437
438 Poll::Ready(None)
439 }
440 }
441 }
442 Poll::Pending => Poll::Pending,
443 }
444 }
445}
446
447impl RecordBatchStream for StreamAttachedReservation {
448 fn schema(&self) -> SchemaRef {
449 self.stream.schema()
450 }
451}