parallelize_sorts

Function parallelize_sorts 

pub fn parallelize_sorts(
    requirements: PlanContext<bool>,
) -> Result<Transformed<PlanContext<bool>>, DataFusionError>

Transform CoalescePartitionsExec + SortExec cascades into SortExec + SortPreservingMergeExec cascades, as illustrated below.

A CoalescePartitionsExec + SortExec cascade combines partitions first, and then sorts:

  ┌ ─ ─ ─ ─ ─ ┐
   ┌─┬─┬─┐
  ││B│A│D│... ├──┐
   └─┴─┴─┘       │
  └ ─ ─ ─ ─ ─ ┘  │  ┌────────────────────────┐   ┌ ─ ─ ─ ─ ─ ─ ┐   ┌────────┐    ┌ ─ ─ ─ ─ ─ ─ ─ ┐
   Partition 1   │  │        Coalesce        │    ┌─┬─┬─┬─┬─┐      │        │     ┌─┬─┬─┬─┬─┐
                 ├──▶(no ordering guarantees)│──▶││B│E│A│D│C│...───▶  Sort  ├───▶││A│B│C│D│E│... │
                 │  │                        │    └─┴─┴─┴─┴─┘      │        │     └─┴─┴─┴─┴─┘
  ┌ ─ ─ ─ ─ ─ ┐  │  └────────────────────────┘   └ ─ ─ ─ ─ ─ ─ ┘   └────────┘    └ ─ ─ ─ ─ ─ ─ ─ ┘
   ┌─┬─┐         │                                 Partition                       Partition
  ││E│C│ ...  ├──┘
   └─┴─┘
  └ ─ ─ ─ ─ ─ ┘
   Partition 2

A SortExec + SortPreservingMergeExec cascade sorts each partition first, then merges partitions while preserving the sort:

  ┌ ─ ─ ─ ─ ─ ┐   ┌────────┐   ┌ ─ ─ ─ ─ ─ ┐
   ┌─┬─┬─┐        │        │    ┌─┬─┬─┐
  ││B│A│D│... │──▶│  Sort  │──▶││A│B│D│... │──┐
   └─┴─┴─┘        │        │    └─┴─┴─┘       │
  └ ─ ─ ─ ─ ─ ┘   └────────┘   └ ─ ─ ─ ─ ─ ┘  │  ┌─────────────────────┐    ┌ ─ ─ ─ ─ ─ ─ ─ ┐
   Partition 1                  Partition 1   │  │                     │     ┌─┬─┬─┬─┬─┐
                                              ├──▶ SortPreservingMerge ├───▶││A│B│C│D│E│... │
                                              │  │                     │     └─┴─┴─┴─┴─┘
  ┌ ─ ─ ─ ─ ─ ┐   ┌────────┐   ┌ ─ ─ ─ ─ ─ ┐  │  └─────────────────────┘    └ ─ ─ ─ ─ ─ ─ ─ ┘
   ┌─┬─┐          │        │    ┌─┬─┐         │                               Partition
  ││E│C│ ...  │──▶│  Sort  ├──▶││C│E│ ...  │──┘
   └─┴─┘          │        │    └─┴─┘
  └ ─ ─ ─ ─ ─ ┘   └────────┘   └ ─ ─ ─ ─ ─ ┘
   Partition 2                  Partition 2

The latter SortExec + SortPreservingMergeExec cascade performs sorting first on a per-partition basis, thereby parallelizing the sort.
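A minimal, std-only Rust sketch of why the two cascades are interchangeable. Partitions are modeled as vectors of characters (an assumption; DataFusion actually works on Arrow record batches), and `coalesce_then_sort` and `sort_then_merge` are hypothetical names, not DataFusion APIs. The second function sorts each partition independently and then performs a heap-based k-way merge, which is one common way to merge sorted runs; this is not a claim about how SortPreservingMergeExec is implemented internally.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Coalesce-then-sort: concatenate every partition into one,
/// then run a single sort over all rows.
fn coalesce_then_sort(partitions: &[Vec<char>]) -> Vec<char> {
    let mut combined: Vec<char> = partitions.iter().flatten().copied().collect();
    combined.sort();
    combined
}

/// Sort-then-merge: sort each partition on its own (these sorts are
/// independent, so an engine could run them in parallel), then k-way
/// merge the sorted runs while preserving the order.
fn sort_then_merge(partitions: &[Vec<char>]) -> Vec<char> {
    // Per-partition sorts.
    let sorted: Vec<Vec<char>> = partitions
        .iter()
        .map(|p| {
            let mut p = p.clone();
            p.sort();
            p
        })
        .collect();

    // Min-heap of (next value, partition index, position within partition).
    let mut heap = BinaryHeap::new();
    for (i, run) in sorted.iter().enumerate() {
        if let Some(&first) = run.first() {
            heap.push(Reverse((first, i, 0usize)));
        }
    }

    // Repeatedly take the smallest head and advance that partition's cursor.
    let mut merged = Vec::with_capacity(sorted.iter().map(Vec::len).sum());
    while let Some(Reverse((value, i, pos))) = heap.pop() {
        merged.push(value);
        if let Some(&next) = sorted[i].get(pos + 1) {
            heap.push(Reverse((next, i, pos + 1)));
        }
    }
    merged
}

fn main() {
    // The two partitions from the diagrams above.
    let partitions = vec![vec!['B', 'A', 'D'], vec!['E', 'C']];
    let a = coalesce_then_sort(&partitions);
    let b = sort_then_merge(&partitions);
    assert_eq!(a, b);
    println!("{:?}", b); // ['A', 'B', 'C', 'D', 'E']
}
```

Only the per-partition sorts in `sort_then_merge` are candidates for parallel execution; the final merge still produces a single partition, just as the SortPreservingMerge node does in the second diagram.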

The outcome is that plans of the form

     "SortExec: expr=[a@0 ASC]",
     "  ...nodes...",
     "    CoalescePartitionsExec",
     "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",

are transformed into

     "SortPreservingMergeExec: [a@0 ASC]",
     "  SortExec: expr=[a@0 ASC]",
     "    ...nodes...",
     "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",

by following connections from CoalescePartitionsExecs to SortExecs. By performing sorting in parallel, we can increase performance in some scenarios.
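The rewrite can be sketched on a toy plan tree. The `Plan` enum and the functions `remove_coalesce`, `parallelize`, and `render` are hypothetical stand-ins, not DataFusion types; the real rule operates on `PlanContext<bool>` nodes. The sketch removes the first CoalescePartitionsExec reachable through pass-through nodes and places a SortExec under a new SortPreservingMergeExec. It ignores the restriction on intermediate nodes that require single partitioning.

```rust
/// Toy plan nodes standing in for SortExec, SortPreservingMergeExec,
/// CoalescePartitionsExec, a generic pass-through node, and a leaf.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Sort { expr: String, input: Box<Plan> },
    SortPreservingMerge { expr: String, input: Box<Plan> },
    CoalescePartitions { input: Box<Plan> },
    Other { name: String, input: Box<Plan> },
    Leaf { name: String },
}

/// Remove the first CoalescePartitions reachable through pass-through
/// (`Other`) nodes. Returns None if there is none to remove.
fn remove_coalesce(plan: &Plan) -> Option<Plan> {
    match plan {
        Plan::CoalescePartitions { input } => Some((**input).clone()),
        Plan::Other { name, input } => remove_coalesce(input).map(|new_input| Plan::Other {
            name: name.clone(),
            input: Box::new(new_input),
        }),
        _ => None,
    }
}

/// Sort -> ... -> Coalesce  becomes  SPM -> Sort -> ... (coalesce removed).
fn parallelize(plan: &Plan) -> Plan {
    if let Plan::Sort { expr, input } = plan {
        if let Some(new_input) = remove_coalesce(input) {
            let sort = Plan::Sort { expr: expr.clone(), input: Box::new(new_input) };
            return Plan::SortPreservingMerge { expr: expr.clone(), input: Box::new(sort) };
        }
    }
    plan.clone()
}

/// Render the plan as indented lines, mimicking the plan strings above.
fn render(plan: &Plan, depth: usize, out: &mut Vec<String>) {
    let pad = "  ".repeat(depth);
    let (line, child) = match plan {
        Plan::Sort { expr, input } => (format!("SortExec: expr=[{expr}]"), Some(input)),
        Plan::SortPreservingMerge { expr, input } => {
            (format!("SortPreservingMergeExec: [{expr}]"), Some(input))
        }
        Plan::CoalescePartitions { input } => ("CoalescePartitionsExec".to_string(), Some(input)),
        Plan::Other { name, input } => (name.clone(), Some(input)),
        Plan::Leaf { name } => (name.clone(), None),
    };
    out.push(format!("{pad}{line}"));
    if let Some(child) = child {
        render(child, depth + 1, out);
    }
}

fn main() {
    // SortExec -> "...nodes..." -> CoalescePartitionsExec -> RepartitionExec
    let plan = Plan::Sort {
        expr: "a@0 ASC".to_string(),
        input: Box::new(Plan::Other {
            name: "...nodes...".to_string(),
            input: Box::new(Plan::CoalescePartitions {
                input: Box::new(Plan::Leaf {
                    name: "RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1"
                        .to_string(),
                }),
            }),
        }),
    };
    let mut lines = Vec::new();
    render(&parallelize(&plan), 0, &mut lines);
    for line in &lines {
        println!("{line}");
    }
}
```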

This optimization requires that no node between the SortExec and the CoalescePartitionsExec requires single partitioning. Do not parallelize when the following scenario occurs:

     "SortExec: expr=[a@0 ASC]",
     "  ...nodes requiring single partitioning...",
     "    CoalescePartitionsExec",
     "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
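A hypothetical sketch of that restriction: each node between the SortExec and the CoalescePartitionsExec is summarized by an assumed `requires_single_partition` flag, and the rewrite is allowed only if none of them needs a single input partition. `NodeInfo`, `blocking_node`, and `can_parallelize` are illustrative names; in DataFusion this information would presumably come from each operator's input distribution requirements, which this sketch does not model.

```rust
/// Hypothetical summary of a node on the path between a SortExec and the
/// CoalescePartitionsExec below it.
struct NodeInfo {
    name: &'static str,
    /// True if the node needs its whole input in a single partition.
    requires_single_partition: bool,
}

/// Returns the first intermediate node that requires single partitioning.
/// Removing the CoalescePartitionsExec would hand such a node multiple
/// partitions, so its presence blocks the rewrite.
fn blocking_node(between: &[NodeInfo]) -> Option<&'static str> {
    between
        .iter()
        .find(|node| node.requires_single_partition)
        .map(|node| node.name)
}

/// The sort may be parallelized only if no intermediate node blocks it.
fn can_parallelize(between: &[NodeInfo]) -> bool {
    blocking_node(between).is_none()
}

fn main() {
    let no_blocker = [NodeInfo { name: "pass_through", requires_single_partition: false }];
    let with_blocker = [
        NodeInfo { name: "pass_through", requires_single_partition: false },
        NodeInfo { name: "single_partition_op", requires_single_partition: true },
    ];
    println!("{}", can_parallelize(&no_blocker)); // true
    println!("{:?}", blocking_node(&with_blocker)); // Some("single_partition_op")
}
```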

Steps

  1. Check whether the plan is a SortExec, a SortPreservingMergeExec, or a CoalescePartitionsExec. If it is none of these, do nothing.
  2. If the plan is a SortExec or a final SortPreservingMergeExec (i.e. output partitioning is 1):
    • Check for a CoalescePartitionsExec among the children. If one is found, check whether it can be removed (along with any accompanying RepartitionExecs). If so, remove it (see remove_bottleneck_in_subplan).
    • If the resulting plan does not already satisfy the ordering requirements, add a SortExec.
    • Add a SortPreservingMergeExec above the plan and return.
  3. If the plan is a CoalescePartitionsExec:
    • Check whether it can be removed (along with any accompanying RepartitionExecs). If so, remove it (see remove_bottleneck_in_subplan).
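The three steps can be read as a dispatch on the node type. This simplified Rust sketch reduces each node to a hypothetical `Kind` plus its output partition count and reports which step applies; `Kind`, `Action`, and `decide` are illustrative names, and the actual tree rewriting (bottleneck removal, sort insertion, SortPreservingMergeExec placement) is left out.

```rust
/// Simplified node kinds relevant to this rule (hypothetical; the real
/// rule inspects actual execution plan nodes).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Kind {
    Sort,
    SortPreservingMerge,
    CoalescePartitions,
    Other,
}

/// The action taken for a node, one variant per step in the list above.
#[derive(Debug, PartialEq, Eq)]
enum Action {
    /// Step 1: not a node this rule rewrites; leave it unchanged.
    Nothing,
    /// Step 2: remove removable bottlenecks below, ensure the required
    /// ordering with a SortExec, and add a SortPreservingMergeExec on top.
    ParallelizeSort,
    /// Step 3: remove the CoalescePartitionsExec if it is removable.
    TryRemoveCoalesce,
}

/// Pick the step for a node, given its kind and output partition count.
fn decide(kind: Kind, output_partitions: usize) -> Action {
    match kind {
        Kind::Sort => Action::ParallelizeSort,
        // Only a "final" SortPreservingMergeExec, i.e. one with a single output partition.
        Kind::SortPreservingMerge if output_partitions == 1 => Action::ParallelizeSort,
        Kind::CoalescePartitions => Action::TryRemoveCoalesce,
        _ => Action::Nothing,
    }
}

fn main() {
    for (kind, parts) in [
        (Kind::Sort, 1),
        (Kind::SortPreservingMerge, 1),
        (Kind::SortPreservingMerge, 8),
        (Kind::CoalescePartitions, 1),
        (Kind::Other, 4),
    ] {
        println!("{kind:?} with {parts} output partition(s) -> {:?}", decide(kind, parts));
    }
}
```

Keeping the decision separate from the rewriting makes the step order easy to check: a non-final SortPreservingMergeExec (more than one output partition) falls through to step 1 and is left alone.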