pub fn parallelize_sorts(
requirements: PlanContext<bool>,
) -> Result<Transformed<PlanContext<bool>>, DataFusionError>
Transform CoalescePartitionsExec + SortExec cascades into SortExec +
SortPreservingMergeExec cascades, as illustrated below.
A CoalescePartitionsExec + SortExec cascade combines partitions
first, and then sorts:
┌ ─ ─ ─ ─ ─ ┐
 ┌─┬─┬─┐
││B│A│D│... ├──┐
 └─┴─┴─┘       │
└ ─ ─ ─ ─ ─ ┘  │  ┌────────────────────────┐   ┌ ─ ─ ─ ─ ─ ─ ┐   ┌────────┐    ┌ ─ ─ ─ ─ ─ ─ ─ ┐
 Partition 1   │  │        Coalesce        │    ┌─┬─┬─┬─┬─┐      │        │     ┌─┬─┬─┬─┬─┐
               ├──▶(no ordering guarantees)│──▶││B│E│A│D│C│...───▶  Sort  ├───▶││A│B│C│D│E│... │
               │  │                        │    └─┴─┴─┴─┴─┘      │        │     └─┴─┴─┴─┴─┘
┌ ─ ─ ─ ─ ─ ┐  │  └────────────────────────┘   └ ─ ─ ─ ─ ─ ─ ┘   └────────┘    └ ─ ─ ─ ─ ─ ─ ─ ┘
 ┌─┬─┐         │                                Partition                       Partition
││E│C│ ...  ├──┘
 └─┴─┘
└ ─ ─ ─ ─ ─ ┘
 Partition 2
A SortExec + SortPreservingMergeExec cascade sorts each partition
first, then merges partitions while preserving the sort:
┌ ─ ─ ─ ─ ─ ┐   ┌────────┐   ┌ ─ ─ ─ ─ ─ ┐
 ┌─┬─┬─┐        │        │    ┌─┬─┬─┐
││B│A│D│... │──▶│  Sort  │──▶││A│B│D│... │──┐
 └─┴─┴─┘        │        │    └─┴─┴─┘       │
└ ─ ─ ─ ─ ─ ┘   └────────┘   └ ─ ─ ─ ─ ─ ┘  │  ┌─────────────────────┐    ┌ ─ ─ ─ ─ ─ ─ ─ ┐
 Partition 1                  Partition 1   │  │                     │     ┌─┬─┬─┬─┬─┐
                                            ├──▶ SortPreservingMerge ├───▶││A│B│C│D│E│... │
                                            │  │                     │     └─┴─┴─┴─┴─┘
┌ ─ ─ ─ ─ ─ ┐   ┌────────┐   ┌ ─ ─ ─ ─ ─ ┐  │  └─────────────────────┘    └ ─ ─ ─ ─ ─ ─ ─ ┘
 ┌─┬─┐          │        │    ┌─┬─┐         │                              Partition
││E│C│ ...  │──▶│  Sort  ├──▶││C│E│ ...  │──┘
 └─┴─┘          │        │    └─┴─┘
└ ─ ─ ─ ─ ─ ┘   └────────┘   └ ─ ─ ─ ─ ─ ┘
 Partition 2                  Partition 2
The latter SortExec + SortPreservingMergeExec cascade performs
sorting first on a per-partition basis, thereby parallelizing the sort.
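Both cascades yield the same rows in the same order; they differ only in where the sorting work happens. The following std-only Rust sketch models partitions as in-memory Vecs rather than DataFusion record-batch streams. coalesce_then_sort and sort_then_merge are hypothetical helpers, and the merge step is a textbook k-way merge over a BinaryHeap, not necessarily how SortPreservingMergeExec is implemented.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::thread;

/// CoalescePartitionsExec + SortExec analogue (hypothetical helper):
/// concatenate all partitions, then sort the combined data on one thread.
fn coalesce_then_sort<T: Ord + Clone>(partitions: &[Vec<T>]) -> Vec<T> {
    let mut all: Vec<T> = partitions.iter().flatten().cloned().collect();
    all.sort();
    all
}

/// SortExec + SortPreservingMergeExec analogue (hypothetical helper):
/// sort every partition on its own thread, then k-way merge the results.
fn sort_then_merge<T: Ord + Clone + Send + Sync>(partitions: &[Vec<T>]) -> Vec<T> {
    // Per-partition sorts run concurrently on scoped threads.
    let sorted: Vec<Vec<T>> = thread::scope(|s| {
        let handles: Vec<_> = partitions
            .iter()
            .map(|p| {
                s.spawn(move || {
                    let mut p = p.clone();
                    p.sort();
                    p
                })
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });

    // Min-heap of (next value, partition index, position within partition).
    let mut heap = BinaryHeap::new();
    for (i, p) in sorted.iter().enumerate() {
        if let Some(v) = p.first() {
            heap.push(Reverse((v.clone(), i, 0usize)));
        }
    }
    let mut out = Vec::with_capacity(sorted.iter().map(Vec::len).sum());
    while let Some(Reverse((v, i, j))) = heap.pop() {
        out.push(v);
        // Refill the heap from the partition the smallest value came from.
        if let Some(next) = sorted[i].get(j + 1) {
            heap.push(Reverse((next.clone(), i, j + 1)));
        }
    }
    out
}

fn main() {
    // The two input partitions from the diagrams above.
    let partitions = vec![vec!['B', 'A', 'D'], vec!['E', 'C']];
    let merged = sort_then_merge(&partitions);
    assert_eq!(merged, coalesce_then_sort(&partitions));
    println!("{merged:?}"); // prints ['A', 'B', 'C', 'D', 'E']
}
```

Sorting the partitions concurrently is what lets the second cascade use more than one core; the final merge remains a single sequential pass over already-sorted inputs.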
The outcome is that plans of the form
"SortExec: expr=[a@0 ASC]",
"  ...nodes...",
"    CoalescePartitionsExec",
"      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
are transformed into
"SortPreservingMergeExec: [a@0 ASC]",
"  SortExec: expr=[a@0 ASC]",
"    ...nodes...",
"      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
by following connections from CoalescePartitionsExecs to SortExecs.
Sorting each partition independently lets the sort run in parallel, which
can improve performance when the input is split across several partitions.
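To make the plan rewrite concrete, here is a sketch that applies it to a plan written like the listings above, one node per entry from the root down. This is a hypothetical string-level model: parallelize_chain and render are illustrative names, not DataFusion APIs, and the sketch deliberately does not check the single-partitioning restriction described next.

```rust
/// Hypothetical string-level model: a linear plan is a slice of node labels,
/// ordered from the root (index 0) down to the leaf. Not a DataFusion API, and
/// the single-partitioning restriction is not checked.
fn parallelize_chain(plan: &[&str]) -> Option<Vec<String>> {
    // Only chains rooted at a SortExec are rewritten.
    let sort = *plan.first()?;
    let expr = sort.strip_prefix("SortExec: expr=")?;
    // Locate the CoalescePartitionsExec below the sort.
    let pos = plan.iter().position(|n| *n == "CoalescePartitionsExec")?;

    // New root: an order-preserving merge above the original sort.
    let mut out = vec![format!("SortPreservingMergeExec: {expr}"), sort.to_string()];
    // Keep the nodes between the sort and the coalesce, then everything below
    // the coalesce; the coalesce itself is dropped.
    out.extend(plan[1..pos].iter().chain(&plan[pos + 1..]).map(|n| n.to_string()));
    Some(out)
}

/// Render a chain with two spaces of indentation per level.
fn render(plan: &[String]) -> String {
    plan.iter()
        .enumerate()
        .map(|(depth, node)| format!("{}{}\n", "  ".repeat(depth), node))
        .collect()
}

fn main() {
    let before = [
        "SortExec: expr=[a@0 ASC]",
        "...nodes...",
        "CoalescePartitionsExec",
        "RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
    ];
    let after = parallelize_chain(&before).expect("sort above a coalesce");
    print!("{}", render(&after));
}
```

The rendered output mirrors the transformed plan shown above, without the surrounding quotes and commas.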
This optimization requires that no node between the SortExec and the
CoalescePartitionsExec requires single partitioning. Do not parallelize
when the following scenario occurs:
"SortExec: expr=[a@0 ASC]",
"  ...nodes requiring single partitioning...",
"    CoalescePartitionsExec",
"      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
Steps
- Checks if the plan is either a SortExec, a SortPreservingMergeExec, or a
  CoalescePartitionsExec. Otherwise, does nothing.
- If the plan is a SortExec or a final SortPreservingMergeExec (i.e. output
  partitioning is 1):
  - Check for a CoalescePartitionsExec in the children. If found, check if it
    can be removed (with possible RepartitionExecs). If so, remove it (see
    remove_bottleneck_in_subplan).
  - If the plan is satisfying the ordering requirements, add a SortExec.
  - Add a SortPreservingMergeExec (SPM) above the plan and return.
- If the plan is a CoalescePartitionsExec:
  - Check if it can be removed (with possible RepartitionExecs). If so,
    remove it (see remove_bottleneck_in_subplan).
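The Steps can be sketched on a toy plan tree. Everything here is hypothetical: Node stands in for DataFusion's ExecutionPlan, and the connected helper stands in for the bool carried by PlanContext<bool>, which this sketch assumes means "a CoalescePartitionsExec is reachable below without crossing a node that requires single partitioning". Because the toy nodes carry no ordering information, the sketch always re-adds the SortExec, and it does not model the RepartitionExec clean-up handled by remove_bottleneck_in_subplan.

```rust
/// Hypothetical, simplified plan tree (not DataFusion's ExecutionPlan).
#[derive(Debug, Clone, PartialEq)]
enum Node {
    Sort(Box<Node>),
    Spm(Box<Node>), // treated as a final SPM (single output partition)
    Coalesce(Box<Node>),
    Other { single: bool, input: Box<Node> }, // single: requires single partitioning
    Scan,
}

/// Assumed meaning of the bool payload: a Coalesce is reachable below `n`
/// without crossing a node that requires single partitioning.
fn connected(n: &Node) -> bool {
    match n {
        Node::Coalesce(_) => true,
        Node::Other { single: false, input } => connected(input),
        _ => false,
    }
}

/// Remove the first reachable Coalesce, keeping the nodes above and below it.
fn remove_bottleneck(n: Node) -> Node {
    match n {
        Node::Coalesce(input) => *input,
        Node::Other { single, input } => Node::Other {
            single,
            input: Box::new(remove_bottleneck(*input)),
        },
        other => other,
    }
}

/// Apply the Steps to one node; returns the new node and whether it changed.
fn parallelize(n: Node) -> (Node, bool) {
    match n {
        // Sort or final SPM over a removable Coalesce: drop the Coalesce,
        // sort per partition, and merge with an SPM on top.
        Node::Sort(input) | Node::Spm(input) if connected(&input) => {
            let sorted = Node::Sort(Box::new(remove_bottleneck(*input)));
            (Node::Spm(Box::new(sorted)), true)
        }
        // Coalesce over another removable Coalesce: keep only the top one.
        Node::Coalesce(input) if connected(&input) => {
            (Node::Coalesce(Box::new(remove_bottleneck(*input))), true)
        }
        // Any other node, or a blocked chain, is left unchanged.
        other => (other, false),
    }
}

fn main() {
    let coalesce = Box::new(Node::Coalesce(Box::new(Node::Scan)));
    // SortExec -> pass-through node -> CoalescePartitionsExec -> scan
    let plan = Node::Sort(Box::new(Node::Other { single: false, input: coalesce.clone() }));
    let (rewritten, changed) = parallelize(plan);
    println!("{changed}: {rewritten:?}");

    // A node requiring single partitioning blocks the rewrite.
    let blocked = Node::Sort(Box::new(Node::Other { single: true, input: coalesce }));
    let (same, changed) = parallelize(blocked.clone());
    assert!(!changed && same == blocked);
}
```

The sketch rewrites a single node; a complete pass would apply it to every node of the plan, for example in a bottom-up traversal.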