Utilities for improved cooperative scheduling.
Cooperative scheduling
A single call to poll_next on a top-level Stream may perform a lot of work before it
returns Poll::Pending. Consider, for instance, computing an aggregation over a large
dataset.
If a Stream runs for a long period of time without yielding back to the Tokio executor,
it can starve other tasks that are waiting for that executor to run them.
It also prevents the query execution from being cancelled until the Stream yields.
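The starvation effect can be reproduced in a toy model. The following std-only sketch is not Tokio or DataFusion code: the `ToyTask` trait, the `Aggregate` and `Ping` tasks, and the round-robin `run` function are hypothetical stand-ins for futures and an executor. An aggregation that does all of its work in one poll finishes before any other task gets a turn, while one that returns `Poll::Pending` after each chunk lets the other task run in between.

```rust
use std::task::Poll;

/// Hypothetical stand-in for a future: each `poll` does some work and reports
/// whether the task is finished. This is a toy model, not Tokio's API.
trait ToyTask {
    fn poll(&mut self, log: &mut Vec<String>) -> Poll<()>;
}

/// Sums `next..end`. With `chunk: None` all work happens in a single poll;
/// with `Some(c)` at most `c` values are processed per poll.
struct Aggregate {
    next: u64,
    end: u64,
    sum: u64,
    chunk: Option<u64>,
}

impl ToyTask for Aggregate {
    fn poll(&mut self, log: &mut Vec<String>) -> Poll<()> {
        let stop = match self.chunk {
            Some(c) => (self.next + c).min(self.end),
            None => self.end,
        };
        while self.next < stop {
            self.sum += self.next;
            self.next += 1;
        }
        if self.next == self.end {
            log.push(format!("aggregate done: {}", self.sum));
            Poll::Ready(())
        } else {
            // Give control back to the executor before continuing.
            log.push("aggregate yielded".to_string());
            Poll::Pending
        }
    }
}

/// A small task that finishes in a single poll.
struct Ping;

impl ToyTask for Ping {
    fn poll(&mut self, log: &mut Vec<String>) -> Poll<()> {
        log.push("ping".to_string());
        Poll::Ready(())
    }
}

fn aggregate(end: u64, chunk: Option<u64>) -> Box<dyn ToyTask> {
    Box::new(Aggregate { next: 0, end, sum: 0, chunk })
}

fn ping() -> Box<dyn ToyTask> {
    Box::new(Ping)
}

/// Toy round-robin executor: polls every unfinished task in turn until all
/// are done, and returns a log of what happened in which order.
fn run(mut tasks: Vec<Box<dyn ToyTask>>) -> Vec<String> {
    let mut log = Vec::new();
    while !tasks.is_empty() {
        let mut i = 0;
        while i < tasks.len() {
            if tasks[i].poll(&mut log).is_ready() {
                tasks.remove(i);
            } else {
                i += 1;
            }
        }
    }
    log
}
```

With `aggregate(10_000, None)` the log shows the aggregate result before `ping`; with `aggregate(10_000, Some(4_000))`, `ping` runs right after the first chunk.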
To ensure that Stream implementations yield regularly, operators can insert explicit yield
points using the utilities in this module. For most operators this is not necessary: the
Streams of built-in DataFusion operators that generate (rather than manipulate)
RecordBatches, such as DataSourceExec, and of those that eagerly consume RecordBatches,
such as RepartitionExec, contain yield points that make most query Streams yield
periodically.
There are two kinds of operators that should insert yield points:
- New source operators that do not make use of Tokio resources
- Exchange-like operators that do not use Tokio's Channel implementation to pass data between tasks
Adding yield points
Yield points can be inserted manually using the facilities provided by the Tokio coop
module, such as tokio::task::coop::consume_budget.
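A manual yield point is simply an `.await` that may return `Poll::Pending` to the executor partway through a long loop. The std-only sketch below models that idea without Tokio: `YieldOnce`, `sum_with_yield_points`, `NoopWake` and `block_on` are hypothetical helpers written for illustration. `YieldOnce` always yields, whereas Tokio's `consume_budget` only yields once the task's budget is used up; in a Tokio-based operator you would await `tokio::task::coop::consume_budget()` instead.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical yield point: returns `Pending` once (after asking to be woken)
/// and completes on the next poll.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            // Ask to be polled again, then hand control back to the executor.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Sums `values`, inserting an explicit yield point after every `every` items.
/// `every` must be non-zero.
async fn sum_with_yield_points(values: Vec<u64>, every: usize) -> u64 {
    let mut sum: u64 = 0;
    for (i, v) in values.into_iter().enumerate() {
        sum += v;
        if (i + 1) % every == 0 {
            YieldOnce { yielded: false }.await;
        }
    }
    sum
}

/// Waker that does nothing; the executor below re-polls unconditionally.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal busy-polling executor that also reports how many polls it took.
fn block_on<F: Future>(fut: F) -> (F::Output, usize) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return (out, polls);
        }
    }
}
```

Summing `1..=10` with a yield point every 4 items takes three polls: the future returns `Pending` after items 4 and 8 and completes on the third poll.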
Another option is to use the wrapper Stream implementation provided by this module which will
consume a unit of task budget every time a RecordBatch is produced.
Wrapper Streams can be created using the cooperative and make_cooperative functions.
cooperative is a generic function that takes ownership of the wrapped RecordBatchStream.
This function has the benefit of not requiring an additional heap allocation and can avoid
dynamic dispatch.
make_cooperative is a non-generic function that wraps a SendableRecordBatchStream. This
can be used to wrap dynamically typed, heap-allocated RecordBatchStreams.
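The trade-off between a generic and a boxed constructor can be illustrated with a std-only sketch of the same wrapper idea. Everything in it is hypothetical: `BatchStream` stands in for RecordBatchStream, batches are plain `u32` values, and the budget is a per-wrapper counter passed as a parameter, whereas the functions in this module rely on Tokio's per-task budget. The generic `cooperative` returns the concrete `Cooperative<S>` type, so no extra allocation is needed and calls to the inner stream are statically dispatched; `make_cooperative` accepts and returns a boxed trait object, which costs one more allocation and a virtual call per poll.

```rust
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical, simplified stand-in for `RecordBatchStream`: "batches" are
/// plain `u32`s, and `poll_next` takes `&mut self` instead of `Pin<&mut Self>`.
trait BatchStream {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u32>>;
}

/// Lets a boxed, dynamically typed stream be wrapped like any other.
impl<T: BatchStream + ?Sized> BatchStream for Box<T> {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        (**self).poll_next(cx)
    }
}

/// Source that produces `next..n` and never returns `Pending` by itself.
struct Counter {
    next: u32,
    n: u32,
}

impl BatchStream for Counter {
    fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.next < self.n {
            let batch = self.next;
            self.next += 1;
            Poll::Ready(Some(batch))
        } else {
            Poll::Ready(None)
        }
    }
}

/// Passes batches through unchanged, consuming one unit of budget per batch
/// and yielding once the budget is used up (hypothetical per-wrapper budget).
struct Cooperative<S> {
    inner: S,
    budget: u32,
    remaining: u32,
}

impl<S: BatchStream> BatchStream for Cooperative<S> {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.remaining == 0 {
            // Budget exhausted: refill it, ask to be polled again, and yield.
            self.remaining = self.budget;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        let item = self.inner.poll_next(cx);
        if let Poll::Ready(Some(_)) = item {
            self.remaining -= 1;
        }
        item
    }
}

/// Generic constructor: no extra allocation, static dispatch to `S`.
/// `budget` must be non-zero.
fn cooperative<S: BatchStream>(inner: S, budget: u32) -> Cooperative<S> {
    Cooperative { inner, budget, remaining: budget }
}

/// Constructor for type-erased streams: one more allocation, dynamic dispatch.
fn make_cooperative(inner: Box<dyn BatchStream>, budget: u32) -> Box<dyn BatchStream> {
    Box::new(cooperative(inner, budget))
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a stream to completion, returning its batches and how often it yielded.
fn drain(stream: &mut dyn BatchStream) -> (Vec<u32>, usize) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let (mut items, mut pendings) = (Vec::new(), 0);
    loop {
        match stream.poll_next(&mut cx) {
            Poll::Ready(Some(batch)) => items.push(batch),
            Poll::Ready(None) => return (items, pendings),
            Poll::Pending => pendings += 1,
        }
    }
}
```

Draining `cooperative(Counter { next: 0, n: 5 }, 2)` returns all five batches unchanged, with two `Pending` results in between. This sketch refills the budget itself; a Tokio task only gets fresh budget when the runtime polls it again.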
Automatic cooperation
The EnsureCooperative physical optimizer rule, which is included in the default set of
optimizer rules, inspects query plans for potential cooperative scheduling issues.
It injects the CooperativeExec wrapper ExecutionPlan into the query plan where necessary.
This ExecutionPlan uses make_cooperative to wrap the Stream of its input.
The optimizer rule currently checks the plan for exchange-like operators and for leaf operators
that report SchedulingType::NonCooperative in their plan properties.
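A heavily simplified sketch of what such a rule does, using a hypothetical `Plan` tree instead of DataFusion's ExecutionPlan trait objects. `SchedulingType` and the `CooperativeExec` variant here are local stand-ins that only borrow the names; the sketch only wraps leaves that report `NonCooperative`, ignores exchange-like operators, and is not the actual EnsureCooperative implementation.

```rust
/// Local stand-in for the scheduling information reported in plan properties.
#[derive(Clone, Copy, Debug, PartialEq)]
enum SchedulingType {
    Cooperative,
    NonCooperative,
}

/// Toy plan tree (hypothetical; real plans are `ExecutionPlan` trait objects).
enum Plan {
    Leaf { name: String, scheduling: SchedulingType },
    Node { name: String, children: Vec<Plan> },
    /// Stand-in for `CooperativeExec`, which wraps its input's stream.
    CooperativeExec(Box<Plan>),
}

/// Simplified rule: wrap every leaf that reports `NonCooperative`,
/// recurse into other operators, and leave existing wrappers alone.
fn ensure_cooperative(plan: Plan) -> Plan {
    match plan {
        Plan::Leaf { scheduling: SchedulingType::NonCooperative, .. } => {
            Plan::CooperativeExec(Box::new(plan))
        }
        Plan::Node { name, children } => Plan::Node {
            name,
            children: children.into_iter().map(ensure_cooperative).collect(),
        },
        other => other,
    }
}

/// Renders a plan as `Name(child, child)` for easy inspection.
fn render(plan: &Plan) -> String {
    match plan {
        Plan::Leaf { name, .. } => name.clone(),
        Plan::Node { name, children } => {
            let inner: Vec<String> = children.iter().map(render).collect();
            format!("{}({})", name, inner.join(", "))
        }
        Plan::CooperativeExec(input) => format!("CooperativeExec({})", render(input)),
    }
}
```

Applied to a join over a cooperative `DataSourceExec` and a hypothetical non-cooperative `MyCustomSource`, only the latter gets wrapped: `HashJoinExec(DataSourceExec, CooperativeExec(MyCustomSource))`.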
Structs
- CooperativeExec - An execution plan decorator that enables cooperative multitasking.
It wraps the streams produced by its input execution plan using the make_cooperative
function, which makes the stream participate in Tokio cooperative scheduling.
- CooperativeStream - A stream that passes record batches through unchanged while cooperating
with the Tokio runtime. It consumes cooperative scheduling budget for each returned
RecordBatch, allowing other tasks to execute when the budget is exhausted.
Functions
- cooperative - Creates a CooperativeStream wrapper around the given RecordBatchStream.
This wrapper collaborates with the Tokio cooperative scheduler by consuming a unit of
scheduling budget for each returned record batch.
- make_cooperative - Wraps a SendableRecordBatchStream inside a CooperativeStream to enable
cooperative multitasking. Since SendableRecordBatchStream is a boxed dyn RecordBatchStream,
this requires the use of dynamic method dispatch. When the stream type is statically known,
consider using the generic cooperative function to allow static method dispatch.