pub(crate) struct MultiLevelMergeBuilder {
    spill_manager: SpillManager,
    schema: SchemaRef,
    sorted_spill_files: Vec<SortedSpillFile>,
    sorted_streams: Vec<SendableRecordBatchStream>,
    expr: LexOrdering,
    metrics: BaselineMetrics,
    batch_size: usize,
    reservation: MemoryReservation,
    fetch: Option<usize>,
    enable_round_robin_tie_breaker: bool,
}
Merges a stream of sorted cursors and record batches into a single sorted stream.
This is a wrapper around SortPreservingMergeStream
that provides it with the sorted streams and spill files to merge, while making sure they can be merged within the available memory.
If they cannot all be merged in a single pass, the intermediate results are spilled to disk
and the process is repeated.
§High level Algorithm
- Get the maximum number of sorted in-memory streams and spill files we can merge with the available memory
- Merge them into a single sorted stream
- Do we have more spill files to merge?
  - Yes: write that sorted stream to a spill file, add that spill file back to the spill files to merge, and repeat the process
  - No: return that sorted stream as the final output stream
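The loop above can be sketched in a few lines of self-contained Rust. This is only an illustration under simplifying assumptions, not the actual implementation: sorted runs are plain `Vec<i32>` values instead of record batch streams or spill files, a fixed `max_fan_in` stands in for "as many as fit in the available memory", and the names `merge_runs` and `multi_level_merge` are hypothetical.

```rust
use std::cmp::Reverse;
use std::collections::{BinaryHeap, VecDeque};

/// Merge several sorted runs into one sorted run using a k-way heap merge.
fn merge_runs(runs: &[Vec<i32>]) -> Vec<i32> {
    // Min-heap of (value, run index, position within that run).
    let mut heap = BinaryHeap::new();
    for (run_idx, run) in runs.iter().enumerate() {
        if let Some(&first) = run.first() {
            heap.push(Reverse((first, run_idx, 0usize)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((value, run_idx, pos))) = heap.pop() {
        out.push(value);
        if let Some(&next) = runs[run_idx].get(pos + 1) {
            heap.push(Reverse((next, run_idx, pos + 1)));
        }
    }
    out
}

/// Merge at most `max_fan_in` runs per pass until a single run remains.
/// Returns the final sorted run and the number of passes performed.
/// Assumption: `max_fan_in` models the memory limit as a fixed stream count.
fn multi_level_merge(
    runs: Vec<Vec<i32>>,
    max_fan_in: usize,
) -> Result<(Vec<i32>, usize), String> {
    if max_fan_in < 2 {
        return Err("need room to merge at least 2 runs at a time".to_string());
    }
    let mut pending: VecDeque<Vec<i32>> = VecDeque::from(runs);
    let mut passes = 0;
    loop {
        // Take as many runs as "fit in memory" for this pass.
        let take = max_fan_in.min(pending.len());
        let batch: Vec<Vec<i32>> = pending.drain(..take).collect();
        let merged = merge_runs(&batch);
        passes += 1;
        if pending.is_empty() {
            // Nothing left to merge: this is the final output.
            return Ok((merged, passes));
        }
        // Otherwise "spill" the intermediate result and merge it again later.
        pending.push_back(merged);
    }
}
```

In this sketch the intermediate result is pushed to the back of the queue, which plays the role of writing a new spill file and adding it back to the files still to be merged.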
Initial State: Multiple sorted streams + spill files
[Diagram: three merge phases, each merging the inputs that can be held in memory at once.]
- Phase 1: in-memory sorted streams 1, 2, and 3 and sorted spill file 1 can be held in memory; they are merged and written to sorted spill file 6. Sorted spill files 2 to 5 are carried to the next phase.
- Phase 2: sorted spill files 2, 3, and 4 can be held in memory; they are merged and written to sorted spill file 7. Sorted spill files 5 and 6 are carried to the next phase.
- Phase 3: sorted spill files 5, 6, and 7 can be held in memory; they are merged into the Output Stream.
§Memory Management Strategy
This multi-level merge makes sure that we can sort any amount of data, as long as we have enough memory to merge at least 2 streams at a time.
- Worst-Case Memory Reservation: Reserves memory based on the largest batch size encountered in each spill file to merge, ensuring sufficient memory is always available during merge operations.
- Adaptive Buffer Sizing: Reduces buffer sizes when memory is constrained.
- Spill-to-Disk: Spills to disk when we cannot merge all files in memory.
Fields§
§spill_manager: SpillManager
§schema: SchemaRef
§sorted_spill_files: Vec<SortedSpillFile>
§sorted_streams: Vec<SendableRecordBatchStream>
§expr: LexOrdering
§metrics: BaselineMetrics
§batch_size: usize
§reservation: MemoryReservation
§fetch: Option<usize>
§enable_round_robin_tie_breaker: bool
Implementations§
impl MultiLevelMergeBuilder
pub(crate) fn new( spill_manager: SpillManager, schema: SchemaRef, sorted_spill_files: Vec<SortedSpillFile>, sorted_streams: Vec<SendableRecordBatchStream>, expr: LexOrdering, metrics: BaselineMetrics, batch_size: usize, reservation: MemoryReservation, fetch: Option<usize>, enable_round_robin_tie_breaker: bool, ) -> Self
pub(crate) fn create_spillable_merge_stream(self) -> SendableRecordBatchStream
async fn create_stream(self) -> Result<SendableRecordBatchStream>
fn merge_sorted_runs_within_mem_limit( &mut self, ) -> Result<SendableRecordBatchStream>
Tries to create a stream that merges as many sorted streams and sorted spill files as possible within the memory limit.
fn create_new_merge_sort( &mut self, streams: Vec<SendableRecordBatchStream>, is_output: bool, all_in_memory: bool, ) -> Result<SendableRecordBatchStream>
fn get_sorted_spill_files_to_merge( &mut self, buffer_len: usize, minimum_number_of_required_streams: usize, reservation: &mut MemoryReservation, ) -> Result<(Vec<SortedSpillFile>, usize)>
Returns the sorted spill files to use for the next phase, together with the buffer size. This tries to get as many spill files as possible to merge. If there are not enough streams, it reduces the buffer size until there are enough streams to merge; otherwise it returns an error.
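The selection and buffer-shrinking behaviour described above might look roughly like the following sketch. It is written under stated assumptions and is not the actual implementation: the `SpillFile` type, the `pick_files_to_merge` function, the plain `memory_budget` number (standing in for a `MemoryReservation`), the per-file cost of `max_batch_bytes * buffer_len`, and the shrink step of one are all hypothetical.

```rust
/// Hypothetical stand-in for a sorted spill file: only the size of its
/// largest batch matters for the worst-case reservation.
#[derive(Debug, Clone, PartialEq)]
struct SpillFile {
    max_batch_bytes: usize,
}

/// Take as many files from the front of `files` as fit in `memory_budget`,
/// reserving `max_batch_bytes * buffer_len` bytes for each one.
/// If fewer than `min_streams` fit, shrink `buffer_len` by one and retry;
/// return an error once the buffer cannot shrink any further.
fn pick_files_to_merge(
    files: &mut Vec<SpillFile>,
    mut buffer_len: usize,
    min_streams: usize,
    memory_budget: usize,
) -> Result<(Vec<SpillFile>, usize), String> {
    while buffer_len >= 1 {
        let mut used = 0usize;
        let mut count = 0usize;
        for file in files.iter() {
            // Worst case: every buffered batch is as large as the largest one.
            let needed = file.max_batch_bytes * buffer_len;
            if used + needed > memory_budget {
                break;
            }
            used += needed;
            count += 1;
        }
        if count >= min_streams {
            // Enough streams fit: remove them from the pending list.
            let picked: Vec<SpillFile> = files.drain(..count).collect();
            return Ok((picked, buffer_len));
        }
        // Not enough streams: try again with a smaller per-stream buffer.
        buffer_len -= 1;
    }
    Err(format!(
        "not enough memory to merge at least {min_streams} spill files"
    ))
}
```

Shrinking the buffer trades read-ahead for fan-in: each stream buffers fewer batches, so more streams fit in the same budget.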
Trait Implementations§
Auto Trait Implementations§
impl Freeze for MultiLevelMergeBuilder
impl !RefUnwindSafe for MultiLevelMergeBuilder
impl Send for MultiLevelMergeBuilder
impl !Sync for MultiLevelMergeBuilder
impl Unpin for MultiLevelMergeBuilder
impl !UnwindSafe for MultiLevelMergeBuilder
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.