TopK

Struct TopK 

pub struct TopK {
    schema: SchemaRef,
    metrics: TopKMetrics,
    reservation: MemoryReservation,
    batch_size: usize,
    expr: LexOrdering,
    row_converter: RowConverter,
    scratch_rows: Rows,
    heap: TopKHeap,
    common_sort_prefix_converter: Option<RowConverter>,
    common_sort_prefix: Arc<[PhysicalSortExpr]>,
    filter: Arc<RwLock<TopKDynamicFilters>>,
    pub(crate) finished: bool,
}

Global TopK

§Background

“Top K” is a common query optimization used for queries such as “find the top 3 customers by revenue”. The (simplified) SQL for such a query might be:

SELECT customer_id, revenue FROM 'sales.csv' ORDER BY revenue DESC limit 3;

The simple plan would be:

> explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3;
+--------------+----------------------------------------+
| plan_type    | plan                                   |
+--------------+----------------------------------------+
| logical_plan | Limit: 3                               |
|              |   Sort: revenue DESC NULLS FIRST       |
|              |     Projection: customer_id, revenue   |
|              |       TableScan: sales                 |
+--------------+----------------------------------------+

While this plan produces the correct answer, it fully sorts the input before discarding everything other than the top 3 elements.

The same answer can be produced by simply keeping track of the top K=3 elements, reducing the total amount of required buffer memory.
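As a rough illustration, the bounded-memory idea can be sketched with Rust's standard BinaryHeap. This is a simplified sketch, not DataFusion's TopKHeap: it ranks plain i64 revenue values instead of rows, and the function name top_k_desc is hypothetical. For ORDER BY revenue DESC, the retained value that will be evicted next is the smallest one, so wrapping values in std::cmp::Reverse turns the max-heap into a min-heap whose root is that value.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical helper: returns the `k` largest values (ORDER BY v DESC LIMIT k)
/// while holding at most `k` values in memory at any time.
fn top_k_desc(input: impl IntoIterator<Item = i64>, k: usize) -> Vec<i64> {
    if k == 0 {
        return Vec::new();
    }
    // Min-heap (via Reverse): the root is the smallest retained value,
    // i.e. the current k-th best and the first candidate for eviction.
    let mut heap: BinaryHeap<Reverse<i64>> = BinaryHeap::with_capacity(k);
    for v in input {
        if heap.len() < k {
            heap.push(Reverse(v));
        } else if let Some(&Reverse(worst)) = heap.peek() {
            // Only values strictly better than the current worst can enter.
            if v > worst {
                heap.pop();
                heap.push(Reverse(v));
            }
        }
    }
    // into_sorted_vec sorts ascending by Reverse<i64>, i.e. descending by value.
    heap.into_sorted_vec().into_iter().map(|Reverse(v)| v).collect()
}

fn main() {
    let revenue = vec![120, 45, 990, 300, 75, 610, 300];
    println!("{:?}", top_k_desc(revenue, 3)); // [990, 610, 300]
}
```

Each incoming value costs one comparison against the heap root, and at most K values are held at any time instead of the whole input.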

§Partial Sort Optimization

This implementation additionally optimizes queries where the input is already partially sorted by a common prefix of the requested ordering. Once the top K heap is full, if subsequent rows are guaranteed to be strictly greater (in sort order) on this prefix than the largest row currently stored, the operator safely terminates early.

§Example

For input sorted by (day DESC), but not by timestamp, a query such as:

SELECT day, timestamp FROM sensor ORDER BY day DESC, timestamp DESC LIMIT 10;

can terminate scanning early once sufficient rows from the latest days have been collected, skipping older data.
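A minimal sketch of this early exit, under simplifying assumptions: rows are hypothetical (day, timestamp) tuples of i32 and i64, batches are plain Vecs already sorted by day DESC, and the shared prefix is compared directly rather than through row conversion. Following the attempt_early_completion description further down, the check looks at the last row of each batch once the heap holds K rows. The name top_k_with_early_exit is illustrative only.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// A row of the hypothetical `sensor` table: (day, timestamp).
type Row = (i32, i64);

/// Sketch of `ORDER BY day DESC, timestamp DESC LIMIT k` over batches that are
/// already sorted by `day DESC` (but not by timestamp).
/// Returns the top k rows and the number of batches that were scanned.
fn top_k_with_early_exit(batches: &[Vec<Row>], k: usize) -> (Vec<Row>, usize) {
    // Min-heap (via Reverse) on (day, timestamp): the root is the worst retained
    // row, i.e. the largest row in the requested DESC sort order.
    let mut heap: BinaryHeap<Reverse<Row>> = BinaryHeap::new();
    let mut scanned = 0;
    for batch in batches {
        scanned += 1;
        for &row in batch {
            if heap.len() < k {
                heap.push(Reverse(row));
            } else if let Some(&Reverse(worst)) = heap.peek() {
                if row > worst {
                    heap.pop();
                    heap.push(Reverse(row));
                }
            }
        }
        // Early-exit check on the shared prefix (`day`), using the batch's last row.
        // Input is sorted by day DESC, so every later row has day <= last_day.
        // If last_day is strictly older than the worst retained row's day, no
        // later row can sort before that row, so the heap can no longer change.
        if heap.len() == k {
            if let (Some(&Reverse((worst_day, _))), Some(&(last_day, _))) =
                (heap.peek(), batch.last())
            {
                if last_day < worst_day {
                    break;
                }
            }
        }
    }
    let top = heap.into_sorted_vec().into_iter().map(|Reverse(r)| r).collect();
    (top, scanned)
}

fn main() {
    // Batches sorted by day DESC; timestamps within a day are unordered.
    let batches = vec![
        vec![(3, 10), (3, 40), (3, 25)],
        vec![(2, 50), (2, 5)],
        vec![(1, 99), (1, 7)],
    ];
    let (top, scanned) = top_k_with_early_exit(&batches, 2);
    println!("top = {top:?}, batches scanned = {scanned}");
}
```

The comparison must be strict: a later row with the same day as the worst retained row could still have a larger timestamp and displace it.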

§Structure

This operator tracks the top K items using a TopKHeap.

Fields§

§schema: SchemaRef

schema of the output (and the input)

§metrics: TopKMetrics

Runtime metrics

§reservation: MemoryReservation

Memory reservation with the runtime's memory pool, accounting for the memory buffered by this operator

§batch_size: usize

The target number of rows for output batches

§expr: LexOrdering

sort expressions

§row_converter: RowConverter

row converter, for sort keys

§scratch_rows: Rows

scratch space for converting rows

§heap: TopKHeap

stores the top k values and their sort key values, in order

§common_sort_prefix_converter: Option<RowConverter>

row converter, for common keys between the sort keys and the input ordering

§common_sort_prefix: Arc<[PhysicalSortExpr]>

Common sort prefix between the input and the sort expressions to allow early exit optimization

§filter: Arc<RwLock<TopKDynamicFilters>>

Filter matching the state of the TopK heap used for dynamic filter pushdown

§finished: bool

If true, indicates that all rows of subsequent batches are guaranteed to be greater (by byte order, after row conversion) than the top K, which means the top K won’t change and the computation can be finished early.
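The phrase "greater (by byte order)" relies on an order-preserving encoding: once sort keys are converted to bytes, a plain lexicographic byte comparison reproduces the requested sort order. The sketch below illustrates the idea for two i32 columns ordered by a DESC, b ASC. It is a simplified stand-in, not the format produced by arrow's RowConverter (which also handles nulls, variable-length types, and more); encode_i32 and encode_row are hypothetical helpers.

```rust
/// Encode an i32 so that comparing the resulting bytes lexicographically matches
/// comparing the integers. Flipping the sign bit maps i32::MIN..=i32::MAX onto
/// 0..=u32::MAX, and big-endian bytes compare in numeric order.
fn encode_i32(v: i32, descending: bool) -> [u8; 4] {
    let mut bytes = ((v as u32) ^ 0x8000_0000).to_be_bytes();
    if descending {
        // Inverting every byte reverses the byte order, giving DESC.
        for b in bytes.iter_mut() {
            *b = !*b;
        }
    }
    bytes
}

/// Hypothetical row key for `ORDER BY a DESC, b ASC`.
fn encode_row(a: i32, b: i32) -> Vec<u8> {
    let mut out = Vec::with_capacity(8);
    out.extend_from_slice(&encode_i32(a, true)); // a DESC
    out.extend_from_slice(&encode_i32(b, false)); // b ASC
    out
}

fn main() {
    let mut rows = vec![(1, 5), (-7, 0), (2, 3), (1, 4)];
    // Sort purely by the encoded bytes; no per-column comparison logic is needed.
    rows.sort_by_key(|&(a, b)| encode_row(a, b));
    println!("{rows:?}"); // [(2, 3), (1, 4), (1, 5), (-7, 0)]
}
```

Because each encoded column has a fixed width, concatenating them preserves the column-by-column (lexicographic) ordering, so "greater by byte order" means "later in the requested sort order".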

Implementations§

impl TopK

pub fn try_new(
    partition_id: usize,
    schema: SchemaRef,
    common_sort_prefix: Vec<PhysicalSortExpr>,
    expr: LexOrdering,
    k: usize,
    batch_size: usize,
    runtime: Arc<RuntimeEnv>,
    metrics: &ExecutionPlanMetricsSet,
    filter: Arc<RwLock<TopKDynamicFilters>>,
) -> Result<Self>

Create a new TopK that stores the top k values, as defined by the sort expressions in expr.

pub fn insert_batch(&mut self, batch: RecordBatch) -> Result<()>

Insert batch, remembering if any of its values are among the top k seen so far.
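One way to read "remembering if any of its values are among the top k" is that a batch only needs to be kept while some of its rows sit in the heap. The hypothetical SimpleTopK below sketches that bookkeeping for ORDER BY v DESC over i64 values: insert_batch returns how many rows entered the heap (loosely mirroring the usize returned by find_new_topk_items), stores the batch only if some of its rows remain, and frees a stored batch once its last row is evicted. DataFusion's actual data structures are not shown here.

```rust
use std::cmp::Reverse;
use std::collections::{BinaryHeap, HashMap};

/// Hypothetical, simplified TopK for `ORDER BY v DESC LIMIT k` over i64 values.
/// Heap entries point into stored batches; a batch is kept only while at least
/// one of its rows is among the current top k.
struct SimpleTopK {
    k: usize,
    /// Min-heap of (value, batch_id); the root is the worst retained row.
    heap: BinaryHeap<Reverse<(i64, usize)>>,
    /// batch_id -> (rows, number of heap entries that reference the batch)
    batches: HashMap<usize, (Vec<i64>, usize)>,
    next_id: usize,
}

impl SimpleTopK {
    fn new(k: usize) -> Self {
        Self { k, heap: BinaryHeap::new(), batches: HashMap::new(), next_id: 0 }
    }

    /// Inserts `batch` and returns how many of its rows entered the top k.
    fn insert_batch(&mut self, batch: Vec<i64>) -> usize {
        let id = self.next_id;
        self.next_id += 1;
        let (mut inserted, mut live) = (0, 0);
        for &v in &batch {
            let evict = match self.heap.peek() {
                _ if self.heap.len() < self.k => false,
                Some(&Reverse((worst, _))) if v > worst => true,
                _ => continue, // not better than the current k-th value
            };
            if evict {
                let Reverse((_, old_id)) = self.heap.pop().expect("heap is full");
                if old_id == id {
                    live -= 1; // evicted a row from this same batch
                } else if let Some(entry) = self.batches.get_mut(&old_id) {
                    entry.1 -= 1;
                    if entry.1 == 0 {
                        self.batches.remove(&old_id); // no rows left: free the batch
                    }
                }
            }
            self.heap.push(Reverse((v, id)));
            inserted += 1;
            live += 1;
        }
        if live > 0 {
            self.batches.insert(id, (batch, live));
        }
        inserted
    }
}

fn main() {
    let mut topk = SimpleTopK::new(2);
    println!("{}", topk.insert_batch(vec![5, 1])); // 2
    println!("{}", topk.insert_batch(vec![0, -3])); // 0: batch is not retained
    println!("{}", topk.insert_batch(vec![9, 7])); // 2: first batch is released
    println!("{} batch(es) retained", topk.batches.len()); // 1 batch(es) retained
}
```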

fn find_new_topk_items(
    &mut self,
    items: impl Iterator<Item = usize>,
    batch_entry: &mut RecordBatchEntry,
) -> usize

fn update_filter(&mut self) -> Result<()>

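Update the filter representation of our TopK heap. For example, given the sort expression ORDER BY a DESC, b ASC LIMIT 3 and the current heap values [(1, 5), (1, 4), (2, 3)], the row that sorts last, and would therefore be evicted next, is (1, 5). A new row can enter the heap only if it sorts strictly before (1, 5), so the filter will be updated to:

a > 1 OR (a = 1 AND b < 5)

The other heap rows add no constraint: (1, 1), for example, sorts after (2, 3) but still displaces (1, 5).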
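A minimal sketch, assuming integer thresholds and no NULLs, of rendering the "sorts strictly before this threshold row" predicate as SQL text for a lexicographic ordering. SortKey and threshold_filter are hypothetical names; judging by its signature, the real build_filter_expression builds a PhysicalExpr from ScalarValue thresholds rather than a string.

```rust
/// Hypothetical sort key: a column name and its direction.
struct SortKey<'a> {
    column: &'a str,
    descending: bool,
}

/// Renders the predicate "row sorts strictly before the threshold row",
/// e.g. `a > 1 OR (a = 1 AND b < 5)` for `ORDER BY a DESC, b ASC` and
/// threshold row (1, 5). NULLs are ignored. Returns None for zero keys.
fn threshold_filter(keys: &[SortKey<'_>], thresholds: &[i64]) -> Option<String> {
    assert_eq!(keys.len(), thresholds.len(), "one threshold per sort key");
    let (first, rest) = keys.split_first()?;
    let t = thresholds[0];
    // DESC keeps larger values, ASC keeps smaller ones.
    let op = if first.descending { ">" } else { "<" };
    let strictly_better = format!("{} {op} {t}", first.column);
    // On a tie in this column, the remaining columns decide.
    Some(match threshold_filter(rest, &thresholds[1..]) {
        None => strictly_better,
        // Parenthesize nested ORs, since AND binds tighter than OR in SQL.
        Some(inner) if rest.len() > 1 => {
            format!("{strictly_better} OR ({} = {t} AND ({inner}))", first.column)
        }
        Some(inner) => format!("{strictly_better} OR ({} = {t} AND {inner})", first.column),
    })
}

fn main() {
    let keys = [
        SortKey { column: "a", descending: true },
        SortKey { column: "b", descending: false },
    ];
    // Threshold row (a, b) = (1, 5).
    println!("{}", threshold_filter(&keys, &[1, 5]).unwrap());
    // prints: a > 1 OR (a = 1 AND b < 5)
}
```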

fn build_filter_expression(
    sort_exprs: &[PhysicalSortExpr],
    thresholds: Vec<ScalarValue>,
) -> Result<Option<Arc<dyn PhysicalExpr>>>

Build the filter expression with the given thresholds. It is called outside of any locks to keep the critical section short.

fn attempt_early_completion(&mut self, batch: &RecordBatch) -> Result<()>

If input ordering shares a common sort prefix with the TopK, and if the TopK’s heap is full, check if the computation can be finished early. This is the case if the last row of the current batch is strictly greater than the max row in the heap, comparing only on the shared prefix columns.

fn compute_common_sort_prefix(
    &self,
    batch: &RecordBatch,
    last_row_idx: usize,
    scratch: &mut Rows,
) -> Result<()>

pub fn emit(self) -> Result<SendableRecordBatchStream>

Returns the top k results as a stream of RecordBatches with at most batch_size rows each, consuming the heap.
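A sketch of this final step, assuming an ORDER BY v DESC heap of plain i64 values instead of rows: the heap is consumed, its contents are sorted, and the result is split into batches of at most batch_size rows, where only the last batch may be shorter. The function name emit is reused for illustration only; the real method returns a SendableRecordBatchStream rather than nested Vecs.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical sketch: consumes a min-heap (via Reverse) holding the top k
/// values for `ORDER BY v DESC` and returns them in sort order, split into
/// batches of at most `batch_size` values.
fn emit(heap: BinaryHeap<Reverse<i64>>, batch_size: usize) -> Vec<Vec<i64>> {
    assert!(batch_size > 0, "batch_size must be positive");
    // Ascending order of Reverse<i64> is descending order of the values.
    let sorted: Vec<i64> = heap.into_sorted_vec().into_iter().map(|Reverse(v)| v).collect();
    sorted.chunks(batch_size).map(|chunk| chunk.to_vec()).collect()
}

fn main() {
    let heap: BinaryHeap<Reverse<i64>> = vec![3, 9, 1, 7, 5].into_iter().map(Reverse).collect();
    println!("{:?}", emit(heap, 2)); // [[9, 7], [5, 3], [1]]
}
```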

fn size(&self) -> usize

Returns the amount of memory used by this operator, in bytes.

Auto Trait Implementations§

impl Freeze for TopK

impl !RefUnwindSafe for TopK

impl Send for TopK

impl Sync for TopK

impl Unpin for TopK

impl !UnwindSafe for TopK

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoEither for T

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.

impl<T> PolicyExt for T
where T: ?Sized,

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow.

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow.

impl<T> Same for T

type Output = T

Should always be Self

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.

impl<T> ErasedDestructor for T
where T: 'static,