Struct ExternalSorter 

struct ExternalSorter {
    schema: SchemaRef,
    expr: LexOrdering,
    batch_size: usize,
    sort_in_place_threshold_bytes: usize,
    in_mem_batches: Vec<RecordBatch>,
    in_progress_spill_file: Option<(InProgressSpillFile, usize)>,
    finished_spill_files: Vec<SortedSpillFile>,
    metrics: ExternalSorterMetrics,
    runtime: Arc<RuntimeEnv>,
    reservation: MemoryReservation,
    spill_manager: SpillManager,
    merge_reservation: MemoryReservation,
    sort_spill_reservation_bytes: usize,
}

Sorts an arbitrarily sized, unsorted stream of RecordBatches into a total order. Depending on the input size and memory manager configuration, writes intermediate results to disk (“spills”) using the Arrow IPC format.

§Algorithm

  1. get a non-empty new batch from input

  2. check with the memory manager that there is sufficient space to buffer the batch in memory.

     2.1 if memory is sufficient, buffer the batch in memory, go to 1.

     2.2 if no more memory is available, sort all buffered batches and spill them to a file. Buffer the next batch in memory, go to 1.

  3. when input is exhausted, merge all in-memory batches and spills to get a total order.
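The steps above can be sketched as a toy model in Rust. This is not DataFusion's implementation: a "batch" is assumed to be a `Vec<i64>`, memory is counted in rows rather than bytes, a "spill file" is a sorted `Vec` held in memory, and the final merge is a plain k-way merge over a binary heap. `ToySorter`, `memory_limit_rows`, `take_sorted_run` and `k_way_merge` are hypothetical names.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Toy model (hypothetical, not DataFusion code): a batch is a Vec<i64>,
/// memory is counted in rows, and a "spill file" is a sorted Vec in memory.
struct ToySorter {
    memory_limit_rows: usize,
    in_mem_batches: Vec<Vec<i64>>,
    buffered_rows: usize,
    spills: Vec<Vec<i64>>, // each spill holds one sorted run
}

impl ToySorter {
    fn new(memory_limit_rows: usize) -> Self {
        Self { memory_limit_rows, in_mem_batches: Vec::new(), buffered_rows: 0, spills: Vec::new() }
    }

    /// Steps 1 and 2: buffer a non-empty batch, spilling first if it would not fit.
    fn insert_batch(&mut self, batch: Vec<i64>) {
        if batch.is_empty() {
            return;
        }
        let fits = self.buffered_rows + batch.len() <= self.memory_limit_rows;
        if !fits && !self.in_mem_batches.is_empty() {
            // 2.2: sort everything buffered into one run and "write" it out.
            let run = self.take_sorted_run();
            self.spills.push(run);
        }
        // 2.1 (or after spilling): buffer the batch. A single batch larger
        // than the budget is still accepted in this toy model.
        self.buffered_rows += batch.len();
        self.in_mem_batches.push(batch);
    }

    /// Concatenate and sort the buffered batches, emptying the buffer.
    fn take_sorted_run(&mut self) -> Vec<i64> {
        let mut run: Vec<i64> = self.in_mem_batches.drain(..).flatten().collect();
        run.sort_unstable();
        self.buffered_rows = 0;
        run
    }

    /// Step 3: merge all spills with the remaining in-memory data.
    fn sort(mut self) -> Vec<i64> {
        let mut runs = std::mem::take(&mut self.spills);
        runs.push(self.take_sorted_run());
        k_way_merge(runs)
    }
}

/// Merge sorted runs with a min-heap of (value, run index, position).
fn k_way_merge(runs: Vec<Vec<i64>>) -> Vec<i64> {
    let mut heap = BinaryHeap::new();
    for (i, run) in runs.iter().enumerate() {
        if let Some(&v) = run.first() {
            heap.push(Reverse((v, i, 0usize)));
        }
    }
    let mut out = Vec::with_capacity(runs.iter().map(Vec::len).sum());
    while let Some(Reverse((v, i, pos))) = heap.pop() {
        out.push(v);
        if let Some(&next) = runs[i].get(pos + 1) {
            heap.push(Reverse((next, i, pos + 1)));
        }
    }
    out
}

fn main() {
    let mut sorter = ToySorter::new(5);
    for batch in [vec![2, 3, 1, 4, 2], vec![1, 4, 1], vec![4, 3]] {
        sorter.insert_batch(batch);
    }
    println!("spills: {}", sorter.spills.len()); // spills: 1
    println!("{:?}", sorter.sort()); // [1, 1, 1, 2, 2, 3, 3, 4, 4, 4]
}
```

With a 5-row budget, the second batch does not fit next to the first, so the first batch is sorted and spilled before the second is buffered; the batches still buffered at the end are sorted and merged with that spill.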

§When data fits in available memory

If there is sufficient memory, the data is sorted in memory to produce the output.

   ┌─────┐
   │  2  │
   │  3  │
   │  1  │─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
   │  4  │
   │  2  │                  │
   └─────┘                  ▼
   ┌─────┐
   │  1  │              In memory
   │  4  │─ ─ ─ ─ ─ ─▶ sort/merge  ─ ─ ─ ─ ─▶  total sorted output
   │  1  │
   └─────┘                  ▲
     ...                    │

   ┌─────┐                  │
   │  4  │
   │  3  │─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
   └─────┘

in_mem_batches

§When data does not fit in available memory

When memory is exhausted, data is first sorted and written to one or more spill files on disk:

   ┌─────┐                               .─────────────────.
   │  2  │                              (                   )
   │  3  │                              │`─────────────────'│
   │  1  │─ ─ ─ ─ ─ ─ ─                 │  ┌────┐           │
   │  4  │             │                │  │ 1  │░          │
   │  2  │                              │  │... │░          │
   └─────┘             ▼                │  │ 4  │░  ┌ ─ ─   │
   ┌─────┐                              │  └────┘░    1  │░ │
   │  1  │         In memory            │   ░░░░░░  │    ░░ │
   │  4  │─ ─ ▶   sort/merge    ─ ─ ─ ─ ┼ ─ ─ ─ ─ ─▶ ... │░ │
   │  1  │     and write to file        │           │    ░░ │
   └─────┘                              │             4  │░ │
     ...               ▲                │           └░─░─░░ │
                       │                │            ░░░░░░ │
   ┌─────┐                              │.─────────────────.│
   │  4  │             │                (                   )
   │  3  │─ ─ ─ ─ ─ ─ ─                  `─────────────────'
   └─────┘

in_mem_batches                                  spills
                                        (file on disk in Arrow
                                              IPC format)

Once the input is completely read, the spill files are read and merged with any in-memory batches to produce a single total sorted output:

  .─────────────────.
 (                   )
 │`─────────────────'│
 │  ┌────┐           │
 │  │ 1  │░          │
 │  │... │─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─
 │  │ 4  │░ ┌────┐   │           │
 │  └────┘░ │ 1  │░  │           ▼
 │   ░░░░░░ │    │░  │
 │          │... │─ ─│─ ─ ─ ▶ merge  ─ ─ ─▶  total sorted output
 │          │    │░  │
 │          │ 4  │░  │           ▲
 │          └────┘░  │           │
 │           ░░░░░░  │
 │.─────────────────.│           │
 (                   )
  `─────────────────'            │
        spills
                                 │

                                 │

    ┌─────┐                      │
    │  1  │
    │  4  │─ ─ ─ ─               │
    └─────┘       │
      ...                   In memory
                  └ ─ ─ ─▶  sort/merge
    ┌─────┐
    │  4  │                      ▲
    │  3  │─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
    └─────┘

 in_mem_batches

Fields§

§schema: SchemaRef

Schema of the output (and the input)

§expr: LexOrdering

Sort expressions

§batch_size: usize

The target number of rows for output batches

§sort_in_place_threshold_bytes: usize

If the total size of the buffered in-memory batches is below this threshold, the data is concatenated and sorted in place rather than sorted per batch and then merged.

§in_mem_batches: Vec<RecordBatch>

Unsorted input batches stored in the memory buffer

§in_progress_spill_file: Option<(InProgressSpillFile, usize)>

During external sorting, in-memory intermediate data will be appended to this file incrementally. Once finished, this file will be moved to Self::finished_spill_files.

This is a tuple of:

  1. InProgressSpillFile - the file that is being written to
  2. max_record_batch_memory - the maximum memory usage of a single batch in this spill file.

§finished_spill_files: Vec<SortedSpillFile>

If data has previously been spilled, the locations of the spill files (in Arrow IPC format). Within the same spill file, the data may be chunked into multiple batches, ordered by the sort keys.

§metrics: ExternalSorterMetrics

Runtime metrics

§runtime: Arc<RuntimeEnv>

A handle to the runtime to get spill files

§reservation: MemoryReservation

Reservation for in_mem_batches

§spill_manager: SpillManager

§merge_reservation: MemoryReservation

Reservation for the merging of in-memory batches. If the sort might spill, sort_spill_reservation_bytes will be pre-reserved to ensure there is some space for this sort/merge.

§sort_spill_reservation_bytes: usize

How much memory to reserve for performing in-memory sort/merges prior to spilling.

Implementations§

impl ExternalSorter

pub fn new(
    partition_id: usize,
    schema: SchemaRef,
    expr: LexOrdering,
    batch_size: usize,
    sort_spill_reservation_bytes: usize,
    sort_in_place_threshold_bytes: usize,
    spill_compression: SpillCompression,
    metrics: &ExecutionPlanMetricsSet,
    runtime: Arc<RuntimeEnv>,
) -> Result<Self>

async fn insert_batch(&mut self, input: RecordBatch) -> Result<()>

Appends an unsorted RecordBatch to in_mem_batches.

Updates memory usage metrics and possibly triggers spilling to disk.

fn spilled_before(&self) -> bool

async fn sort(&mut self) -> Result<SendableRecordBatchStream>

Returns the final sorted output of all batches inserted via Self::insert_batch as a stream of RecordBatches.

This process is either:

  1. An in-memory sort/merge (if the input fit in memory)

  2. A combined streaming merge incorporating both in-memory batches and data from spill files on disk.

fn used(&self) -> usize

How much memory is buffered in this ExternalSorter?

fn spilled_bytes(&self) -> usize

How many bytes have been spilled to disk?

fn spilled_rows(&self) -> usize

How many rows have been spilled to disk?

fn spill_count(&self) -> usize

How many spill files have been created?

async fn consume_and_spill_append(
    &mut self,
    globally_sorted_batches: &mut Vec<RecordBatch>,
) -> Result<()>

Appends globally sorted batches to the in-progress spill file, then clears globally_sorted_batches (and releases its memory reservation).

async fn spill_finish(&mut self) -> Result<()>

Finishes the in-progress spill file and moves it to the finished spill files.
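A minimal sketch of the in-progress spill file lifecycle described by consume_and_spill_append, spill_finish and the in_progress_spill_file field. The "file" here is an in-memory `Vec` of batches and batch memory is approximated as rows times 8 bytes; `Spiller`, `SortedSpill` and `batch_bytes` are hypothetical names. Tracking the largest batch per file presumably lets a later merge budget memory for reading one batch at a time from each file, but that use is an assumption, not something stated above.

```rust
/// Hypothetical stand-ins: a batch is a Vec<i64> and a spill "file" is a
/// Vec of batches kept in memory (the real code writes Arrow IPC to disk).
#[derive(Debug)]
struct SortedSpill {
    batches: Vec<Vec<i64>>,
    max_record_batch_memory: usize,
}

#[derive(Default)]
struct Spiller {
    /// (file being written, max memory of any single batch in it)
    in_progress: Option<(Vec<Vec<i64>>, usize)>,
    finished: Vec<SortedSpill>,
}

/// Approximate a batch's memory as rows * 8 bytes.
fn batch_bytes(batch: &[i64]) -> usize {
    batch.len() * std::mem::size_of::<i64>()
}

impl Spiller {
    /// Like consume_and_spill_append: append batches to the in-progress file
    /// (creating it if needed), track the largest batch, and empty the input.
    fn append(&mut self, globally_sorted: &mut Vec<Vec<i64>>) {
        let (file, max_bytes) = self.in_progress.get_or_insert_with(|| (Vec::new(), 0));
        for batch in globally_sorted.drain(..) {
            *max_bytes = (*max_bytes).max(batch_bytes(&batch));
            file.push(batch);
        }
    }

    /// Like spill_finish: close the in-progress file and move it to `finished`.
    fn finish(&mut self) {
        if let Some((batches, max_record_batch_memory)) = self.in_progress.take() {
            self.finished.push(SortedSpill { batches, max_record_batch_memory });
        }
    }
}

fn main() {
    let mut spiller = Spiller::default();
    let mut sorted = vec![vec![1, 2, 3], vec![4, 5, 6, 7]];
    spiller.append(&mut sorted);
    assert!(sorted.is_empty()); // input cleared after appending
    spiller.append(&mut vec![vec![8]]);
    spiller.finish();
    println!("{:?}", spiller.finished);
}
```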

fn organize_stringview_arrays(
    globally_sorted_batches: &mut Vec<RecordBatch>,
) -> Result<()>

Reconstructs globally_sorted_batches so that the payload buffers of each StringViewArray are laid out sequentially, by calling gc() on each array.

Note: this is a workaround until https://github.com/apache/arrow-rs/issues/7185 is resolved.

§Rationale

After (merge-based) sorting, all batches are sorted into a single run, but physically this sorted run is chunked into many small batches. For StringViewArrays inside each sorted run, the inner buffers are not reconstructed by default, so the payload locations are non-sequential (permuted by the interleave() Arrow kernel). A single payload buffer might therefore be shared by multiple RecordBatches. When writing each batch to disk, the writer has to write every buffer that batch references, because the spilled batches are read back one by one to reduce memory usage. This causes extra disk reads and writes, and can potentially cause execution to fail.

§Example

Before sorting:
batch1 -> buffer1
batch2 -> buffer2

After sorting:
sorted_batch1 -> buffer1
              -> buffer2
sorted_batch2 -> buffer1
              -> buffer2

Then when spilling each batch, the writer has to write all referenced buffers repeatedly.
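The buffer-sharing problem can be illustrated with a simplified, hypothetical model of a StringView column (not the arrow-rs types): each value is a view (buffer index, offset, length) into shared buffers, writing a batch is assumed to write every referenced buffer in full, and `compact` plays the role of gc() by copying only a batch's own payloads into a fresh buffer. `View`, `Batch`, `spill_bytes` and `compact` are illustrative names.

```rust
use std::collections::BTreeSet;

/// Hypothetical model of a StringView column, not the arrow-rs types:
/// each value is a view (buffer index, offset, length) into shared buffers.
struct View {
    buf: usize,
    offset: usize,
    len: usize,
}

struct Batch {
    views: Vec<View>,
}

fn view(buf: usize, offset: usize, len: usize) -> View {
    View { buf, offset, len }
}

/// Assumed writer behaviour: every buffer a batch references is written whole.
fn spill_bytes(batch: &Batch, buffers: &[Vec<u8>]) -> usize {
    let referenced: BTreeSet<usize> = batch.views.iter().map(|v| v.buf).collect();
    referenced.iter().map(|&b| buffers[b].len()).sum()
}

/// gc()-like compaction: copy only this batch's payloads into a new buffer
/// appended to `out`, and point the new views at it.
fn compact(batch: &Batch, buffers: &[Vec<u8>], out: &mut Vec<Vec<u8>>) -> Batch {
    let mut data: Vec<u8> = Vec::new();
    let mut views = Vec::with_capacity(batch.views.len());
    for v in &batch.views {
        let offset = data.len();
        data.extend_from_slice(&buffers[v.buf][v.offset..v.offset + v.len]);
        views.push(View { buf: out.len(), offset, len: v.len });
    }
    out.push(data);
    Batch { views }
}

fn main() {
    // Before sorting: batch1 -> buffer 0, batch2 -> buffer 1.
    let buffers = vec![b"aaaabbbb".to_vec(), b"ccccdddd".to_vec()];
    // After an interleave-style sort, each output batch references both buffers.
    let sorted = vec![
        Batch { views: vec![view(0, 0, 4), view(1, 0, 4)] },
        Batch { views: vec![view(0, 4, 4), view(1, 4, 4)] },
    ];
    let before: usize = sorted.iter().map(|b| spill_bytes(b, &buffers)).sum();

    let mut new_buffers = Vec::new();
    let compacted: Vec<Batch> = sorted.iter().map(|b| compact(b, &buffers, &mut new_buffers)).collect();
    let after: usize = compacted.iter().map(|b| spill_bytes(b, &new_buffers)).sum();
    println!("bytes written without gc: {before}, with gc: {after}"); // 32, 16
}
```

In this model each sorted batch references both 8-byte buffers, so 32 bytes are written without compaction and 16 bytes after it.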

async fn sort_and_spill_in_mem_batches(&mut self) -> Result<()>

Sorts the in-memory batches and merges them into a single sorted run, then writes the result to spill files.

fn in_mem_sort_stream(
    &mut self,
    metrics: BaselineMetrics,
) -> Result<SendableRecordBatchStream>

Consumes in_mem_batches, returning a sorted stream of batches. This proceeds in one of two ways:

§Small Datasets

For “smaller” datasets, the data is first concatenated into a single batch and then sorted. This is often faster than sorting and then merging.

       ┌─────┐
       │  2  │
       │  3  │
       │  1  │─ ─ ─ ─ ┐            ┌─────┐
       │  4  │                     │  2  │
       │  2  │        │            │  3  │
       └─────┘                     │  1  │             sorted output
       ┌─────┐        ▼            │  4  │                stream
       │  1  │                     │  2  │
       │  4  │─ ─▶ concat ─ ─ ─ ─ ▶│  1  │─ ─ ▶  sort  ─ ─ ─ ─ ─▶
       │  1  │                     │  4  │
       └─────┘        ▲            │  1  │
         ...          │            │ ... │
                                   │  4  │
       ┌─────┐        │            │  3  │
       │  4  │                     └─────┘
       │  3  │─ ─ ─ ─ ┘
       └─────┘
    in_mem_batches

§Larger datasets

For larger datasets, the batches are first sorted individually and then merged together.

     ┌─────┐                ┌─────┐
     │  2  │                │  1  │
     │  3  │                │  2  │
     │  1  │─ ─▶  sort  ─ ─▶│  2  │─ ─ ─ ─ ─ ┐
     │  4  │                │  3  │
     │  2  │                │  4  │          │
     └─────┘                └─────┘               sorted output
     ┌─────┐                ┌─────┐          ▼       stream
     │  1  │                │  1  │
     │  4  │─ ▶  sort  ─ ─ ▶│  1  ├ ─ ─ ▶ merge  ─ ─ ─ ─▶
     │  1  │                │  4  │
     └─────┘                └─────┘          ▲
       ...       ...         ...             │

     ┌─────┐                ┌─────┐          │
     │  4  │                │  3  │
     │  3  │─ ▶  sort  ─ ─ ▶│  4  │─ ─ ─ ─ ─ ┘
     └─────┘                └─────┘

  in_mem_batches
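The two strategies can be sketched as follows, assuming batches are `Vec<i64>`, size is approximated as rows times 8 bytes, and the merge is done pairwise for brevity (the diagrams above describe a single merge over all sorted batches). `in_mem_sort`, `merge_two` and `Strategy` are hypothetical; the threshold parameter only mirrors sort_in_place_threshold_bytes.

```rust
/// Which path the hypothetical sketch took.
#[derive(Debug, PartialEq)]
enum Strategy {
    ConcatThenSort,
    SortThenMerge,
}

/// Merge two sorted vectors into one sorted vector.
fn merge_two(a: Vec<i64>, b: Vec<i64>) -> Vec<i64> {
    let mut out = Vec::with_capacity(a.len() + b.len());
    let (mut i, mut j) = (0, 0);
    while i < a.len() && j < b.len() {
        if a[i] <= b[j] {
            out.push(a[i]);
            i += 1;
        } else {
            out.push(b[j]);
            j += 1;
        }
    }
    out.extend_from_slice(&a[i..]);
    out.extend_from_slice(&b[j..]);
    out
}

/// Sketch of the small/large choice; sizes are approximated as rows * 8 bytes.
fn in_mem_sort(batches: Vec<Vec<i64>>, sort_in_place_threshold_bytes: usize) -> (Strategy, Vec<i64>) {
    let total_bytes: usize = batches.iter().map(|b| b.len() * std::mem::size_of::<i64>()).sum();
    if total_bytes < sort_in_place_threshold_bytes {
        // Small datasets: concatenate into one batch, then sort once.
        let mut all: Vec<i64> = batches.into_iter().flatten().collect();
        all.sort_unstable();
        (Strategy::ConcatThenSort, all)
    } else {
        // Larger datasets: sort each batch individually, then merge the runs.
        let merged = batches
            .into_iter()
            .map(|mut b| {
                b.sort_unstable();
                b
            })
            .fold(Vec::new(), merge_two);
        (Strategy::SortThenMerge, merged)
    }
}

fn main() {
    let batches = vec![vec![2, 3, 1, 4, 2], vec![1, 4, 1], vec![4, 3]];
    println!("{:?}", in_mem_sort(batches.clone(), 1024)); // 80 bytes < 1024: ConcatThenSort
    println!("{:?}", in_mem_sort(batches, 16)); // 80 bytes >= 16: SortThenMerge
}
```

Both paths produce the same total order; the threshold only decides which one is expected to be cheaper.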

fn sort_batch_stream(
    &self,
    batch: RecordBatch,
    metrics: BaselineMetrics,
    reservation: MemoryReservation,
    split: bool,
) -> Result<SendableRecordBatchStream>

Sorts a single RecordBatch into a single stream.

reservation accounts for the memory used by this batch and is released when the sort is complete.

Passing split = true returns a BatchSplitStream in which each batch has at most self.batch_size rows. If split is false, the stream returns a single batch.
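A sketch of the split behaviour, with a sorted batch modelled as a slice of `i64` rows. `split_sorted` is a hypothetical helper, not BatchSplitStream; with real Arrow batches, the zero-copy `RecordBatch::slice` would be a natural way to produce the chunks.

```rust
/// Hypothetical helper (not BatchSplitStream): a sorted batch is modelled
/// as a slice of rows and split into chunks of at most `batch_size` rows.
fn split_sorted(sorted: &[i64], batch_size: usize, split: bool) -> Vec<Vec<i64>> {
    if !split {
        // split == false: the stream yields the whole batch at once.
        return vec![sorted.to_vec()];
    }
    assert!(batch_size > 0, "batch_size must be positive");
    // The last chunk may hold fewer than batch_size rows.
    sorted.chunks(batch_size).map(|chunk| chunk.to_vec()).collect()
}

fn main() {
    let sorted: [i64; 10] = [1, 1, 1, 2, 2, 3, 3, 4, 4, 4];
    println!("{:?}", split_sorted(&sorted, 4, true)); // [[1, 1, 1, 2], [2, 3, 3, 4], [4, 4]]
    println!("{:?}", split_sorted(&sorted, 4, false)); // [[1, 1, 1, 2, 2, 3, 3, 4, 4, 4]]
}
```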

fn reserve_memory_for_merge(&mut self) -> Result<()>

If this sort may spill, pre-allocates sort_spill_reservation_bytes of memory to guarantee that memory remains available for the in-memory sort/merge.

async fn reserve_memory_for_batch_and_maybe_spill(
    &mut self,
    input: &RecordBatch,
) -> Result<()>

Reserves memory to be able to accommodate the given batch. If memory is scarce, tries to spill current in-memory batches to disk first.
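A toy model of the two reservation steps, reserve_memory_for_merge and reserve_memory_for_batch_and_maybe_spill. `Pool`, `Sorter` and their methods are hypothetical and only loosely mirror MemoryReservation; sizes are plain byte counts and "spilling" just releases the buffered reservation. The point illustrated is that the pre-reserved merge memory stays held, so buffering batches cannot consume the space set aside for the sort/merge.

```rust
/// Toy memory pool; hypothetical, only loosely mirroring MemoryReservation.
struct Pool {
    limit: usize,
    used: usize,
}

impl Pool {
    fn try_grow(&mut self, bytes: usize) -> Result<(), String> {
        if self.used + bytes > self.limit {
            return Err(format!("cannot reserve {bytes} bytes ({} of {} in use)", self.used, self.limit));
        }
        self.used += bytes;
        Ok(())
    }

    fn shrink(&mut self, bytes: usize) {
        self.used -= bytes;
    }
}

struct Sorter {
    pool: Pool,
    buffered_bytes: usize, // stands in for `reservation`
    merge_reserved: usize, // stands in for `merge_reservation`
    sort_spill_reservation_bytes: usize,
    spill_count: usize,
}

impl Sorter {
    fn new(limit: usize, sort_spill_reservation_bytes: usize) -> Self {
        Self { pool: Pool { limit, used: 0 }, buffered_bytes: 0, merge_reserved: 0, sort_spill_reservation_bytes, spill_count: 0 }
    }

    /// Pre-reserve space for a later sort/merge so batches cannot claim it.
    fn reserve_memory_for_merge(&mut self) -> Result<(), String> {
        if self.merge_reserved == 0 && self.sort_spill_reservation_bytes > 0 {
            self.pool.try_grow(self.sort_spill_reservation_bytes)?;
            self.merge_reserved = self.sort_spill_reservation_bytes;
        }
        Ok(())
    }

    /// Reserve room for a batch; if that fails and something is buffered,
    /// "spill" (release the buffered bytes) and retry once.
    fn reserve_for_batch_and_maybe_spill(&mut self, batch_bytes: usize) -> Result<(), String> {
        match self.pool.try_grow(batch_bytes) {
            Ok(()) => {}
            Err(e) if self.buffered_bytes == 0 => return Err(e), // nothing to spill
            Err(_) => {
                // Writing the sorted run to disk is not modelled, nor is using
                // merge_reserved for the sort/merge; only the accounting is.
                self.pool.shrink(self.buffered_bytes);
                self.buffered_bytes = 0;
                self.spill_count += 1;
                self.pool.try_grow(batch_bytes)?;
            }
        }
        self.buffered_bytes += batch_bytes;
        Ok(())
    }
}

fn main() {
    let mut sorter = Sorter::new(100, 20);
    sorter.reserve_memory_for_merge().unwrap(); // 20 of 100 bytes held for the merge
    for bytes in [40, 30, 30] {
        sorter.reserve_for_batch_and_maybe_spill(bytes).unwrap();
    }
    // The third batch did not fit (20 + 70 + 30 > 100), so one spill occurred.
    println!("spills: {}, pool used: {}", sorter.spill_count, sorter.pool.used); // spills: 1, pool used: 50
    // 90 bytes cannot fit even after spilling, because 20 bytes stay reserved.
    println!("{:?}", sorter.reserve_for_batch_and_maybe_spill(90));
}
```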

fn err_with_oom_context(e: DataFusionError) -> DataFusionError

Wraps the error with a context message suggesting settings to tweak. This is meant to be used with DataFusionError::ResourcesExhausted only.
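A sketch of the error-wrapping pattern, using a hypothetical `SortError` enum in place of DataFusionError. Only `ResourcesExhausted` errors are wrapped and every other error passes through unchanged; the hint text is illustrative and is not DataFusion's actual message.

```rust
use std::fmt;

/// Hypothetical stand-in for DataFusionError.
#[derive(Debug)]
enum SortError {
    ResourcesExhausted(String),
    Other(String),
    Context(String, Box<SortError>),
}

impl fmt::Display for SortError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SortError::ResourcesExhausted(msg) => write!(f, "Resources exhausted: {msg}"),
            SortError::Other(msg) => write!(f, "{msg}"),
            SortError::Context(ctx, inner) => write!(f, "{ctx}\ncaused by\n{inner}"),
        }
    }
}

/// Wrap only out-of-memory errors with a hint; pass other errors through.
fn err_with_oom_context(e: SortError) -> SortError {
    if matches!(e, SortError::ResourcesExhausted(_)) {
        // Illustrative hint text, not DataFusion's actual message.
        let hint = "Not enough memory to continue the external sort. \
                    Consider increasing the memory limit or adjusting sort_spill_reservation_bytes."
            .to_string();
        SortError::Context(hint, Box::new(e))
    } else {
        e
    }
}

fn main() {
    let oom = SortError::ResourcesExhausted("failed to allocate 1024 bytes".to_string());
    println!("{}", err_with_oom_context(oom));
    println!("{}", err_with_oom_context(SortError::Other("I/O error".to_string())));
}
```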

Trait Implementations§

impl Debug for ExternalSorter

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

Auto Trait Implementations§

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoEither for T

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.

impl<T> PolicyExt for T
where T: ?Sized,

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow.

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow.

impl<T> Same for T

type Output = T

Should always be Self

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.

impl<T> ErasedDestructor for T
where T: 'static,