struct ExternalSorter {
    schema: SchemaRef,
    expr: LexOrdering,
    batch_size: usize,
    sort_in_place_threshold_bytes: usize,
    in_mem_batches: Vec<RecordBatch>,
    in_progress_spill_file: Option<(InProgressSpillFile, usize)>,
    finished_spill_files: Vec<SortedSpillFile>,
    metrics: ExternalSorterMetrics,
    runtime: Arc<RuntimeEnv>,
    reservation: MemoryReservation,
    spill_manager: SpillManager,
    merge_reservation: MemoryReservation,
    sort_spill_reservation_bytes: usize,
}
Sorts an arbitrarily sized, unsorted stream of [RecordBatch]es into a total
order. Depending on the input size and memory manager configuration,
intermediate results are written to disk (“spills”) using the Arrow IPC format.
§Algorithm
1. Get a non-empty new batch from the input.
2. Check with the memory manager whether there is sufficient space to buffer the batch in memory.
   2.1. If memory is sufficient, buffer the batch in memory and go to 1.
   2.2. If no more memory is available, sort all buffered batches, spill them to a file, buffer the next batch in memory, and go to 1.
3. When the input is exhausted, merge all in-memory batches and spills to get a total order.
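The loop above can be illustrated with a small, self-contained toy in plain Rust. It is a sketch only: it sorts Vec<i32> "batches" against a byte budget and models each spill file as an in-memory sorted run, whereas the real sorter works on Arrow RecordBatches, a MemoryReservation, and Arrow IPC files on disk.

use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Toy model of the algorithm above (assumed names, not DataFusion code):
// batches are Vec<i32>, the "memory manager" is a plain byte budget, and a
// "spill file" is just an in-memory sorted run.
fn external_sort(input: impl Iterator<Item = Vec<i32>>, memory_budget: usize) -> Vec<i32> {
    let mut in_mem: Vec<Vec<i32>> = Vec::new(); // step 2.1: buffered batches
    let mut runs: Vec<Vec<i32>> = Vec::new();   // step 2.2: one sorted run per "spill"
    let mut used = 0usize;

    for batch in input.filter(|b| !b.is_empty()) {      // step 1
        let size = batch.len() * std::mem::size_of::<i32>();
        if used + size > memory_budget && !in_mem.is_empty() {
            // step 2.2: sort everything buffered so far and "spill" it as one run
            let mut run: Vec<i32> = in_mem.drain(..).flatten().collect();
            run.sort_unstable();
            runs.push(run);
            used = 0;
        }
        used += size;           // step 2.1: buffer the batch
        in_mem.push(batch);
    }

    // step 3: the leftover buffered data becomes one more sorted run,
    // then all runs are k-way merged into the total order.
    let mut last: Vec<i32> = in_mem.into_iter().flatten().collect();
    last.sort_unstable();
    runs.push(last);

    let mut heap = BinaryHeap::new(); // min-heap of (value, run index, position)
    for (run_idx, run) in runs.iter().enumerate() {
        if let Some(&v) = run.first() {
            heap.push(Reverse((v, run_idx, 0usize)));
        }
    }
    let mut output = Vec::new();
    while let Some(Reverse((v, run_idx, pos))) = heap.pop() {
        output.push(v);
        if let Some(&next) = runs[run_idx].get(pos + 1) {
            heap.push(Reverse((next, run_idx, pos + 1)));
        }
    }
    output
}

The final k-way merge over sorted runs mirrors step 3; in DataFusion that role is played by the streaming merge over in-memory batches and spill files, as the diagrams below show.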
§When data fits in available memory
If there is sufficient memory, data is sorted in memory to produce the output
┌─────┐
│ 2 │
│ 3 │
│ 1 │─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
│ 4 │
│ 2 │ │
└─────┘ ▼
┌─────┐
│ 1 │ In memory
│ 4 │─ ─ ─ ─ ─ ─▶ sort/merge ─ ─ ─ ─ ─▶ total sorted output
│ 1 │
└─────┘ ▲
... │
┌─────┐ │
│ 4 │
│ 3 │─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
└─────┘
in_mem_batches

§When data does not fit in available memory
When memory is exhausted, data is first sorted and written to one or more spill files on disk:
┌─────┐ .─────────────────.
│ 2 │ ( )
│ 3 │ │`─────────────────'│
│ 1 │─ ─ ─ ─ ─ ─ ─ │ ┌────┐ │
│ 4 │ │ │ │ 1 │░ │
│ 2 │ │ │... │░ │
└─────┘ ▼ │ │ 4 │░ ┌ ─ ─ │
┌─────┐ │ └────┘░ 1 │░ │
│ 1 │ In memory │ ░░░░░░ │ ░░ │
│ 4 │─ ─ ▶ sort/merge ─ ─ ─ ─ ┼ ─ ─ ─ ─ ─▶ ... │░ │
│ 1 │ and write to file │ │ ░░ │
└─────┘ │ 4 │░ │
... ▲ │ └░─░─░░ │
│ │ ░░░░░░ │
┌─────┐ │.─────────────────.│
│ 4 │ │ ( )
│ 3 │─ ─ ─ ─ ─ ─ ─ `─────────────────'
└─────┘
in_mem_batches spills
(file on disk in Arrow
IPC format)

Once the input is completely read, the spill files are read and merged with any in-memory batches to produce a single total sorted output:
.─────────────────.
( )
│`─────────────────'│
│ ┌────┐ │
│ │ 1 │░ │
│ │... │─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─
│ │ 4 │░ ┌────┐ │ │
│ └────┘░ │ 1 │░ │ ▼
│ ░░░░░░ │ │░ │
│ │... │─ ─│─ ─ ─ ▶ merge ─ ─ ─▶ total sorted output
│ │ │░ │
│ │ 4 │░ │ ▲
│ └────┘░ │ │
│ ░░░░░░ │
│.─────────────────.│ │
( )
`─────────────────' │
spills
│
│
┌─────┐ │
│ 1 │
│ 4 │─ ─ ─ ─ │
└─────┘ │
... In memory
└ ─ ─ ─▶ sort/merge
┌─────┐
│ 4 │ ▲
│ 3 │─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
└─────┘
in_mem_batches

§Fields
schema: SchemaRef
Schema of the output (and the input).

expr: LexOrdering
Sort expressions.

batch_size: usize
The target number of rows for output batches.

sort_in_place_threshold_bytes: usize
If the in-memory size of the buffered batches is below this threshold, the data will be concatenated and sorted in place rather than sorted and merged.

in_mem_batches: Vec<RecordBatch>
Unsorted input batches stored in the memory buffer.

in_progress_spill_file: Option<(InProgressSpillFile, usize)>
During external sorting, in-memory intermediate data will be appended to this file incrementally. Once finished, this file will be moved to Self::finished_spill_files. This is a tuple of:
- InProgressSpillFile: the file that is being written to
- max_record_batch_memory: the maximum memory usage of a single batch in this spill file

finished_spill_files: Vec<SortedSpillFile>
If data has previously been spilled, the locations of the spill files (in Arrow IPC format). Within the same spill file, the data might be chunked into multiple batches, ordered by the sort keys.

metrics: ExternalSorterMetrics
Runtime metrics.

runtime: Arc<RuntimeEnv>
A handle to the runtime to get spill files.

reservation: MemoryReservation
Reservation for in_mem_batches.

spill_manager: SpillManager

merge_reservation: MemoryReservation
Reservation for the merging of in-memory batches. If the sort might spill, sort_spill_reservation_bytes will be pre-reserved to ensure there is some space for this sort/merge.

sort_spill_reservation_bytes: usize
How much memory to reserve for performing in-memory sort/merges prior to spilling.
§Implementations

impl ExternalSorter

pub fn new(
    partition_id: usize,
    schema: SchemaRef,
    expr: LexOrdering,
    batch_size: usize,
    sort_spill_reservation_bytes: usize,
    sort_in_place_threshold_bytes: usize,
    spill_compression: SpillCompression,
    metrics: &ExecutionPlanMetricsSet,
    runtime: Arc<RuntimeEnv>,
) -> Result<Self>
async fn insert_batch(&mut self, input: RecordBatch) -> Result<()>
Appends an unsorted [RecordBatch] to in_mem_batches
Updates memory usage metrics, and possibly triggers spilling to disk
fn spilled_before(&self) -> bool
async fn sort(&mut self) -> Result<SendableRecordBatchStream>
Returns the final sorted output of all batches inserted via
Self::insert_batch as a stream of [RecordBatch]es.
This process could either be:
- An in-memory sort/merge (if the input fit in memory)
- A combined streaming merge incorporating both in-memory batches and data from spill files on disk
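Pieced together from new(), insert_batch(), and sort() above, the intended driving pattern looks roughly like the following. It is a sketch, not library code: ExternalSorter is crate-internal, the sorter is assumed to already be constructed, and futures::StreamExt is used to drain the input stream.

use futures::StreamExt; // SendableRecordBatchStream implements Stream

// Hedged sketch of the call pattern: feed every non-empty input batch through
// insert_batch() (which may buffer or sort-and-spill), then obtain the total
// sorted output from sort().
async fn drive_sorter(
    mut input: SendableRecordBatchStream,
    mut sorter: ExternalSorter,
) -> Result<SendableRecordBatchStream> {
    while let Some(batch) = input.next().await {
        let batch = batch?;
        if batch.num_rows() > 0 {
            sorter.insert_batch(batch).await?;
        }
    }
    // Either an in-memory sort/merge or a streaming merge over spill files.
    sorter.sort().await
}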
fn spilled_bytes(&self) -> usize
How many bytes have been spilled to disk?

fn spilled_rows(&self) -> usize
How many rows have been spilled to disk?

fn spill_count(&self) -> usize
How many spill files have been created?
async fn consume_and_spill_append(
    &mut self,
    globally_sorted_batches: &mut Vec<RecordBatch>,
) -> Result<()>
Appends globally sorted batches to the in-progress spill file, and clears globally_sorted_batches (releasing its memory reservation) afterwards.
async fn spill_finish(&mut self) -> Result<()>
Finishes the in-progress spill file and moves it to the finished spill files.
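The two methods above cover the "append batches to an in-progress file, then seal it" half of the spill path. A rough sketch of that flow is below; the SpillManager / InProgressSpillFile calls (create_in_progress_file, append_batch, finish) reflect a reading of DataFusion's spill manager and should be treated as assumptions, and spill_sorted_run is a hypothetical free function rather than the real methods.

// Hedged sketch of the spill path. The SpillManager / InProgressSpillFile
// method names used here (create_in_progress_file, append_batch, finish)
// are assumptions about the crate-internal API, not verified signatures.
async fn spill_sorted_run(
    spill_manager: &SpillManager,
    sorted_run: Vec<RecordBatch>, // batches already in globally sorted order
) -> Result<()> {
    // Corresponds to consume_and_spill_append(): stream each batch into the
    // in-progress spill file and drop it from memory once it is written.
    let mut in_progress = spill_manager.create_in_progress_file("ExternalSorter spill")?;
    for batch in sorted_run {
        in_progress.append_batch(&batch)?;
    }
    // Corresponds to spill_finish(): seal the file so it can be moved into
    // finished_spill_files and later merged back in sorted order.
    let _finished_file = in_progress.finish()?;
    Ok(())
}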
fn organize_stringview_arrays(
    globally_sorted_batches: &mut Vec<RecordBatch>,
) -> Result<()>
Reconstruct globally_sorted_batches to organize the payload buffers of each
StringViewArray in sequential order by calling gc() on them.
Note this is a workaround until https://github.com/apache/arrow-rs/issues/7185 is available
§Rationale
After (merge-based) sorting, all batches will be sorted into a single run,
but physically this sorted run is chunked into many small batches. For
StringViewArrays inside each sorted run, their inner buffers are not
re-constructed by default, leading to non-sequential payload locations
(permuted by the interleave() Arrow kernel). A single payload buffer might
be shared by multiple RecordBatches.
When writing each batch to disk, the writer has to write all referenced buffers,
because they have to be read back one by one to reduce memory usage. This
causes extra disk reads and writes, and potentially execution failure.
§Example
Before sorting:
batch1 -> buffer1
batch2 -> buffer2

After sorting:
sorted_batch1 -> buffer1 -> buffer2
sorted_batch2 -> buffer1 -> buffer2

Then when spilling each batch, the writer has to write all referenced buffers repeatedly.
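A minimal sketch of that compaction step, assuming plain arrow-rs APIs: StringViewArray::gc() copies the referenced string data into fresh, sequential buffers, so a spilled batch no longer drags along the large shared buffers left behind by interleave(). The helper name and the column-by-column rebuild are illustrative, not the exact DataFusion implementation.

use std::sync::Arc;
use arrow::array::{Array, ArrayRef, StringViewArray};
use arrow::datatypes::DataType;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

// Illustrative buffer compaction: rebuild every Utf8View column via gc() so
// its payload buffers are sequential and private to this batch before it is
// written to a spill file.
fn gc_string_view_columns(batch: &RecordBatch) -> Result<RecordBatch, ArrowError> {
    let columns: Vec<ArrayRef> = batch
        .columns()
        .iter()
        .map(|col| match col.data_type() {
            DataType::Utf8View => {
                let array = col.as_any().downcast_ref::<StringViewArray>().unwrap();
                // gc() copies the referenced values into new compact buffers,
                // dropping references to buffers shared with other batches.
                Arc::new(array.gc()) as ArrayRef
            }
            _ => Arc::clone(col),
        })
        .collect();
    RecordBatch::try_new(batch.schema(), columns)
}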
async fn sort_and_spill_in_mem_batches(&mut self) -> Result<()>
Sorts the in-memory batches and merges them into a single sorted run, then writes the result to spill files.
fn in_mem_sort_stream(
    &mut self,
    metrics: BaselineMetrics,
) -> Result<SendableRecordBatchStream>
Consumes in_mem_batches returning a sorted stream of batches. This proceeds in one of two ways:
§Small Datasets
For “smaller” datasets, the data is first concatenated into a single batch and then sorted. This is often faster than sorting and then merging; a sketch of this path follows the diagram below.
┌─────┐
│ 2 │
│ 3 │
│ 1 │─ ─ ─ ─ ┐ ┌─────┐
│ 4 │ │ 2 │
│ 2 │ │ │ 3 │
└─────┘ │ 1 │ sorted output
┌─────┐ ▼ │ 4 │ stream
│ 1 │ │ 2 │
│ 4 │─ ─▶ concat ─ ─ ─ ─ ▶│ 1 │─ ─ ▶ sort ─ ─ ─ ─ ─▶
│ 1 │ │ 4 │
└─────┘ ▲ │ 1 │
... │ │ ... │
│ 4 │
┌─────┐ │ │ 3 │
│ 4 │ └─────┘
│ 3 │─ ─ ─ ─ ┘
└─────┘
in_mem_batches
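A minimal sketch of this concat-then-sort path using plain arrow-rs kernels rather than DataFusion's internal helpers: concat_batches() glues the buffered batches into one batch, lexsort_to_indices() computes the sort permutation, and take() materializes the sorted columns. The single sort-key column index is an assumption for brevity; the real operator derives its sort columns from the LexOrdering expressions.

use std::sync::Arc;
use arrow::compute::{concat_batches, lexsort_to_indices, take, SortColumn};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

// Illustrative concat-then-sort path for "small" inputs (assumed helper,
// single sort key, default sort options).
fn concat_and_sort(
    batches: &[RecordBatch],
    sort_key_index: usize,
) -> Result<RecordBatch, ArrowError> {
    let schema = batches[0].schema();
    // One big batch instead of many small ones...
    let combined = concat_batches(&schema, batches)?;
    // ...then a single sort over it.
    let sort_columns = vec![SortColumn {
        values: Arc::clone(combined.column(sort_key_index)),
        options: None,
    }];
    let indices = lexsort_to_indices(&sort_columns, None)?;
    let sorted_columns = combined
        .columns()
        .iter()
        .map(|c| take(c.as_ref(), &indices, None))
        .collect::<Result<Vec<_>, _>>()?;
    RecordBatch::try_new(schema, sorted_columns)
}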
§Larger datasets
For larger datasets, the batches are first sorted individually and then merged together.
┌─────┐ ┌─────┐
│ 2 │ │ 1 │
│ 3 │ │ 2 │
│ 1 │─ ─▶ sort ─ ─▶│ 2 │─ ─ ─ ─ ─ ┐
│ 4 │ │ 3 │
│ 2 │ │ 4 │ │
└─────┘ └─────┘ sorted output
┌─────┐ ┌─────┐ ▼ stream
│ 1 │ │ 1 │
│ 4 │─ ▶ sort ─ ─ ▶│ 1 ├ ─ ─ ▶ merge ─ ─ ─ ─▶
│ 1 │ │ 4 │
└─────┘ └─────┘ ▲
... ... ... │
┌─────┐ ┌─────┐ │
│ 4 │ │ 3 │
│ 3 │─ ▶ sort ─ ─ ▶│ 4 │─ ─ ─ ─ ─ ┘
└─────┘ └─────┘
in_mem_batches

fn sort_batch_stream(
    &self,
    batch: RecordBatch,
    metrics: BaselineMetrics,
    reservation: MemoryReservation,
    split: bool,
) -> Result<SendableRecordBatchStream>
Sorts a single RecordBatch into a single stream.

reservation accounts for the memory used by this batch and is released when the sort is complete.

If split is true, this returns a BatchSplitStream in which each output batch has at most self.batch_size rows. If split is false, the stream returns a single batch.
fn reserve_memory_for_merge(&mut self) -> Result<()>
If this sort may spill, pre-allocates
sort_spill_reservation_bytes of memory to guarantee memory
left for the in memory sort/merge.
async fn reserve_memory_for_batch_and_maybe_spill(
    &mut self,
    input: &RecordBatch,
) -> Result<()>
Reserves memory to be able to accommodate the given batch. If memory is scarce, tries to spill current in-memory batches to disk first.
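A rough sketch of that reserve-or-spill decision, assuming MemoryReservation::try_grow() (grow the reservation or return a resources-exhausted error) and RecordBatch::get_array_memory_size(); the spill closure is a hypothetical stand-in for Self::sort_and_spill_in_mem_batches().

use std::future::Future;
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_execution::memory_pool::MemoryReservation;

// Hedged sketch: try to reserve memory for the incoming batch; if the pool
// refuses, sort-and-spill the current buffer, then retry the reservation once.
async fn reserve_or_spill<F, Fut>(
    reservation: &mut MemoryReservation,
    input: &RecordBatch,
    sort_and_spill: F, // hypothetical stand-in for sort_and_spill_in_mem_batches()
) -> Result<()>
where
    F: FnOnce() -> Fut,
    Fut: Future<Output = Result<()>>,
{
    let size = input.get_array_memory_size();
    if reservation.try_grow(size).is_err() {
        // Free memory by spilling what is buffered to disk...
        sort_and_spill().await?;
        // ...then the new batch either fits or the query fails with an OOM error.
        reservation.try_grow(size)?;
    }
    Ok(())
}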
fn err_with_oom_context(e: DataFusionError) -> DataFusionError
Wraps the error with a context message suggesting settings to tweak. This is meant to be used with DataFusionError::ResourcesExhausted only.
§Trait Implementations

§Auto Trait Implementations
impl Freeze for ExternalSorter
impl !RefUnwindSafe for ExternalSorter
impl Send for ExternalSorter
impl Sync for ExternalSorter
impl Unpin for ExternalSorter
impl !UnwindSafe for ExternalSorter
§Blanket Implementations

impl<T> BorrowMut<T> for T where T: ?Sized

fn borrow_mut(&mut self) -> &mut T

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

fn in_current_span(self) -> Instrumented<Self>

impl<T> IntoEither for T

fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true; otherwise converts self into a Right variant.

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true; otherwise converts self into a Right variant.