GroupedHashAggregateStream

Struct GroupedHashAggregateStream 

Source
pub(crate) struct GroupedHashAggregateStream {
Show 20 fields schema: SchemaRef, input: SendableRecordBatchStream, mode: AggregateMode, aggregate_arguments: Vec<Vec<Arc<dyn PhysicalExpr>>>, filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>, group_by: PhysicalGroupBy, batch_size: usize, group_values_soft_limit: Option<usize>, exec_state: ExecutionState, input_done: bool, group_values: Box<dyn GroupValues>, current_group_indices: Vec<usize>, accumulators: Vec<Box<dyn GroupsAccumulator>>, group_ordering: GroupOrdering, spill_state: SpillState, skip_aggregation_probe: Option<SkipAggregationProbe>, reservation: MemoryReservation, baseline_metrics: BaselineMetrics, group_by_metrics: GroupByMetrics, reduction_factor: Option<RatioMetrics>,
}
Expand description

HashTable based Grouping Aggregator

§Design Goals

This structure is designed so that updating the aggregates can be vectorized (done in a tight loop) without allocations. The accumulator state is not managed by this operator (e.g in the hash table) and instead is delegated to the individual accumulators which have type specialized inner loops that perform the aggregation.

§Architecture


    Assigns a consecutive group           internally stores aggregate values
    index for each unique set                     for all groups
        of group values

        ┌────────────┐              ┌──────────────┐       ┌──────────────┐
        │ ┌────────┐ │              │┌────────────┐│       │┌────────────┐│
        │ │  "A"   │ │              ││accumulator ││       ││accumulator ││
        │ ├────────┤ │              ││     0      ││       ││     N      ││
        │ │  "Z"   │ │              ││ ┌────────┐ ││       ││ ┌────────┐ ││
        │ └────────┘ │              ││ │ state  │ ││       ││ │ state  │ ││
        │            │              ││ │┌─────┐ │ ││  ...  ││ │┌─────┐ │ ││
        │    ...     │              ││ │├─────┤ │ ││       ││ │├─────┤ │ ││
        │            │              ││ │└─────┘ │ ││       ││ │└─────┘ │ ││
        │            │              ││ │        │ ││       ││ │        │ ││
        │ ┌────────┐ │              ││ │  ...   │ ││       ││ │  ...   │ ││
        │ │  "Q"   │ │              ││ │        │ ││       ││ │        │ ││
        │ └────────┘ │              ││ │┌─────┐ │ ││       ││ │┌─────┐ │ ││
        │            │              ││ │└─────┘ │ ││       ││ │└─────┘ │ ││
        └────────────┘              ││ └────────┘ ││       ││ └────────┘ ││
                                    │└────────────┘│       │└────────────┘│
                                    └──────────────┘       └──────────────┘

        group_values                             accumulators

For example, given a query like COUNT(x), SUM(y) ... GROUP BY z, group_values will store the distinct values of z. There will be one accumulator for COUNT(x), specialized for the data type of x and one accumulator for SUM(y), specialized for the data type of y.

§Discussion

group_values does not store any aggregate state inline. It only assigns “group indices”, one for each (distinct) group value. The accumulators manage the in-progress aggregate state for each group, with the group values themselves are stored in group_values at the corresponding group index.

The accumulator state (e.g partial sums) is managed by and stored by a [GroupsAccumulator] accumulator. There is one accumulator per aggregate expression (COUNT, AVG, etc) in the stream. Internally, each GroupsAccumulator manages the state for multiple groups, and is passed group_indexes during update. Note The accumulator state is not managed by this operator (e.g in the hash table).

§Partial Aggregate and multi-phase grouping

As described on Accumulator::state, this operator is used in the context “multi-phase” grouping when the mode is AggregateMode::Partial.

An important optimization for multi-phase partial aggregation is to skip partial aggregation when it is not effective enough to warrant the memory or CPU cost, as is often the case for queries many distinct groups (high cardinality group by). Memory is particularly important because each Partial aggregator must store the intermediate state for each group.

If the ratio of the number of groups to the number of input rows exceeds a threshold, and [GroupsAccumulator::supports_convert_to_state] is supported, this operator will stop applying Partial aggregation and directly pass the input rows to the next aggregation phase.

§Spilling (to disk)

The sizes of group values and accumulators can become large. Before that causes out of memory, this hash aggregator outputs partial states early for partial aggregation or spills to local disk using Arrow IPC format for final aggregation. For every input [RecordBatch], the memory manager checks whether the new input size meets the memory configuration. If not, outputting or spilling happens. For outputting, the final aggregation takes care of re-grouping. For spilling, later stream-merge sort on reading back the spilled data does re-grouping. Note the rows cannot be grouped once spilled onto disk, the read back data needs to be re-grouped again. In addition, re-grouping may cause out of memory again. Thus, re-grouping has to be a sort based aggregation.

Partial Aggregation [batch_size = 2] (max memory = 3 rows)

 INPUTS        PARTIALLY AGGREGATED (UPDATE BATCH)   OUTPUTS
┌─────────┐    ┌─────────────────┐                  ┌─────────────────┐
│ a │ b   │    │ a │    AVG(b)   │                  │ a │    AVG(b)   │
│---│-----│    │   │[count]│[sum]│                  │   │[count]│[sum]│
│ 3 │ 3.0 │ ─▶ │---│-------│-----│                  │---│-------│-----│
│ 2 │ 2.0 │    │ 2 │ 1     │ 2.0 │ ─▶ early emit ─▶ │ 2 │ 1     │ 2.0 │
└─────────┘    │ 3 │ 2     │ 7.0 │               │  │ 3 │ 2     │ 7.0 │
┌─────────┐ ─▶ │ 4 │ 1     │ 8.0 │               │  └─────────────────┘
│ 3 │ 4.0 │    └─────────────────┘               └▶ ┌─────────────────┐
│ 4 │ 8.0 │    ┌─────────────────┐                  │ 4 │ 1     │ 8.0 │
└─────────┘    │ a │    AVG(b)   │               ┌▶ │ 1 │ 1     │ 1.0 │
┌─────────┐    │---│-------│-----│               │  └─────────────────┘
│ 1 │ 1.0 │ ─▶ │ 1 │ 1     │ 1.0 │ ─▶ early emit ─▶ ┌─────────────────┐
│ 3 │ 2.0 │    │ 3 │ 1     │ 2.0 │                  │ 3 │ 1     │ 2.0 │
└─────────┘    └─────────────────┘                  └─────────────────┘


Final Aggregation [batch_size = 2] (max memory = 3 rows)

PARTIALLY INPUTS       FINAL AGGREGATION (MERGE BATCH)       RE-GROUPED (SORTED)
┌─────────────────┐    [keep using the partial schema]       [Real final aggregation
│ a │    AVG(b)   │    ┌─────────────────┐                    output]
│   │[count]│[sum]│    │ a │    AVG(b)   │                   ┌────────────┐
│---│-------│-----│ ─▶ │   │[count]│[sum]│                   │ a │ AVG(b) │
│ 3 │ 3     │ 3.0 │    │---│-------│-----│ ─▶ spill ─┐       │---│--------│
│ 2 │ 2     │ 1.0 │    │ 2 │ 2     │ 1.0 │           │       │ 1 │    4.0 │
└─────────────────┘    │ 3 │ 4     │ 8.0 │           ▼       │ 2 │    1.0 │
┌─────────────────┐ ─▶ │ 4 │ 1     │ 7.0 │     Streaming  ─▶ └────────────┘
│ 3 │ 1     │ 5.0 │    └─────────────────┘     merge sort ─▶ ┌────────────┐
│ 4 │ 1     │ 7.0 │    ┌─────────────────┐            ▲      │ a │ AVG(b) │
└─────────────────┘    │ a │    AVG(b)   │            │      │---│--------│
┌─────────────────┐    │---│-------│-----│ ─▶ memory ─┘      │ 3 │    2.0 │
│ 1 │ 2     │ 8.0 │ ─▶ │ 1 │ 2     │ 8.0 │                   │ 4 │    7.0 │
│ 2 │ 2     │ 3.0 │    │ 2 │ 2     │ 3.0 │                   └────────────┘
└─────────────────┘    └─────────────────┘

Fields§

§schema: SchemaRef§input: SendableRecordBatchStream§mode: AggregateMode§aggregate_arguments: Vec<Vec<Arc<dyn PhysicalExpr>>>

Arguments to pass to each accumulator.

The arguments in accumulator[i] is passed aggregate_arguments[i]

The argument to each accumulator is itself a Vec because some aggregates such as CORR can accept more than one argument.

§filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>

Optional filter expression to evaluate, one for each for accumulator. If present, only those rows for which the filter evaluate to true should be included in the aggregate results.

For example, for an aggregate like SUM(x) FILTER (WHERE x >= 100), the filter expression is x > 100.

§group_by: PhysicalGroupBy

GROUP BY expressions

§batch_size: usize

max rows in output RecordBatches

§group_values_soft_limit: Option<usize>

Optional soft limit on the number of group_values in a batch If the number of group_values in a single batch exceeds this value, the GroupedHashAggregateStream operation immediately switches to output mode and emits all groups.

§exec_state: ExecutionState

Tracks if this stream is generating input or output

§input_done: bool

Have we seen the end of the input

§group_values: Box<dyn GroupValues>

An interning store of group keys

§current_group_indices: Vec<usize>

scratch space for the current input [RecordBatch] being processed. Reused across batches here to avoid reallocations

§accumulators: Vec<Box<dyn GroupsAccumulator>>

Accumulators, one for each AggregateFunctionExpr in the query

For example, if the query has aggregates, SUM(x), COUNT(y), there will be two accumulators, each one specialized for that particular aggregate and its input types

§group_ordering: GroupOrdering

Optional ordering information, that might allow groups to be emitted from the hash table prior to seeing the end of the input

§spill_state: SpillState

The spill state object

§skip_aggregation_probe: Option<SkipAggregationProbe>

Optional probe for skipping data aggregation, if supported by current stream.

§reservation: MemoryReservation

The memory reservation for this grouping

§baseline_metrics: BaselineMetrics

Execution metrics

§group_by_metrics: GroupByMetrics

Aggregation-specific metrics

§reduction_factor: Option<RatioMetrics>

Reduction factor metric, calculated as output_rows/input_rows (only for partial aggregation)

Implementations§

Source§

impl GroupedHashAggregateStream

Source

pub fn new( agg: &AggregateExec, context: Arc<TaskContext>, partition: usize, ) -> Result<Self>

Create a new GroupedHashAggregateStream

Source§

impl GroupedHashAggregateStream

Source

fn group_aggregate_batch(&mut self, batch: RecordBatch) -> Result<()>

Perform group-by aggregation for the given [RecordBatch].

Source

fn update_memory_reservation(&mut self) -> Result<()>

Source

fn emit( &mut self, emit_to: EmitTo, spilling: bool, ) -> Result<Option<RecordBatch>>

Create an output RecordBatch with the group keys and accumulator states/values specified in emit_to

Source

fn spill_previous_if_necessary(&mut self, batch: &RecordBatch) -> Result<()>

Optimistically, Self::group_aggregate_batch allows to exceed the memory target slightly (~ 1 [RecordBatch]) for simplicity. In such cases, spill the data to disk and clear the memory. Currently only GroupOrdering::None is supported for spilling.

Source

fn spill(&mut self) -> Result<()>

Emit all intermediate aggregation states, sort them, and store them on disk. This process helps in reducing memory pressure by allowing the data to be read back with streaming merge.

Source

fn clear_shrink(&mut self, batch: &RecordBatch)

Clear memory and shirk capacities to the size of the batch.

Source

fn clear_all(&mut self)

Clear memory and shirk capacities to zero.

Source

fn emit_early_if_necessary(&mut self) -> Result<()>

Emit if the used memory exceeds the target for partial aggregation. Currently only GroupOrdering::None is supported for early emitting. TODO: support group_ordering for early emitting

Source

fn update_merged_stream(&mut self) -> Result<()>

At this point, all the inputs are read and there are some spills. Emit the remaining rows and create a batch. Conduct a streaming merge sort between the batch and spilled data. Since the stream is fully sorted, set self.group_ordering to Full, then later we can read with [EmitTo::First].

Source

fn hit_soft_group_limit(&self) -> bool

returns true if there is a soft groups limit and the number of distinct groups we have seen is over that limit

Source

fn set_input_done_and_produce_output(&mut self) -> Result<()>

common function for signalling end of processing of the input stream

Source

fn update_skip_aggregation_probe(&mut self, input_rows: usize)

Updates skip aggregation probe state.

Notice: It should only be called in Partial aggregation

Source

fn switch_to_skip_aggregation(&mut self) -> Result<()>

In case the probe indicates that aggregation may be skipped, forces stream to produce currently accumulated output.

Notice: It should only be called in Partial aggregation

Source

fn should_skip_aggregation(&self) -> bool

Returns true if the aggregation probe indicates that aggregation should be skipped.

Notice: It should only be called in Partial aggregation

Source

fn transform_to_states(&self, batch: RecordBatch) -> Result<RecordBatch>

Transforms input batch to intermediate aggregate state, without grouping it

Trait Implementations§

Source§

impl RecordBatchStream for GroupedHashAggregateStream

Source§

fn schema(&self) -> SchemaRef

Returns the schema of this RecordBatchStream. Read more
Source§

impl Stream for GroupedHashAggregateStream

Source§

type Item = Result<RecordBatch, DataFusionError>

Values yielded by the stream.
Source§

fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>>

Attempt to pull out the next value of this stream, registering the current task for wakeup if the value is not yet available, and returning None if the stream is exhausted. Read more
§

fn size_hint(&self) -> (usize, Option<usize>)

Returns the bounds on the remaining length of the stream. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

§

impl<T> Instrument for T

§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided [Span], returning an Instrumented wrapper. Read more
§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
§

impl<T> PolicyExt for T
where T: ?Sized,

§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns [Action::Follow] only if self and other return Action::Follow. Read more
§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns [Action::Follow] if either self or other returns Action::Follow. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
§

impl<T> StreamExt for T
where T: Stream + ?Sized,

§

fn next(&mut self) -> Next<'_, Self>
where Self: Unpin,

Creates a future that resolves to the next item in the stream. Read more
§

fn into_future(self) -> StreamFuture<Self>
where Self: Sized + Unpin,

Converts this stream into a future of (next_item, tail_of_stream). If the stream terminates, then the next item is None. Read more
§

fn map<T, F>(self, f: F) -> Map<Self, F>
where F: FnMut(Self::Item) -> T, Self: Sized,

Maps this stream’s items to a different type, returning a new stream of the resulting type. Read more
§

fn enumerate(self) -> Enumerate<Self>
where Self: Sized,

Creates a stream which gives the current iteration count as well as the next value. Read more
§

fn filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F>
where F: FnMut(&Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,

Filters the values produced by this stream according to the provided asynchronous predicate. Read more
§

fn filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F>
where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = Option<T>>, Self: Sized,

Filters the values produced by this stream while simultaneously mapping them to a different type according to the provided asynchronous closure. Read more
§

fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F>
where F: FnMut(Self::Item) -> Fut, Fut: Future, Self: Sized,

Computes from this stream’s items new items of a different type using an asynchronous closure. Read more
§

fn collect<C>(self) -> Collect<Self, C>
where C: Default + Extend<Self::Item>, Self: Sized,

Transforms a stream into a collection, returning a future representing the result of that computation. Read more
§

fn unzip<A, B, FromA, FromB>(self) -> Unzip<Self, FromA, FromB>
where FromA: Default + Extend<A>, FromB: Default + Extend<B>, Self: Sized + Stream<Item = (A, B)>,

Converts a stream of pairs into a future, which resolves to pair of containers. Read more
§

fn concat(self) -> Concat<Self>
where Self: Sized, Self::Item: Extend<<Self::Item as IntoIterator>::Item> + IntoIterator + Default,

Concatenate all items of a stream into a single extendable destination, returning a future representing the end result. Read more
§

fn count(self) -> Count<Self>
where Self: Sized,

Drives the stream to completion, counting the number of items. Read more
§

fn cycle(self) -> Cycle<Self>
where Self: Sized + Clone,

Repeats a stream endlessly. Read more
§

fn fold<T, Fut, F>(self, init: T, f: F) -> Fold<Self, Fut, T, F>
where F: FnMut(T, Self::Item) -> Fut, Fut: Future<Output = T>, Self: Sized,

Execute an accumulating asynchronous computation over a stream, collecting all the values into one final result. Read more
§

fn any<Fut, F>(self, f: F) -> Any<Self, Fut, F>
where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,

Execute predicate over asynchronous stream, and return true if any element in stream satisfied a predicate. Read more
§

fn all<Fut, F>(self, f: F) -> All<Self, Fut, F>
where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,

Execute predicate over asynchronous stream, and return true if all element in stream satisfied a predicate. Read more
§

fn flatten(self) -> Flatten<Self>
where Self::Item: Stream, Self: Sized,

Flattens a stream of streams into just one continuous stream. Read more
§

fn flatten_unordered( self, limit: impl Into<Option<usize>>, ) -> FlattenUnorderedWithFlowController<Self, ()>
where Self::Item: Stream + Unpin, Self: Sized,

Flattens a stream of streams into just one continuous stream. Polls inner streams produced by the base stream concurrently. Read more
§

fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
where F: FnMut(Self::Item) -> U, U: Stream, Self: Sized,

Maps a stream like [StreamExt::map] but flattens nested Streams. Read more
§

fn flat_map_unordered<U, F>( self, limit: impl Into<Option<usize>>, f: F, ) -> FlatMapUnordered<Self, U, F>
where U: Stream + Unpin, F: FnMut(Self::Item) -> U, Self: Sized,

Maps a stream like [StreamExt::map] but flattens nested Streams and polls them concurrently, yielding items in any order, as they made available. Read more
§

fn scan<S, B, Fut, F>(self, initial_state: S, f: F) -> Scan<Self, S, Fut, F>
where F: FnMut(&mut S, Self::Item) -> Fut, Fut: Future<Output = Option<B>>, Self: Sized,

Combinator similar to [StreamExt::fold] that holds internal state and produces a new stream. Read more
§

fn skip_while<Fut, F>(self, f: F) -> SkipWhile<Self, Fut, F>
where F: FnMut(&Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,

Skip elements on this stream while the provided asynchronous predicate resolves to true. Read more
§

fn take_while<Fut, F>(self, f: F) -> TakeWhile<Self, Fut, F>
where F: FnMut(&Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,

Take elements from this stream while the provided asynchronous predicate resolves to true. Read more
§

fn take_until<Fut>(self, fut: Fut) -> TakeUntil<Self, Fut>
where Fut: Future, Self: Sized,

Take elements from this stream until the provided future resolves. Read more
§

fn for_each<Fut, F>(self, f: F) -> ForEach<Self, Fut, F>
where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = ()>, Self: Sized,

Runs this stream to completion, executing the provided asynchronous closure for each element on the stream. Read more
§

fn for_each_concurrent<Fut, F>( self, limit: impl Into<Option<usize>>, f: F, ) -> ForEachConcurrent<Self, Fut, F>
where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = ()>, Self: Sized,

Runs this stream to completion, executing the provided asynchronous closure for each element on the stream concurrently as elements become available. Read more
§

fn take(self, n: usize) -> Take<Self>
where Self: Sized,

Creates a new stream of at most n items of the underlying stream. Read more
§

fn skip(self, n: usize) -> Skip<Self>
where Self: Sized,

Creates a new stream which skips n items of the underlying stream. Read more
§

fn fuse(self) -> Fuse<Self>
where Self: Sized,

Fuse a stream such that poll_next will never again be called once it has finished. This method can be used to turn any Stream into a FusedStream. Read more
§

fn by_ref(&mut self) -> &mut Self

Borrows a stream, rather than consuming it. Read more
§

fn catch_unwind(self) -> CatchUnwind<Self>
where Self: Sized + UnwindSafe,

Catches unwinding panics while polling the stream. Read more
§

fn boxed<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + Send + 'a>>
where Self: Sized + Send + 'a,

Wrap the stream in a Box, pinning it. Read more
§

fn boxed_local<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + 'a>>
where Self: Sized + 'a,

Wrap the stream in a Box, pinning it. Read more
§

fn buffered(self, n: usize) -> Buffered<Self>
where Self::Item: Future, Self: Sized,

An adaptor for creating a buffered list of pending futures. Read more
§

fn buffer_unordered(self, n: usize) -> BufferUnordered<Self>
where Self::Item: Future, Self: Sized,

An adaptor for creating a buffered list of pending futures (unordered). Read more
§

fn zip<St>(self, other: St) -> Zip<Self, St>
where St: Stream, Self: Sized,

An adapter for zipping two streams together. Read more
§

fn chain<St>(self, other: St) -> Chain<Self, St>
where St: Stream<Item = Self::Item>, Self: Sized,

Adapter for chaining two streams. Read more
§

fn peekable(self) -> Peekable<Self>
where Self: Sized,

Creates a new stream which exposes a peek method. Read more
§

fn chunks(self, capacity: usize) -> Chunks<Self>
where Self: Sized,

An adaptor for chunking up items of the stream inside a vector. Read more
§

fn ready_chunks(self, capacity: usize) -> ReadyChunks<Self>
where Self: Sized,

An adaptor for chunking up ready items of the stream inside a vector. Read more
§

fn forward<S>(self, sink: S) -> Forward<Self, S>
where S: Sink<Self::Ok, Error = Self::Error>, Self: Sized + TryStream,

A future that completes after the given stream has been fully processed into the sink and the sink has been flushed and closed. Read more
§

fn split<Item>(self) -> (SplitSink<Self, Item>, SplitStream<Self>)
where Self: Sized + Sink<Item>,

Splits this Stream + Sink object into separate Sink and Stream objects. Read more
§

fn inspect<F>(self, f: F) -> Inspect<Self, F>
where F: FnMut(&Self::Item), Self: Sized,

Do something with each item of this stream, afterwards passing it on. Read more
§

fn left_stream<B>(self) -> Either<Self, B>
where B: Stream<Item = Self::Item>, Self: Sized,

Wrap this stream in an Either stream, making it the left-hand variant of that Either. Read more
§

fn right_stream<B>(self) -> Either<B, Self>
where B: Stream<Item = Self::Item>, Self: Sized,

Wrap this stream in an Either stream, making it the right-hand variant of that Either. Read more
§

fn poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
where Self: Unpin,

A convenience method for calling [Stream::poll_next] on Unpin stream types.
§

fn select_next_some(&mut self) -> SelectNextSome<'_, Self>
where Self: Unpin + FusedStream,

Returns a Future that resolves when the next item in this stream is ready. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
§

impl<S, T, E> TryStream for S
where S: Stream<Item = Result<T, E>> + ?Sized,

§

type Ok = T

The type of successful values yielded by this future
§

type Error = E

The type of failures yielded by this future
§

fn try_poll_next( self: Pin<&mut S>, cx: &mut Context<'_>, ) -> Poll<Option<Result<<S as TryStream>::Ok, <S as TryStream>::Error>>>

Poll this TryStream as if it were a Stream. Read more
§

impl<S> TryStreamExt for S
where S: TryStream + ?Sized,

§

fn err_into<E>(self) -> ErrInto<Self, E>
where Self: Sized, Self::Error: Into<E>,

Wraps the current stream in a new stream which converts the error type into the one provided. Read more
§

fn map_ok<T, F>(self, f: F) -> MapOk<Self, F>
where Self: Sized, F: FnMut(Self::Ok) -> T,

Wraps the current stream in a new stream which maps the success value using the provided closure. Read more
§

fn map_err<E, F>(self, f: F) -> MapErr<Self, F>
where Self: Sized, F: FnMut(Self::Error) -> E,

Wraps the current stream in a new stream which maps the error value using the provided closure. Read more
§

fn and_then<Fut, F>(self, f: F) -> AndThen<Self, Fut, F>
where F: FnMut(Self::Ok) -> Fut, Fut: TryFuture<Error = Self::Error>, Self: Sized,

Chain on a computation for when a value is ready, passing the successful results to the provided closure f. Read more
§

fn or_else<Fut, F>(self, f: F) -> OrElse<Self, Fut, F>
where F: FnMut(Self::Error) -> Fut, Fut: TryFuture<Ok = Self::Ok>, Self: Sized,

Chain on a computation for when an error happens, passing the erroneous result to the provided closure f. Read more
§

fn inspect_ok<F>(self, f: F) -> InspectOk<Self, F>
where F: FnMut(&Self::Ok), Self: Sized,

Do something with the success value of this stream, afterwards passing it on. Read more
§

fn inspect_err<F>(self, f: F) -> InspectErr<Self, F>
where F: FnMut(&Self::Error), Self: Sized,

Do something with the error value of this stream, afterwards passing it on. Read more
§

fn into_stream(self) -> IntoStream<Self>
where Self: Sized,

Wraps a [TryStream] into a type that implements Stream Read more
§

fn try_next(&mut self) -> TryNext<'_, Self>
where Self: Unpin,

Creates a future that attempts to resolve the next item in the stream. If an error is encountered before the next item, the error is returned instead. Read more
§

fn try_for_each<Fut, F>(self, f: F) -> TryForEach<Self, Fut, F>
where F: FnMut(Self::Ok) -> Fut, Fut: TryFuture<Ok = (), Error = Self::Error>, Self: Sized,

Attempts to run this stream to completion, executing the provided asynchronous closure for each element on the stream. Read more
§

fn try_skip_while<Fut, F>(self, f: F) -> TrySkipWhile<Self, Fut, F>
where F: FnMut(&Self::Ok) -> Fut, Fut: TryFuture<Ok = bool, Error = Self::Error>, Self: Sized,

Skip elements on this stream while the provided asynchronous predicate resolves to true. Read more
§

fn try_take_while<Fut, F>(self, f: F) -> TryTakeWhile<Self, Fut, F>
where F: FnMut(&Self::Ok) -> Fut, Fut: TryFuture<Ok = bool, Error = Self::Error>, Self: Sized,

Take elements on this stream while the provided asynchronous predicate resolves to true. Read more
§

fn try_for_each_concurrent<Fut, F>( self, limit: impl Into<Option<usize>>, f: F, ) -> TryForEachConcurrent<Self, Fut, F>
where F: FnMut(Self::Ok) -> Fut, Fut: Future<Output = Result<(), Self::Error>>, Self: Sized,

Attempts to run this stream to completion, executing the provided asynchronous closure for each element on the stream concurrently as elements become available, exiting as soon as an error occurs. Read more
§

fn try_collect<C>(self) -> TryCollect<Self, C>
where C: Default + Extend<Self::Ok>, Self: Sized,

Attempt to transform a stream into a collection, returning a future representing the result of that computation. Read more
§

fn try_chunks(self, capacity: usize) -> TryChunks<Self>
where Self: Sized,

An adaptor for chunking up successful items of the stream inside a vector. Read more
§

fn try_ready_chunks(self, capacity: usize) -> TryReadyChunks<Self>
where Self: Sized,

An adaptor for chunking up successful, ready items of the stream inside a vector. Read more
§

fn try_filter<Fut, F>(self, f: F) -> TryFilter<Self, Fut, F>
where Fut: Future<Output = bool>, F: FnMut(&Self::Ok) -> Fut, Self: Sized,

Attempt to filter the values produced by this stream according to the provided asynchronous closure. Read more
§

fn try_filter_map<Fut, F, T>(self, f: F) -> TryFilterMap<Self, Fut, F>
where Fut: TryFuture<Ok = Option<T>, Error = Self::Error>, F: FnMut(Self::Ok) -> Fut, Self: Sized,

Attempt to filter the values produced by this stream while simultaneously mapping them to a different type according to the provided asynchronous closure. Read more
§

fn try_flatten_unordered( self, limit: impl Into<Option<usize>>, ) -> TryFlattenUnordered<Self>
where Self::Ok: TryStream + Unpin, <Self::Ok as TryStream>::Error: From<Self::Error>, Self: Sized,

Flattens a stream of streams into just one continuous stream. Produced streams will be polled concurrently and any errors will be passed through without looking at them. If the underlying base stream returns an error, it will be immediately propagated. Read more
§

fn try_flatten(self) -> TryFlatten<Self>
where Self::Ok: TryStream, <Self::Ok as TryStream>::Error: From<Self::Error>, Self: Sized,

Flattens a stream of streams into just one continuous stream. Read more
§

fn try_fold<T, Fut, F>(self, init: T, f: F) -> TryFold<Self, Fut, T, F>
where F: FnMut(T, Self::Ok) -> Fut, Fut: TryFuture<Ok = T, Error = Self::Error>, Self: Sized,

Attempt to execute an accumulating asynchronous computation over a stream, collecting all the values into one final result. Read more
§

fn try_concat(self) -> TryConcat<Self>
where Self: Sized, Self::Ok: Extend<<Self::Ok as IntoIterator>::Item> + IntoIterator + Default,

Attempt to concatenate all items of a stream into a single extendable destination, returning a future representing the end result. Read more
§

fn try_buffer_unordered(self, n: usize) -> TryBufferUnordered<Self>
where Self::Ok: TryFuture<Error = Self::Error>, Self: Sized,

Attempt to execute several futures from a stream concurrently (unordered). Read more
§

fn try_buffered(self, n: usize) -> TryBuffered<Self>
where Self::Ok: TryFuture<Error = Self::Error>, Self: Sized,

Attempt to execute several futures from a stream concurrently. Read more
§

fn try_poll_next_unpin( &mut self, cx: &mut Context<'_>, ) -> Poll<Option<Result<Self::Ok, Self::Error>>>
where Self: Unpin,

A convenience method for calling [TryStream::try_poll_next] on Unpin stream types.
§

fn into_async_read(self) -> IntoAsyncRead<Self>
where Self: Sized + TryStreamExt<Error = Error>, Self::Ok: AsRef<[u8]>,

Adapter that converts this stream into an AsyncBufRead. Read more
§

fn try_all<Fut, F>(self, f: F) -> TryAll<Self, Fut, F>
where Self: Sized, F: FnMut(Self::Ok) -> Fut, Fut: Future<Output = bool>,

Attempt to execute a predicate over an asynchronous stream and evaluate if all items satisfy the predicate. Exits early if an Err is encountered or if an Ok item is found that does not satisfy the predicate. Read more
§

fn try_any<Fut, F>(self, f: F) -> TryAny<Self, Fut, F>
where Self: Sized, F: FnMut(Self::Ok) -> Fut, Fut: Future<Output = bool>,

Attempt to execute a predicate over an asynchronous stream and evaluate if any items satisfy the predicate. Exits early if an Err is encountered or if an Ok item is found that satisfies the predicate. Read more
§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

§

fn vzip(self) -> V

§

impl<T> WithSubscriber for T

§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a [WithDispatch] wrapper. Read more
§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a [WithDispatch] wrapper. Read more
§

impl<T> ErasedDestructor for T
where T: 'static,