pub(super) struct SortMergeJoinStream {Show 28 fields
pub schema: SchemaRef,
pub null_equality: NullEquality,
pub sort_options: Vec<SortOptions>,
pub filter: Option<JoinFilter>,
pub join_type: JoinType,
pub batch_size: usize,
pub streamed_schema: SchemaRef,
pub streamed: SendableRecordBatchStream,
pub streamed_batch: StreamedBatch,
pub streamed_joined: bool,
pub streamed_state: StreamedState,
pub on_streamed: Vec<PhysicalExprRef>,
pub buffered_schema: SchemaRef,
pub buffered: SendableRecordBatchStream,
pub buffered_data: BufferedData,
pub buffered_joined: bool,
pub buffered_state: BufferedState,
pub on_buffered: Vec<PhysicalExprRef>,
pub state: SortMergeJoinState,
pub staging_output_record_batches: JoinedRecordBatches,
pub output: RecordBatch,
pub output_size: usize,
pub current_ordering: Ordering,
pub spill_manager: SpillManager,
pub join_metrics: SortMergeJoinMetrics,
pub reservation: MemoryReservation,
pub runtime_env: Arc<RuntimeEnv>,
pub streamed_batch_counter: AtomicUsize,
}Expand description
Sort-Merge join stream that consumes streamed and buffered data streams and produces joined output stream.
Fields§
§schema: SchemaRefOutput schema
null_equality: NullEqualityDefines the null equality for the join.
sort_options: Vec<SortOptions>Sort options of join columns used to sort streamed and buffered data stream
filter: Option<JoinFilter>optional join filter
join_type: JoinTypeHow the join is performed
batch_size: usizeTarget output batch size
streamed_schema: SchemaRefInput schema of streamed
streamed: SendableRecordBatchStreamStreamed data stream
streamed_batch: StreamedBatchCurrent processing record batch of streamed
streamed_joined: bool(used in outer join) Is current streamed row joined at least once?
streamed_state: StreamedStateState of streamed
on_streamed: Vec<PhysicalExprRef>Join key columns of streamed
buffered_schema: SchemaRefInput schema of buffered
buffered: SendableRecordBatchStreamBuffered data stream
buffered_data: BufferedDataCurrent buffered data
buffered_joined: bool(used in outer join) Is current buffered batches joined at least once?
buffered_state: BufferedStateState of buffered
on_buffered: Vec<PhysicalExprRef>Join key columns of buffered
state: SortMergeJoinStateCurrent state of the stream
staging_output_record_batches: JoinedRecordBatchesStaging output array builders
output: RecordBatchOutput buffer. Currently used by filtering as it requires double buffering
to avoid small/empty batches. Non-filtered join outputs directly from staging_output_record_batches.batches
output_size: usizeStaging output size, including output batches and staging joined results. Increased when we put rows into buffer and decreased after we actually output batches. Used to trigger output when sufficient rows are ready
current_ordering: OrderingThe comparison result of current streamed row and buffered batches
spill_manager: SpillManagerManages the process of spilling and reading back intermediate data
join_metrics: SortMergeJoinMetricsMetrics
reservation: MemoryReservationMemory reservation
runtime_env: Arc<RuntimeEnv>Runtime env
streamed_batch_counter: AtomicUsizeA unique number for each batch
Implementations§
Source§impl SortMergeJoinStream
impl SortMergeJoinStream
pub fn try_new( spill_compression: SpillCompression, schema: SchemaRef, sort_options: Vec<SortOptions>, null_equality: NullEquality, streamed: SendableRecordBatchStream, buffered: SendableRecordBatchStream, on_streamed: Vec<Arc<dyn PhysicalExpr>>, on_buffered: Vec<Arc<dyn PhysicalExpr>>, filter: Option<JoinFilter>, join_type: JoinType, batch_size: usize, join_metrics: SortMergeJoinMetrics, reservation: MemoryReservation, runtime_env: Arc<RuntimeEnv>, ) -> Result<Self>
Sourcefn poll_streamed_row(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<()>>>
fn poll_streamed_row( &mut self, cx: &mut Context<'_>, ) -> Poll<Option<Result<()>>>
Poll next streamed row
fn free_reservation(&mut self, buffered_batch: BufferedBatch) -> Result<()>
fn allocate_reservation(&mut self, buffered_batch: BufferedBatch) -> Result<()>
Sourcefn poll_buffered_batches(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<()>>>
fn poll_buffered_batches( &mut self, cx: &mut Context<'_>, ) -> Poll<Option<Result<()>>>
Poll next buffered batches
Sourcefn compare_streamed_buffered(&self) -> Result<Ordering>
fn compare_streamed_buffered(&self) -> Result<Ordering>
Get comparison result of streamed row and buffered batches
Sourcefn join_partial(&mut self) -> Result<()>
fn join_partial(&mut self) -> Result<()>
Produce join and fill output buffer until reaching target batch size or the join is finished
fn freeze_all(&mut self) -> Result<()>
fn freeze_dequeuing_buffered(&mut self) -> Result<()>
fn freeze_buffered(&mut self, batch_count: usize) -> Result<()>
fn produce_buffered_not_matched( &mut self, buffered_batch: &mut BufferedBatch, ) -> Result<()>
fn freeze_streamed(&mut self) -> Result<()>
fn output_record_batch_and_reset(&mut self) -> Result<RecordBatch>
fn filter_joined_batch(&mut self) -> Result<RecordBatch>
fn filter_record_batch_by_join_type( &mut self, record_batch: RecordBatch, corrected_mask: &BooleanArray, ) -> Result<RecordBatch>
Trait Implementations§
Source§impl Stream for SortMergeJoinStream
impl Stream for SortMergeJoinStream
Source§type Item = Result<RecordBatch, DataFusionError>
type Item = Result<RecordBatch, DataFusionError>
Auto Trait Implementations§
impl !Freeze for SortMergeJoinStream
impl !RefUnwindSafe for SortMergeJoinStream
impl Send for SortMergeJoinStream
impl !Sync for SortMergeJoinStream
impl Unpin for SortMergeJoinStream
impl !UnwindSafe for SortMergeJoinStream
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
§impl<T> Instrument for T
impl<T> Instrument for T
§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more§impl<T> PolicyExt for Twhere
T: ?Sized,
impl<T> PolicyExt for Twhere
T: ?Sized,
§impl<T> StreamExt for Twhere
T: Stream + ?Sized,
impl<T> StreamExt for Twhere
T: Stream + ?Sized,
§fn next(&mut self) -> Next<'_, Self>where
Self: Unpin,
fn next(&mut self) -> Next<'_, Self>where
Self: Unpin,
§fn into_future(self) -> StreamFuture<Self>
fn into_future(self) -> StreamFuture<Self>
§fn map<T, F>(self, f: F) -> Map<Self, F>
fn map<T, F>(self, f: F) -> Map<Self, F>
§fn enumerate(self) -> Enumerate<Self>where
Self: Sized,
fn enumerate(self) -> Enumerate<Self>where
Self: Sized,
§fn filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F>
fn filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F>
§fn filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F>
fn filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F>
§fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F>
fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F>
§fn collect<C>(self) -> Collect<Self, C>
fn collect<C>(self) -> Collect<Self, C>
§fn unzip<A, B, FromA, FromB>(self) -> Unzip<Self, FromA, FromB>
fn unzip<A, B, FromA, FromB>(self) -> Unzip<Self, FromA, FromB>
§fn concat(self) -> Concat<Self>
fn concat(self) -> Concat<Self>
§fn count(self) -> Count<Self>where
Self: Sized,
fn count(self) -> Count<Self>where
Self: Sized,
§fn fold<T, Fut, F>(self, init: T, f: F) -> Fold<Self, Fut, T, F>
fn fold<T, Fut, F>(self, init: T, f: F) -> Fold<Self, Fut, T, F>
§fn any<Fut, F>(self, f: F) -> Any<Self, Fut, F>
fn any<Fut, F>(self, f: F) -> Any<Self, Fut, F>
true if any element in stream satisfied a predicate. Read more§fn all<Fut, F>(self, f: F) -> All<Self, Fut, F>
fn all<Fut, F>(self, f: F) -> All<Self, Fut, F>
true if all element in stream satisfied a predicate. Read more§fn flatten(self) -> Flatten<Self>where
Self::Item: Stream,
Self: Sized,
fn flatten(self) -> Flatten<Self>where
Self::Item: Stream,
Self: Sized,
§fn flatten_unordered(
self,
limit: impl Into<Option<usize>>,
) -> FlattenUnorderedWithFlowController<Self, ()>
fn flatten_unordered( self, limit: impl Into<Option<usize>>, ) -> FlattenUnorderedWithFlowController<Self, ()>
§fn flat_map_unordered<U, F>(
self,
limit: impl Into<Option<usize>>,
f: F,
) -> FlatMapUnordered<Self, U, F>
fn flat_map_unordered<U, F>( self, limit: impl Into<Option<usize>>, f: F, ) -> FlatMapUnordered<Self, U, F>
StreamExt::map] but flattens nested Streams
and polls them concurrently, yielding items in any order, as they made
available. Read more§fn scan<S, B, Fut, F>(self, initial_state: S, f: F) -> Scan<Self, S, Fut, F>
fn scan<S, B, Fut, F>(self, initial_state: S, f: F) -> Scan<Self, S, Fut, F>
StreamExt::fold] that holds internal state
and produces a new stream. Read more§fn skip_while<Fut, F>(self, f: F) -> SkipWhile<Self, Fut, F>
fn skip_while<Fut, F>(self, f: F) -> SkipWhile<Self, Fut, F>
true. Read more§fn take_while<Fut, F>(self, f: F) -> TakeWhile<Self, Fut, F>
fn take_while<Fut, F>(self, f: F) -> TakeWhile<Self, Fut, F>
true. Read more§fn take_until<Fut>(self, fut: Fut) -> TakeUntil<Self, Fut>
fn take_until<Fut>(self, fut: Fut) -> TakeUntil<Self, Fut>
§fn for_each<Fut, F>(self, f: F) -> ForEach<Self, Fut, F>
fn for_each<Fut, F>(self, f: F) -> ForEach<Self, Fut, F>
§fn for_each_concurrent<Fut, F>(
self,
limit: impl Into<Option<usize>>,
f: F,
) -> ForEachConcurrent<Self, Fut, F>
fn for_each_concurrent<Fut, F>( self, limit: impl Into<Option<usize>>, f: F, ) -> ForEachConcurrent<Self, Fut, F>
§fn take(self, n: usize) -> Take<Self>where
Self: Sized,
fn take(self, n: usize) -> Take<Self>where
Self: Sized,
n items of the underlying stream. Read more§fn skip(self, n: usize) -> Skip<Self>where
Self: Sized,
fn skip(self, n: usize) -> Skip<Self>where
Self: Sized,
n items of the underlying stream. Read more§fn catch_unwind(self) -> CatchUnwind<Self>where
Self: Sized + UnwindSafe,
fn catch_unwind(self) -> CatchUnwind<Self>where
Self: Sized + UnwindSafe,
§fn boxed<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + Send + 'a>>
fn boxed<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + Send + 'a>>
§fn boxed_local<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + 'a>>where
Self: Sized + 'a,
fn boxed_local<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + 'a>>where
Self: Sized + 'a,
§fn buffered(self, n: usize) -> Buffered<Self>
fn buffered(self, n: usize) -> Buffered<Self>
§fn buffer_unordered(self, n: usize) -> BufferUnordered<Self>
fn buffer_unordered(self, n: usize) -> BufferUnordered<Self>
§fn zip<St>(self, other: St) -> Zip<Self, St>where
St: Stream,
Self: Sized,
fn zip<St>(self, other: St) -> Zip<Self, St>where
St: Stream,
Self: Sized,
§fn chain<St>(self, other: St) -> Chain<Self, St>where
St: Stream<Item = Self::Item>,
Self: Sized,
fn chain<St>(self, other: St) -> Chain<Self, St>where
St: Stream<Item = Self::Item>,
Self: Sized,
§fn peekable(self) -> Peekable<Self>where
Self: Sized,
fn peekable(self) -> Peekable<Self>where
Self: Sized,
peek method. Read more§fn chunks(self, capacity: usize) -> Chunks<Self>where
Self: Sized,
fn chunks(self, capacity: usize) -> Chunks<Self>where
Self: Sized,
§fn ready_chunks(self, capacity: usize) -> ReadyChunks<Self>where
Self: Sized,
fn ready_chunks(self, capacity: usize) -> ReadyChunks<Self>where
Self: Sized,
§fn forward<S>(self, sink: S) -> Forward<Self, S>where
S: Sink<Self::Ok, Error = Self::Error>,
Self: Sized + TryStream,
fn forward<S>(self, sink: S) -> Forward<Self, S>where
S: Sink<Self::Ok, Error = Self::Error>,
Self: Sized + TryStream,
§fn inspect<F>(self, f: F) -> Inspect<Self, F>
fn inspect<F>(self, f: F) -> Inspect<Self, F>
§fn left_stream<B>(self) -> Either<Self, B>where
B: Stream<Item = Self::Item>,
Self: Sized,
fn left_stream<B>(self) -> Either<Self, B>where
B: Stream<Item = Self::Item>,
Self: Sized,
§fn right_stream<B>(self) -> Either<B, Self>where
B: Stream<Item = Self::Item>,
Self: Sized,
fn right_stream<B>(self) -> Either<B, Self>where
B: Stream<Item = Self::Item>,
Self: Sized,
§fn poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>where
Self: Unpin,
fn poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>where
Self: Unpin,
Stream::poll_next] on Unpin
stream types.§fn select_next_some(&mut self) -> SelectNextSome<'_, Self>where
Self: Unpin + FusedStream,
fn select_next_some(&mut self) -> SelectNextSome<'_, Self>where
Self: Unpin + FusedStream,
§impl<S, T, E> TryStream for S
impl<S, T, E> TryStream for S
§impl<S> TryStreamExt for Swhere
S: TryStream + ?Sized,
impl<S> TryStreamExt for Swhere
S: TryStream + ?Sized,
§fn err_into<E>(self) -> ErrInto<Self, E>
fn err_into<E>(self) -> ErrInto<Self, E>
§fn map_ok<T, F>(self, f: F) -> MapOk<Self, F>
fn map_ok<T, F>(self, f: F) -> MapOk<Self, F>
§fn map_err<E, F>(self, f: F) -> MapErr<Self, F>
fn map_err<E, F>(self, f: F) -> MapErr<Self, F>
§fn and_then<Fut, F>(self, f: F) -> AndThen<Self, Fut, F>
fn and_then<Fut, F>(self, f: F) -> AndThen<Self, Fut, F>
f. Read more§fn or_else<Fut, F>(self, f: F) -> OrElse<Self, Fut, F>
fn or_else<Fut, F>(self, f: F) -> OrElse<Self, Fut, F>
f. Read more§fn inspect_ok<F>(self, f: F) -> InspectOk<Self, F>
fn inspect_ok<F>(self, f: F) -> InspectOk<Self, F>
§fn inspect_err<F>(self, f: F) -> InspectErr<Self, F>
fn inspect_err<F>(self, f: F) -> InspectErr<Self, F>
§fn into_stream(self) -> IntoStream<Self>where
Self: Sized,
fn into_stream(self) -> IntoStream<Self>where
Self: Sized,
§fn try_next(&mut self) -> TryNext<'_, Self>where
Self: Unpin,
fn try_next(&mut self) -> TryNext<'_, Self>where
Self: Unpin,
§fn try_for_each<Fut, F>(self, f: F) -> TryForEach<Self, Fut, F>
fn try_for_each<Fut, F>(self, f: F) -> TryForEach<Self, Fut, F>
§fn try_skip_while<Fut, F>(self, f: F) -> TrySkipWhile<Self, Fut, F>
fn try_skip_while<Fut, F>(self, f: F) -> TrySkipWhile<Self, Fut, F>
true. Read more§fn try_take_while<Fut, F>(self, f: F) -> TryTakeWhile<Self, Fut, F>
fn try_take_while<Fut, F>(self, f: F) -> TryTakeWhile<Self, Fut, F>
true. Read more§fn try_for_each_concurrent<Fut, F>(
self,
limit: impl Into<Option<usize>>,
f: F,
) -> TryForEachConcurrent<Self, Fut, F>
fn try_for_each_concurrent<Fut, F>( self, limit: impl Into<Option<usize>>, f: F, ) -> TryForEachConcurrent<Self, Fut, F>
§fn try_collect<C>(self) -> TryCollect<Self, C>
fn try_collect<C>(self) -> TryCollect<Self, C>
§fn try_chunks(self, capacity: usize) -> TryChunks<Self>where
Self: Sized,
fn try_chunks(self, capacity: usize) -> TryChunks<Self>where
Self: Sized,
§fn try_ready_chunks(self, capacity: usize) -> TryReadyChunks<Self>where
Self: Sized,
fn try_ready_chunks(self, capacity: usize) -> TryReadyChunks<Self>where
Self: Sized,
§fn try_filter<Fut, F>(self, f: F) -> TryFilter<Self, Fut, F>
fn try_filter<Fut, F>(self, f: F) -> TryFilter<Self, Fut, F>
§fn try_filter_map<Fut, F, T>(self, f: F) -> TryFilterMap<Self, Fut, F>
fn try_filter_map<Fut, F, T>(self, f: F) -> TryFilterMap<Self, Fut, F>
§fn try_flatten_unordered(
self,
limit: impl Into<Option<usize>>,
) -> TryFlattenUnordered<Self>
fn try_flatten_unordered( self, limit: impl Into<Option<usize>>, ) -> TryFlattenUnordered<Self>
§fn try_flatten(self) -> TryFlatten<Self>
fn try_flatten(self) -> TryFlatten<Self>
§fn try_fold<T, Fut, F>(self, init: T, f: F) -> TryFold<Self, Fut, T, F>
fn try_fold<T, Fut, F>(self, init: T, f: F) -> TryFold<Self, Fut, T, F>
§fn try_concat(self) -> TryConcat<Self>
fn try_concat(self) -> TryConcat<Self>
§fn try_buffer_unordered(self, n: usize) -> TryBufferUnordered<Self>where
Self::Ok: TryFuture<Error = Self::Error>,
Self: Sized,
fn try_buffer_unordered(self, n: usize) -> TryBufferUnordered<Self>where
Self::Ok: TryFuture<Error = Self::Error>,
Self: Sized,
§fn try_buffered(self, n: usize) -> TryBuffered<Self>where
Self::Ok: TryFuture<Error = Self::Error>,
Self: Sized,
fn try_buffered(self, n: usize) -> TryBuffered<Self>where
Self::Ok: TryFuture<Error = Self::Error>,
Self: Sized,
§fn try_poll_next_unpin(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Ok, Self::Error>>>where
Self: Unpin,
fn try_poll_next_unpin(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Ok, Self::Error>>>where
Self: Unpin,
TryStream::try_poll_next] on Unpin
stream types.§fn into_async_read(self) -> IntoAsyncRead<Self>
fn into_async_read(self) -> IntoAsyncRead<Self>
AsyncBufRead. Read more§fn try_all<Fut, F>(self, f: F) -> TryAll<Self, Fut, F>
fn try_all<Fut, F>(self, f: F) -> TryAll<Self, Fut, F>
Err is encountered or if an Ok item is found
that does not satisfy the predicate. Read more