datafusion_physical_plan/sorts/multi_level_merge.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Create a stream that performs a multi-level merge

use crate::metrics::BaselineMetrics;
use crate::{EmptyRecordBatchStream, SpillManager};
use arrow::array::RecordBatch;
use std::fmt::{Debug, Formatter};
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use arrow::datatypes::SchemaRef;
use datafusion_common::Result;
use datafusion_execution::memory_pool::MemoryReservation;

use crate::sorts::sort::get_reserved_byte_for_record_batch_size;
use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
use crate::stream::RecordBatchStreamAdapter;
use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use futures::TryStreamExt;
use futures::{Stream, StreamExt};

/// Merges a stream of sorted cursors and record batches into a single sorted stream
///
/// This is a wrapper around [`SortPreservingMergeStream`](crate::sorts::merge::SortPreservingMergeStream)
/// that provides it with the sorted streams/files to merge while making sure they can be merged in memory.
/// If we can't merge all of them in a single pass, we spill the intermediate results to disk
/// and repeat the process.
///
/// ## High level Algorithm
/// 1. Get the maximum number of sorted in-memory streams and spill files we can merge with the available memory
/// 2. Merge them into a single sorted stream
/// 3. Do we have more spill files to merge?
///  - Yes: write that sorted stream to a spill file,
///    add that spill file back to the spill files to merge and
///    repeat the process
///
///  - No: return that sorted stream as the final output stream
///
/// A condensed sketch of this loop follows the diagram below.
///
/// ```text
/// Initial State: Multiple sorted streams + spill files
///      ┌───────────┐
///      │  Phase 1  │
///      └───────────┘
/// ┌──Can hold in memory─┐
/// │   ┌──────────────┐  │
/// │   │  In-memory   │
/// │   │sorted stream │──┼────────┐
/// │   │      1       │  │        │
///     └──────────────┘  │        │
/// │   ┌──────────────┐  │        │
/// │   │  In-memory   │           │
/// │   │sorted stream │──┼────────┤
/// │   │      2       │  │        │
///     └──────────────┘  │        │
/// │   ┌──────────────┐  │        │
/// │   │  In-memory   │           │
/// │   │sorted stream │──┼────────┤
/// │   │      3       │  │        │
///     └──────────────┘  │        │
/// │   ┌──────────────┐  │        │            ┌───────────┐
/// │   │ Sorted Spill │           │            │  Phase 2  │
/// │   │    file 1    │──┼────────┤            └───────────┘
/// │   └──────────────┘  │        │
///  ──── ──── ──── ──── ─┘        │       ┌──Can hold in memory─┐
///                                │       │                     │
///     ┌──────────────┐           │       │   ┌──────────────┐
///     │ Sorted Spill │           │       │   │ Sorted Spill │  │
///     │    file 2    │──────────────────────▶│    file 2    │──┼─────┐
///     └──────────────┘           │           └──────────────┘  │     │
///     ┌──────────────┐           │       │   ┌──────────────┐  │     │
///     │ Sorted Spill │           │       │   │ Sorted Spill │        │
///     │    file 3    │──────────────────────▶│    file 3    │──┼─────┤
///     └──────────────┘           │       │   └──────────────┘  │     │
///     ┌──────────────┐           │           ┌──────────────┐  │     │
///     │ Sorted Spill │           │       │   │ Sorted Spill │  │     │
///     │    file 4    │──────────────────────▶│    file 4    │────────┤          ┌───────────┐
///     └──────────────┘           │       │   └──────────────┘  │     │          │  Phase 3  │
///                                │       │                     │     │          └───────────┘
///                                │        ──── ──── ──── ──── ─┘     │     ┌──Can hold in memory─┐
///                                │                                   │     │                     │
///     ┌──────────────┐           │           ┌──────────────┐        │     │  ┌──────────────┐
///     │ Sorted Spill │           │           │ Sorted Spill │        │     │  │ Sorted Spill │   │
///     │    file 5    │──────────────────────▶│    file 5    │────────────────▶│    file 5    │───┼───┐
///     └──────────────┘           │           └──────────────┘        │     │  └──────────────┘   │   │
///                                │                                   │     │                     │   │
///                                │           ┌──────────────┐        │     │  ┌──────────────┐       │
///                                │           │ Sorted Spill │        │     │  │ Sorted Spill │   │   │       ┌── ─── ─── ─── ─── ─── ─── ──┐
///                                └──────────▶│    file 6    │────────────────▶│    file 6    │───┼───┼──────▶         Output Stream
///                                            └──────────────┘        │     │  └──────────────┘   │   │       └── ─── ─── ─── ─── ─── ─── ──┘
///                                                                    │     │                     │   │
///                                                                    │     │  ┌──────────────┐       │
///                                                                    │     │  │ Sorted Spill │   │   │
///                                                                    └───────▶│    file 7    │───┼───┘
///                                                                          │  └──────────────┘   │
///                                                                          │                     │
///                                                                          └─ ──── ──── ──── ────
/// ```
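///
/// Condensed, this is the loop the builder runs (an informal sketch of
/// `create_stream` below, not its exact code):
///
/// ```text
/// loop {
///     stream = merge as many spill files + in-memory streams as memory allows
///     if no spill files remain:
///         return stream                     // final sorted output
///     spill `stream` to a new sorted spill file and go around again
/// }
/// ```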
///
/// ## Memory Management Strategy
///
/// This multi-level merge makes sure that we can handle any amount of data to sort as long as
/// we have enough memory to merge at least 2 streams at a time.
///
/// 1. **Worst-Case Memory Reservation**: Reserves memory based on the largest
///    batch size encountered in each spill file to merge, ensuring sufficient memory is always
///    available during merge operations (see the illustration below).
/// 2. **Adaptive Buffer Sizing**: Reduces buffer sizes when memory is constrained
/// 3. **Spill-to-Disk**: Spills to disk when we cannot merge all files in memory
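///
/// For illustration only (the sizes and pool behaviour below are made up, and the
/// headroom added by `get_reserved_byte_for_record_batch_size` is ignored): merging
/// three spill files whose largest batches are 8 MB, 12 MB and 20 MB with a read
/// buffer of 2 batches per file asks the pool for roughly
///
/// ```text
/// reserve(8 MB * 2) + reserve(12 MB * 2) + reserve(20 MB * 2)
///   = 16 MB + 24 MB + 40 MB = 80 MB
/// ```
///
/// If the pool cannot grant that much, fewer files are merged in this pass (or the
/// read buffer is shrunk when not even the minimum number of inputs fits), and the
/// remaining files are picked up in a later pass.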
pub(crate) struct MultiLevelMergeBuilder {
    spill_manager: SpillManager,
    schema: SchemaRef,
    sorted_spill_files: Vec<SortedSpillFile>,
    sorted_streams: Vec<SendableRecordBatchStream>,
    expr: LexOrdering,
    metrics: BaselineMetrics,
    batch_size: usize,
    reservation: MemoryReservation,
    fetch: Option<usize>,
    enable_round_robin_tie_breaker: bool,
}

impl Debug for MultiLevelMergeBuilder {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "MultiLevelMergeBuilder")
    }
}

impl MultiLevelMergeBuilder {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        spill_manager: SpillManager,
        schema: SchemaRef,
        sorted_spill_files: Vec<SortedSpillFile>,
        sorted_streams: Vec<SendableRecordBatchStream>,
        expr: LexOrdering,
        metrics: BaselineMetrics,
        batch_size: usize,
        reservation: MemoryReservation,
        fetch: Option<usize>,
        enable_round_robin_tie_breaker: bool,
    ) -> Self {
        Self {
            spill_manager,
            schema,
            sorted_spill_files,
            sorted_streams,
            expr,
            metrics,
            batch_size,
            reservation,
            enable_round_robin_tie_breaker,
            fetch,
        }
    }

    pub(crate) fn create_spillable_merge_stream(self) -> SendableRecordBatchStream {
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            futures::stream::once(self.create_stream()).try_flatten(),
        ))
    }

    async fn create_stream(mut self) -> Result<SendableRecordBatchStream> {
        loop {
            let mut stream = self.merge_sorted_runs_within_mem_limit()?;

            // TODO - add a threshold for number of files to disk even if empty and reading from disk so
            //        we can avoid the memory reservation

            // If no spill files are left, we can return the stream as this is the last sorted run
            // TODO - We can write to disk before reading it back to avoid having multiple streams in memory
            if self.sorted_spill_files.is_empty() {
                assert!(
                    self.sorted_streams.is_empty(),
                    "We should not have any sorted streams left"
                );

                return Ok(stream);
            }

            // More spill files remain, so write this sorted run to a spill file and merge again
            let Some((spill_file, max_record_batch_memory)) = self
                .spill_manager
                .spill_record_batch_stream_and_return_max_batch_memory(
                    &mut stream,
                    "MultiLevelMergeBuilder intermediate spill",
                )
                .await?
            else {
                continue;
            };

            // Add the spill file
            self.sorted_spill_files.push(SortedSpillFile {
                file: spill_file,
                max_record_batch_memory,
            });
        }
    }

    /// This tries to create a stream that merges as many sorted streams and sorted spill files
    /// as possible within the memory limit.
    fn merge_sorted_runs_within_mem_limit(
        &mut self,
    ) -> Result<SendableRecordBatchStream> {
        match (self.sorted_spill_files.len(), self.sorted_streams.len()) {
            // No data, so return an empty stream
            (0, 0) => Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(
                &self.schema,
            )))),

            // Only a single in-memory stream, so return it
            (0, 1) => Ok(self.sorted_streams.remove(0)),

            // Only a single sorted spill file, so return it as a stream
            (1, 0) => {
                let spill_file = self.sorted_spill_files.remove(0);

                // Not reserving any memory for this stream as we are not holding it in memory
                self.spill_manager
                    .read_spill_as_stream(spill_file.file, None)
            }

            // Only in-memory streams, so merge them all in a single pass
            (0, _) => {
                let sorted_stream = mem::take(&mut self.sorted_streams);
                self.create_new_merge_sort(
                    sorted_stream,
                    // If we have no sorted spill files left, this is the last run
                    true,
                    true,
                )
            }

            // Need to merge multiple streams
            (_, _) => {
                let mut memory_reservation = self.reservation.new_empty();

                // Don't account for the existing streams' memory
                // as we are not holding the memory for them
                let mut sorted_streams = mem::take(&mut self.sorted_streams);

                let (sorted_spill_files, buffer_size) = self
                    .get_sorted_spill_files_to_merge(
                        2,
                        // we must have at least 2 streams to merge
                        2_usize.saturating_sub(sorted_streams.len()),
                        &mut memory_reservation,
                    )?;

                let is_only_merging_memory_streams = sorted_spill_files.is_empty();

                for spill in sorted_spill_files {
                    let stream = self
                        .spill_manager
                        .clone()
                        .with_batch_read_buffer_capacity(buffer_size)
                        .read_spill_as_stream(
                            spill.file,
                            Some(spill.max_record_batch_memory),
                        )?;
                    sorted_streams.push(stream);
                }
                let merge_sort_stream = self.create_new_merge_sort(
                    sorted_streams,
                    // If we have no sorted spill files left, this is the last run
                    self.sorted_spill_files.is_empty(),
                    is_only_merging_memory_streams,
                )?;

                // If we're only merging memory streams, we don't need to attach the memory reservation
                // as it's empty
                if is_only_merging_memory_streams {
                    assert_eq!(memory_reservation.size(), 0, "when only merging memory streams, we should not have any memory reservation and let the merge sort handle the memory");

                    Ok(merge_sort_stream)
                } else {
                    // Attach the memory reservation to the stream to make sure we have enough memory
                    // throughout the merge process as we bypassed the memory pool for the merge sort stream
                    Ok(Box::pin(StreamAttachedReservation::new(
                        merge_sort_stream,
                        memory_reservation,
                    )))
                }
            }
        }
    }

    fn create_new_merge_sort(
        &mut self,
        streams: Vec<SendableRecordBatchStream>,
        is_output: bool,
        all_in_memory: bool,
    ) -> Result<SendableRecordBatchStream> {
        let mut builder = StreamingMergeBuilder::new()
            .with_schema(Arc::clone(&self.schema))
            .with_expressions(&self.expr)
            .with_batch_size(self.batch_size)
            .with_fetch(self.fetch)
            .with_metrics(if is_output {
                // Only add the metrics to the last run
                self.metrics.clone()
            } else {
                self.metrics.intermediate()
            })
            .with_round_robin_tie_breaker(self.enable_round_robin_tie_breaker)
            .with_streams(streams);

        if !all_in_memory {
            // Don't track memory used by this stream as we reserve that memory for the worst-case scenario
            // (reserving memory for the biggest batch in each stream)
            // TODO - avoid this hack as this can be broken easily when `SortPreservingMergeStream`
            //        changes the implementation to use more/less memory
            builder = builder.with_bypass_mempool();
        } else {
            // If we are only merging in-memory streams, we need to use the memory reservation
            // because we don't know the maximum size of the batches in the streams
            builder = builder.with_reservation(self.reservation.new_empty());
        }

        builder.build()
    }

    /// Return the sorted spill files to use for the next phase, and the buffer size.
    /// This will try to get as many spill files as possible to merge, and if there is not
    /// enough memory for the minimum number of required streams it will reduce the buffer
    /// size and try again; if the buffer size cannot be reduced any further it returns an error.
    fn get_sorted_spill_files_to_merge(
        &mut self,
        buffer_len: usize,
        minimum_number_of_required_streams: usize,
        reservation: &mut MemoryReservation,
    ) -> Result<(Vec<SortedSpillFile>, usize)> {
        assert_ne!(buffer_len, 0, "Buffer length must be greater than 0");
        let mut number_of_spills_to_read_for_current_phase = 0;

        for spill in &self.sorted_spill_files {
            // For memory pools that are not shared this is fine, but for shared pools it is not,
            // and there should be some upper limit on the memory reservation so we won't starve the system
            match reservation.try_grow(get_reserved_byte_for_record_batch_size(
                spill.max_record_batch_memory * buffer_len,
            )) {
                Ok(_) => {
                    number_of_spills_to_read_for_current_phase += 1;
                }
                // If we can't grow the reservation, we need to stop
                Err(err) => {
                    // We must have at least 2 streams to merge, so if we don't have
                    // enough memory, fail
                    if minimum_number_of_required_streams
                        > number_of_spills_to_read_for_current_phase
                    {
                        // Free the memory we reserved for this merge as we either try again or fail
                        reservation.free();
                        if buffer_len > 1 {
                            // Try again with a smaller buffer size; it will be slower but at least we can merge
                            return self.get_sorted_spill_files_to_merge(
                                buffer_len - 1,
                                minimum_number_of_required_streams,
                                reservation,
                            );
                        }

                        return Err(err);
                    }

                    // We reached the maximum amount of memory we can use
                    // for this merge
                    break;
                }
            }
        }

        let spills = self
            .sorted_spill_files
            .drain(..number_of_spills_to_read_for_current_phase)
            .collect::<Vec<_>>();

        Ok((spills, buffer_len))
    }
}
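/// Wraps a [`SendableRecordBatchStream`] together with the [`MemoryReservation`]
/// made for it, freeing the reservation once the stream is exhausted or returns
/// an error.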
struct StreamAttachedReservation {
    stream: SendableRecordBatchStream,
    reservation: MemoryReservation,
}

impl StreamAttachedReservation {
    fn new(stream: SendableRecordBatchStream, reservation: MemoryReservation) -> Self {
        Self {
            stream,
            reservation,
        }
    }
}

impl Stream for StreamAttachedReservation {
    type Item = Result<RecordBatch>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let res = self.stream.poll_next_unpin(cx);

        match res {
            Poll::Ready(res) => {
                match res {
                    Some(Ok(batch)) => Poll::Ready(Some(Ok(batch))),
                    Some(Err(err)) => {
                        // Had an error, so free the reservation
                        self.reservation.free();
                        Poll::Ready(Some(Err(err)))
                    }
                    None => {
                        // Stream is done so free the memory
                        self.reservation.free();

                        Poll::Ready(None)
                    }
                }
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

impl RecordBatchStream for StreamAttachedReservation {
    fn schema(&self) -> SchemaRef {
        self.stream.schema()
    }
}