Struct MultiLevelMergeBuilder 

pub(crate) struct MultiLevelMergeBuilder {
    spill_manager: SpillManager,
    schema: SchemaRef,
    sorted_spill_files: Vec<SortedSpillFile>,
    sorted_streams: Vec<SendableRecordBatchStream>,
    expr: LexOrdering,
    metrics: BaselineMetrics,
    batch_size: usize,
    reservation: MemoryReservation,
    fetch: Option<usize>,
    enable_round_robin_tie_breaker: bool,
}

Merges a stream of sorted cursors and record batches into a single sorted stream.

This is a wrapper around SortPreservingMergeStream that provides it with the sorted streams and spill files to merge, while making sure they can be merged in memory. If they cannot all be merged in a single pass, the intermediate results are spilled to disk and the process is repeated.
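At its core, each merge phase is a k-way merge of already-sorted inputs. The following is a minimal in-memory sketch of that idea over plain Vec<i64> runs, not DataFusion's SortPreservingMergeStream: the function name k_way_merge is hypothetical, a binary min-heap stands in for the cursor machinery, and ties are broken by run index. The real stream has its own tie-breaking (including the enable_round_robin_tie_breaker option), which this sketch does not model.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical k-way merge: combines already-sorted runs into one sorted Vec
/// using a min-heap keyed on (value, run index, position within run).
fn k_way_merge(runs: &[Vec<i64>]) -> Vec<i64> {
    let mut heap = BinaryHeap::new();
    // Seed the heap with the head (the "cursor") of every non-empty run.
    for (run_idx, run) in runs.iter().enumerate() {
        if let Some(&first) = run.first() {
            heap.push(Reverse((first, run_idx, 0usize)));
        }
    }
    let mut out = Vec::with_capacity(runs.iter().map(Vec::len).sum());
    // Repeatedly take the smallest head and advance that run's cursor.
    while let Some(Reverse((value, run_idx, pos))) = heap.pop() {
        out.push(value);
        if let Some(&next) = runs[run_idx].get(pos + 1) {
            heap.push(Reverse((next, run_idx, pos + 1)));
        }
    }
    out
}

fn main() {
    let runs = vec![vec![1, 4, 9], vec![2, 3, 10], vec![0, 5]];
    let merged = k_way_merge(&runs);
    println!("{:?}", merged); // [0, 1, 2, 3, 4, 5, 9, 10]
}
```

In this sketch the heap holds at most one entry per run, so the merge state grows with the number of runs merged at once rather than with their total length; bounding the number of inputs per phase is what keeps a merge within a memory budget.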

High-level Algorithm

  1. Select as many sorted in-memory streams and spill files as can be merged within the available memory.
  2. Merge them into a single sorted stream.
  3. Are there more spill files left to merge?
     • Yes: write that sorted stream to a new spill file, add it back to the spill files to merge, and repeat from step 1.
     • No: return that sorted stream as the final output stream.

Initial State: Multiple sorted streams + spill files
     ┌───────────┐
     │  Phase 1  │
     └───────────┘
┌──Can hold in memory─┐
│   ┌──────────────┐  │
│   │  In-memory   │
│   │sorted stream │──┼────────┐
│   │      1       │  │        │
    └──────────────┘  │        │
│   ┌──────────────┐  │        │
│   │  In-memory   │           │
│   │sorted stream │──┼────────┤
│   │      2       │  │        │
    └──────────────┘  │        │
│   ┌──────────────┐  │        │
│   │  In-memory   │           │
│   │sorted stream │──┼────────┤
│   │      3       │  │        │
    └──────────────┘  │        │
│   ┌──────────────┐  │        │            ┌───────────┐
│   │ Sorted Spill │           │            │  Phase 2  │
│   │    file 1    │──┼────────┤            └───────────┘
│   └──────────────┘  │        │
 ──── ──── ──── ──── ─┘        │       ┌──Can hold in memory─┐
                               │       │                     │
    ┌──────────────┐           │       │   ┌──────────────┐
    │ Sorted Spill │           │       │   │ Sorted Spill │  │
    │    file 2    │──────────────────────▶│    file 2    │──┼─────┐
    └──────────────┘           │           └──────────────┘  │     │
    ┌──────────────┐           │       │   ┌──────────────┐  │     │
    │ Sorted Spill │           │       │   │ Sorted Spill │        │
    │    file 3    │──────────────────────▶│    file 3    │──┼─────┤
    └──────────────┘           │       │   └──────────────┘  │     │
    ┌──────────────┐           │           ┌──────────────┐  │     │
    │ Sorted Spill │           │       │   │ Sorted Spill │  │     │
    │    file 4    │──────────────────────▶│    file 4    │────────┤          ┌───────────┐
    └──────────────┘           │       │   └──────────────┘  │     │          │  Phase 3  │
                               │       │                     │     │          └───────────┘
                               │        ──── ──── ──── ──── ─┘     │     ┌──Can hold in memory─┐
                               │                                   │     │                     │
    ┌──────────────┐           │           ┌──────────────┐        │     │  ┌──────────────┐
    │ Sorted Spill │           │           │ Sorted Spill │        │     │  │ Sorted Spill │   │
    │    file 5    │──────────────────────▶│    file 5    │────────────────▶│    file 5    │───┼───┐
    └──────────────┘           │           └──────────────┘        │     │  └──────────────┘   │   │
                               │                                   │     │                     │   │
                               │           ┌──────────────┐        │     │  ┌──────────────┐       │
                               │           │ Sorted Spill │        │     │  │ Sorted Spill │   │   │       ┌── ─── ─── ─── ─── ─── ─── ──┐
                               └──────────▶│    file 6    │────────────────▶│    file 6    │───┼───┼──────▶         Output Stream
                                           └──────────────┘        │     │  └──────────────┘   │   │       └── ─── ─── ─── ─── ─── ─── ──┘
                                                                   │     │                     │   │
                                                                   │     │  ┌──────────────┐       │
                                                                   │     │  │ Sorted Spill │   │   │
                                                                   └───────▶│    file 7    │───┼───┘
                                                                         │  └──────────────┘   │
                                                                         │                     │
                                                                         └─ ──── ──── ──── ────
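The phases in the diagram can be simulated with a queue of sorted runs. This sketch assumes a fixed maximum fan-in per phase and treats in-memory streams and spill files alike as Vec<i64> runs; in the real builder the number of inputs per phase depends on the memory available, as the differently sized "Can hold in memory" boxes suggest (four inputs in phase 1, three in the later phases). The names merge_runs and multi_level_merge are hypothetical. As in the diagram, each intermediate result is appended to the back of the queue, like a new spill file.

```rust
use std::cmp::Reverse;
use std::collections::{BinaryHeap, VecDeque};

/// k-way merge of already-sorted runs using a min-heap of run heads.
fn merge_runs(runs: &[Vec<i64>]) -> Vec<i64> {
    let mut heap = BinaryHeap::new();
    for (i, run) in runs.iter().enumerate() {
        if let Some(&v) = run.first() {
            heap.push(Reverse((v, i, 0usize)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((v, i, pos))) = heap.pop() {
        out.push(v);
        if let Some(&next) = runs[i].get(pos + 1) {
            heap.push(Reverse((next, i, pos + 1)));
        }
    }
    out
}

/// Hypothetical multi-level merge with a fixed maximum fan-in.
/// Returns the fully merged run and the number of phases used.
fn multi_level_merge(runs: Vec<Vec<i64>>, max_fan_in: usize) -> (Vec<i64>, usize) {
    // With a fan-in below 2 a phase cannot reduce the number of runs.
    assert!(max_fan_in >= 2, "must merge at least 2 runs per phase");
    let mut queue: VecDeque<Vec<i64>> = runs.into();
    let mut phases = 0;
    loop {
        phases += 1;
        if queue.len() <= max_fan_in {
            // Everything left fits in one merge: this phase is the output stream.
            let rest: Vec<Vec<i64>> = queue.drain(..).collect();
            return (merge_runs(&rest), phases);
        }
        // Otherwise merge the first max_fan_in runs and "spill" the result
        // by appending it to the back of the queue.
        let batch: Vec<Vec<i64>> = queue.drain(..max_fan_in).collect();
        queue.push_back(merge_runs(&batch));
    }
}

fn main() {
    let runs = vec![
        vec![5, 9], vec![1], vec![3, 8], vec![0, 7],
        vec![2], vec![6], vec![4, 10], vec![11],
    ];
    let (merged, phases) = multi_level_merge(runs, 3);
    println!("{merged:?} in {phases} phases");
}
```

With eight input runs and a fan-in of 3, this produces the numbers 0 through 11 in sorted order after 4 phases: three intermediate merges that each append a new run, followed by the final merge.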

Memory Management Strategy

This multi-level merge can sort an arbitrary amount of data, as long as there is enough memory to merge at least 2 streams at a time.
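Why two streams is the minimum can be made concrete with a little arithmetic. If every phase merges up to k runs into one new run, each phase lowers the run count by at most k - 1, so reducing n runs to one takes ceil((n - 1) / (k - 1)) phases when every phase uses the full fan-in; with k = 1 the count never drops and the process would not terminate. The helper below is a hypothetical illustration of that bound, assuming a fixed fan-in k (the builder's actual fan-in varies from phase to phase).

```rust
/// Hypothetical helper: minimum number of merge phases needed to turn `n`
/// sorted runs into one when each phase merges up to `k` runs into a new run.
/// Returns None when no progress is possible (k < 2 with more than one run).
fn phases_needed(n: usize, k: usize) -> Option<usize> {
    if n <= 1 {
        return Some(0); // already a single (or no) sorted run
    }
    if k < 2 {
        return None; // "merging" one run at a time never reduces the count
    }
    // Each phase removes up to k runs and adds one back: a net drop of k - 1.
    // Integer form of ceil((n - 1) / (k - 1)).
    Some((n - 1 + k - 2) / (k - 1))
}

fn main() {
    for k in [1, 2, 3, 8] {
        println!("8 runs, fan-in {k}: {:?}", phases_needed(8, k));
    }
}
```

A fan-in of 2 is the slowest case that still terminates (n - 1 phases); every extra stream that fits in memory cuts the number of phases, and therefore the number of times data is rewritten to disk.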

  1. Worst-Case Memory Reservation: reserves memory based on the largest batch size encountered in each spill file to merge, so that enough memory is always available during the merge.
  2. Adaptive Buffer Sizing: reduces buffer sizes when memory is constrained.
  3. Spill-to-Disk: spills intermediate results to disk when not all files can be merged in memory.
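Worst-case reservation trades some over-reservation for never having to grow the reservation in the middle of a merge: if a reader buffers at most buffer_len batches of a spill file and no batch is larger than the file's largest batch, then largest batch × buffer_len bounds the bytes held at any moment. The sketch below checks that bound on a hypothetical list of per-batch sizes; the function names and the exact formula are assumptions for illustration, and DataFusion's real accounting may include further terms.

```rust
/// Hypothetical worst-case reservation for one spill file: assume every
/// buffered batch is as large as the largest batch in the file.
fn worst_case_reservation(batch_sizes: &[usize], buffer_len: usize) -> usize {
    let largest = batch_sizes.iter().copied().max().unwrap_or(0);
    largest * buffer_len
}

/// Actual peak if a reader holds `buffer_len` consecutive batches at once.
fn peak_buffered(batch_sizes: &[usize], buffer_len: usize) -> usize {
    if buffer_len == 0 || batch_sizes.is_empty() {
        return 0;
    }
    let window = buffer_len.min(batch_sizes.len());
    batch_sizes
        .windows(window)
        .map(|w| w.iter().sum::<usize>())
        .max()
        .unwrap_or(0)
}

fn main() {
    // In-memory sizes (bytes) of the batches in one hypothetical spill file.
    let sizes: [usize; 4] = [100, 400, 250, 300];
    let reserved = worst_case_reservation(&sizes, 2);
    let peak = peak_buffered(&sizes, 2);
    // The reservation is never smaller than what is actually held.
    assert!(peak <= reserved);
    println!("reserved {reserved} bytes, peak usage {peak} bytes");
}
```

Here 800 bytes are reserved while the real peak is 650; the 150-byte gap is the price of a reservation that is known up front from a single per-file statistic.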

Fields

spill_manager: SpillManager
schema: SchemaRef
sorted_spill_files: Vec<SortedSpillFile>
sorted_streams: Vec<SendableRecordBatchStream>
expr: LexOrdering
metrics: BaselineMetrics
batch_size: usize
reservation: MemoryReservation
fetch: Option<usize>
enable_round_robin_tie_breaker: bool

Implementations

impl MultiLevelMergeBuilder

pub(crate) fn new(
    spill_manager: SpillManager,
    schema: SchemaRef,
    sorted_spill_files: Vec<SortedSpillFile>,
    sorted_streams: Vec<SendableRecordBatchStream>,
    expr: LexOrdering,
    metrics: BaselineMetrics,
    batch_size: usize,
    reservation: MemoryReservation,
    fetch: Option<usize>,
    enable_round_robin_tie_breaker: bool,
) -> Self

pub(crate) fn create_spillable_merge_stream(self) -> SendableRecordBatchStream

async fn create_stream(self) -> Result<SendableRecordBatchStream>

fn merge_sorted_runs_within_mem_limit(&mut self) -> Result<SendableRecordBatchStream>

Tries to create a stream that merges as many sorted streams and sorted spill files as possible within the memory limit.

fn create_new_merge_sort(
    &mut self,
    streams: Vec<SendableRecordBatchStream>,
    is_output: bool,
    all_in_memory: bool,
) -> Result<SendableRecordBatchStream>

fn get_sorted_spill_files_to_merge(
    &mut self,
    buffer_len: usize,
    minimum_number_of_required_streams: usize,
    reservation: &mut MemoryReservation,
) -> Result<(Vec<SortedSpillFile>, usize)>

Returns the sorted spill files to use for the next phase, together with the buffer size. It tries to select as many spill files as possible to merge; if that yields fewer streams than required, it reduces the buffer size until enough streams fit, and otherwise returns an error.
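The selection loop described above can be sketched with a toy reservation. Everything here is hypothetical: ToyReservation stands in for MemoryReservation (whose try_grow can fail when the pool is exhausted), SpillFile records only a largest-batch size, the per-file cost is assumed to be largest batch × buffer_len, and shrinking buffer_len by one per retry is an assumption about the step size. The function returns how many leading files were selected rather than draining them from a list.

```rust
/// Hypothetical stand-in for a memory reservation with a fixed byte budget.
struct ToyReservation {
    used: usize,
    limit: usize,
}

impl ToyReservation {
    /// Grows the reservation, failing if the budget would be exceeded.
    fn try_grow(&mut self, bytes: usize) -> Result<(), String> {
        if self.used + bytes > self.limit {
            return Err(format!("cannot reserve {bytes} more bytes ({} of {} used)", self.used, self.limit));
        }
        self.used += bytes;
        Ok(())
    }

    /// Releases everything held by this reservation.
    fn free(&mut self) {
        self.used = 0;
    }
}

/// Hypothetical spill-file metadata: only the largest batch size matters here.
struct SpillFile {
    max_batch_bytes: usize,
}

/// Selects as many leading spill files as fit at `buffer_len` batches each.
/// If fewer than `min_streams` fit, frees the reservation and retries with a
/// smaller buffer; returns an error once even buffer_len == 1 is not enough.
/// On success returns (files selected, buffer_len used).
fn select_spill_files(
    files: &[SpillFile],
    mut buffer_len: usize,
    min_streams: usize,
    reservation: &mut ToyReservation,
) -> Result<(usize, usize), String> {
    assert!(buffer_len > 0, "buffer_len must be positive");
    loop {
        let mut selected = 0;
        let mut failure = None;
        for file in files {
            match reservation.try_grow(file.max_batch_bytes * buffer_len) {
                Ok(()) => selected += 1,
                Err(e) => {
                    failure = Some(e);
                    break;
                }
            }
        }
        match failure {
            // Every file fit, or enough files fit before memory ran out.
            None => return Ok((selected, buffer_len)),
            Some(_) if selected >= min_streams => return Ok((selected, buffer_len)),
            // Too few streams: start over with a smaller buffer, or give up.
            Some(e) => {
                reservation.free();
                if buffer_len == 1 {
                    return Err(e);
                }
                buffer_len -= 1; // assumed step size
            }
        }
    }
}

fn main() {
    let files: Vec<SpillFile> = (0..4).map(|_| SpillFile { max_batch_bytes: 200 }).collect();
    let mut reservation = ToyReservation { used: 0, limit: 1000 };
    let picked = select_spill_files(&files, 3, 2, &mut reservation);
    println!("{picked:?}, {} bytes reserved", reservation.used);
}
```

With a 1000-byte budget, buffer_len 3 admits only one 600-byte file, so the sketch frees the reservation, retries at buffer_len 2, and keeps two files (800 bytes reserved). Smaller buffers mean more, slower reads, but they let the merge proceed instead of failing.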

Trait Implementations

impl Debug for MultiLevelMergeBuilder

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.
