Struct FilterPushdown 

Source
pub struct FilterPushdown {
    phase: FilterPushdownPhase,
    name: String,
}

Attempts to recursively push given filters from the top of the tree into leaves.

§Default Implementation

The default implementation in ExecutionPlan::gather_filters_for_pushdown and ExecutionPlan::handle_child_pushdown_result assumes that:

  1. Parent filters cannot be passed on to this node's children.
  2. This node has no filters of its own to contribute.
  3. Any filters that could not be pushed down to the children are marked as unsupported.

The net effect is that a node using the defaults neither pushes filters down nor lets its parents' filters pass through it; operators opt in to pushdown by overriding these methods.

§Example: Push filter into a DataSourceExec

For example, consider the following plan:

┌──────────────────────┐
│ CoalesceBatchesExec  │
└──────────────────────┘
            │
            ▼
┌──────────────────────┐
│      FilterExec      │
│  filters = [ id=1]   │
└──────────────────────┘
            │
            ▼
┌──────────────────────┐
│    DataSourceExec    │
│    projection = *    │
└──────────────────────┘

Our goal is to move the id = 1 filter from the FilterExec node to the DataSourceExec node.

If this filter is selective, pushing it into the scan can avoid reading massive amounts of data from the source (the projection is *, so all columns of the matching rows are read).

The new plan looks like:

┌──────────────────────┐
│ CoalesceBatchesExec  │
└──────────────────────┘
          │
          ▼
┌──────────────────────┐
│    DataSourceExec    │
│    projection = *    │
│   filters = [ id=1]  │
└──────────────────────┘
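
To see the effect end to end, here is a minimal sketch using the DataFusion SessionContext; the file name example.parquet, the table name t, and the exact EXPLAIN output are illustrative assumptions rather than anything defined by this crate.

// Minimal sketch: register a Parquet file and inspect the physical plan.
// `example.parquet` and the table name `t` are placeholders.
use datafusion::error::Result;
use datafusion::prelude::{ParquetReadOptions, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.register_parquet("t", "example.parquet", ParquetReadOptions::default())
        .await?;

    // With the default physical optimizer rules (which include FilterPushdown),
    // the plan printed here should show `id = 1` attached to the scan rather
    // than as a separate FilterExec above it (exact output depends on the
    // DataFusion version and configuration).
    ctx.sql("EXPLAIN SELECT * FROM t WHERE id = 1")
        .await?
        .show()
        .await?;
    Ok(())
}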

§Example: Push filters with ProjectionExec

Let’s consider a more complex example with a ProjectionExec node between the FilterExec and DataSourceExec nodes; the projection creates a new column that the filter depends on.

┌──────────────────────┐
│ CoalesceBatchesExec  │
└──────────────────────┘
            │
            ▼
┌──────────────────────┐
│      FilterExec      │
│    filters =         │
│     [cost>50,id=1]   │
└──────────────────────┘
            │
            ▼
┌──────────────────────┐
│    ProjectionExec    │
│ cost = price * 1.2   │
└──────────────────────┘
            │
            ▼
┌──────────────────────┐
│    DataSourceExec    │
│    projection = *    │
└──────────────────────┘

We want to push down the filters [id=1] to the DataSourceExec node, but can’t push down cost>50 because it requires the ProjectionExec node to be executed first. A simple thing to do would be to split up the filter into two separate filters and push down the first one:

┌──────────────────────┐
│ CoalesceBatchesExec  │
└──────────────────────┘
            │
            ▼
┌──────────────────────┐
│      FilterExec      │
│    filters =         │
│     [cost>50]        │
└──────────────────────┘
            │
            ▼
┌──────────────────────┐
│    ProjectionExec    │
│ cost = price * 1.2   │
└──────────────────────┘
            │
            ▼
┌──────────────────────┐
│    DataSourceExec    │
│    projection = *    │
│   filters = [ id=1]  │
└──────────────────────┘

We can, however, do even better by pushing down price * 1.2 > 50 instead of cost > 50:

┌──────────────────────┐
│ CoalesceBatchesExec  │
└──────────────────────┘
           │
           ▼
┌──────────────────────┐
│    ProjectionExec    │
│ cost = price * 1.2   │
└──────────────────────┘
           │
           ▼
┌──────────────────────┐
│    DataSourceExec    │
│    projection = *    │
│   filters = [id=1,   │
│   price * 1.2 > 50]  │
└──────────────────────┘
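
The rewrite that enables this is substitution: wherever the filter references a column produced by the projection, inline that column’s defining expression. The sketch below models the idea with a small self-contained expression type of its own (not DataFusion’s PhysicalExpr); it is illustrative only.

// Toy expression type used only for illustration; DataFusion's real
// PhysicalExpr trees are rewritten analogously.
#[derive(Clone, Debug)]
enum Expr {
    Column(String),
    Literal(f64),
    Mul(Box<Expr>, Box<Expr>),
    Gt(Box<Expr>, Box<Expr>),
}

// Replace every reference to `name` with `with`, recursing through the tree.
fn substitute(expr: &Expr, name: &str, with: &Expr) -> Expr {
    match expr {
        Expr::Column(c) if c == name => with.clone(),
        Expr::Column(_) | Expr::Literal(_) => expr.clone(),
        Expr::Mul(l, r) => Expr::Mul(
            Box::new(substitute(l, name, with)),
            Box::new(substitute(r, name, with)),
        ),
        Expr::Gt(l, r) => Expr::Gt(
            Box::new(substitute(l, name, with)),
            Box::new(substitute(r, name, with)),
        ),
    }
}

fn main() {
    // cost > 50, where the projection defines cost = price * 1.2
    let filter = Expr::Gt(
        Box::new(Expr::Column("cost".into())),
        Box::new(Expr::Literal(50.0)),
    );
    let cost_def = Expr::Mul(
        Box::new(Expr::Column("price".into())),
        Box::new(Expr::Literal(1.2)),
    );
    // Result: price * 1.2 > 50, expressed purely over source columns, so it
    // can be pushed below the ProjectionExec down to the scan.
    println!("{:?}", substitute(&filter, "cost", &cost_def));
}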

§Example: Push filters within a subtree

There are also cases where we may be able to push down filters within a subtree but not the entire tree. A good example of this is aggregation nodes:

┌──────────────────────┐
│ ProjectionExec       │
│ projection = *       │
└──────────────────────┘
          │
          ▼
┌──────────────────────┐
│ FilterExec           │
│ filters = [sum > 10] │
└──────────────────────┘
          │
          ▼
┌───────────────────────┐
│     AggregateExec     │
│    group by = [id]    │
│    aggregate =        │
│      [sum(price)]     │
└───────────────────────┘
          │
          ▼
┌──────────────────────┐
│ FilterExec           │
│ filters = [id=1]     │
└──────────────────────┘
         │
         ▼
┌──────────────────────┐
│ DataSourceExec       │
│ projection = *       │
└──────────────────────┘

The transformation here is to push down the id=1 filter to the DataSourceExec node:

┌──────────────────────┐
│ ProjectionExec       │
│ projection = *       │
└──────────────────────┘
          │
          ▼
┌──────────────────────┐
│ FilterExec           │
│ filters = [sum > 10] │
└──────────────────────┘
          │
          ▼
┌───────────────────────┐
│     AggregateExec     │
│    group by = [id]    │
│    aggregate =        │
│      [sum(price)]     │
└───────────────────────┘
          │
          ▼
┌──────────────────────┐
│ DataSourceExec       │
│ projection = *       │
│ filters = [id=1]     │
└──────────────────────┘

The point here is that:

  1. We cannot push down sum > 10 through the AggregateExec node into the DataSourceExec node. Any filters above the AggregateExec node are not pushed down. This is determined by calling ExecutionPlan::gather_filters_for_pushdown on the AggregateExec node.
  2. We need to keep recursing into the tree so that we can discover the other FilterExec node and push down the id=1 filter, as sketched below.
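
A deliberately simplified, self-contained model of this traversal is sketched below (toy types, not the real ExecutionPlan API): the aggregate acts as a barrier that re-installs whatever filters reached it, while the recursion continues underneath so deeper filters still make it to the scan.

// Toy plan nodes for illustration only; real plans use Arc<dyn ExecutionPlan>.
#[derive(Debug)]
enum Node {
    Scan { filters: Vec<String> },
    Filter { pred: String, input: Box<Node> },
    Aggregate { input: Box<Node> },
}

// Push `pending` predicates toward the leaves. Filters contribute their
// predicate and disappear; scans absorb whatever reaches them; aggregates
// block the pending predicates (re-installing them above themselves) but
// still recurse so filters below them are discovered.
fn pushdown(node: Node, mut pending: Vec<String>) -> Node {
    match node {
        Node::Scan { mut filters } => {
            filters.append(&mut pending);
            Node::Scan { filters }
        }
        Node::Filter { pred, input } => {
            pending.push(pred);
            pushdown(*input, pending)
        }
        Node::Aggregate { input } => {
            let below = Node::Aggregate {
                input: Box::new(pushdown(*input, Vec::new())),
            };
            // Anything that reached this barrier stays above the aggregate.
            pending.into_iter().fold(below, |child, pred| Node::Filter {
                pred,
                input: Box::new(child),
            })
        }
    }
}

fn main() {
    // FilterExec(sum > 10) -> AggregateExec -> FilterExec(id = 1) -> Scan
    let plan = Node::Filter {
        pred: "sum > 10".into(),
        input: Box::new(Node::Aggregate {
            input: Box::new(Node::Filter {
                pred: "id = 1".into(),
                input: Box::new(Node::Scan { filters: vec![] }),
            }),
        }),
    };
    // Prints Filter("sum > 10") above Aggregate above Scan { filters: ["id = 1"] }
    println!("{:#?}", pushdown(plan, Vec::new()));
}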

§Example: Push filters through Joins

It is also possible to push down filters through joins, and to push down filters that originate from joins. For example, consider a hash join where we build a hash table of the left side and probe the right side (we ignore why we would choose this build order; typically it depends on the size of each table, etc.).

             ┌─────────────────────┐
             │     FilterExec      │
             │ filters =           │
             │  [d.size > 100]     │
             └─────────────────────┘
                        │
                        │
             ┌──────────▼──────────┐
             │                     │
             │    HashJoinExec     │
             │ [u.dept@hash(d.id)] │
             │                     │
             └─────────────────────┘
                        │
           ┌────────────┴────────────┐
┌──────────▼──────────┐   ┌──────────▼──────────┐
│   DataSourceExec    │   │   DataSourceExec    │
│  alias [users as u] │   │  alias [dept as d]  │
│                     │   │                     │
└─────────────────────┘   └─────────────────────┘

There are two pushdowns we can do here:

  1. Push down the d.size > 100 filter through the HashJoinExec node to the DataSourceExec node for the departments table.
  2. Push down the hash table state from the HashJoinExec node to the DataSourceExec node to avoid reading rows from the users table that will be eliminated by the join. This can be done via a bloom filter or similar and is not (yet) supported in DataFusion. See https://github.com/apache/datafusion/issues/7955.

After these pushdowns the plan looks like:

             ┌─────────────────────┐
             │                     │
             │    HashJoinExec     │
             │ [u.dept@hash(d.id)] │
             │                     │
             └─────────────────────┘
                        │
           ┌────────────┴────────────┐
┌──────────▼──────────┐   ┌──────────▼──────────┐
│   DataSourceExec    │   │   DataSourceExec    │
│  alias [users as u] │   │  alias [dept as d]  │
│ filters =           │   │  filters =          │
│   [dept@hash(d.id)] │   │    [ d.size > 100]  │
└─────────────────────┘   └─────────────────────┘

You may notice in this case that the filter is dynamic: the hash table is built after the departments table is read and at runtime. We don’t have a concrete InList filter or similar to push down at optimization time. These sorts of dynamic filters are handled by building a specialized PhysicalExpr that can be evaluated at runtime and internally maintains a reference to the hash table or other state.

To make working with these sorts of dynamic filters more tractable we have the method PhysicalExpr::snapshot which attempts to simplify a dynamic filter into a “basic” non-dynamic filter. For a join this could mean converting it to an InList filter or a min/max filter for example. See datafusion/physical-plan/src/dynamic_filters.rs for more details.
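
As a rough, self-contained illustration of the pattern (the type and names below, such as DynamicInFilter, are invented for this sketch and are not DataFusion's actual dynamic filter implementation): the dynamic predicate holds shared mutable state that the producing operator updates at runtime, and a snapshot-style method freezes that state into an ordinary, static predicate a scan could use for pruning.

use std::sync::RwLock;

// Toy stand-in for a dynamic filter: the join (or TopK) operator updates the
// shared state while the scan consults it via `snapshot`.
#[derive(Default)]
struct DynamicInFilter {
    // Join keys observed while building the hash table; None until the build runs.
    keys: RwLock<Option<Vec<i64>>>,
}

impl DynamicInFilter {
    // Called by the build side once the hash table exists.
    fn update(&self, keys: Vec<i64>) {
        *self.keys.write().unwrap() = Some(keys);
    }

    // "Snapshot": if state is available, produce a plain static predicate
    // (here just a closure); otherwise report that no simplification exists.
    fn snapshot(&self) -> Option<impl Fn(i64) -> bool> {
        let keys = self.keys.read().unwrap().clone()?;
        Some(move |v: i64| keys.contains(&v))
    }
}

fn main() {
    let filter = DynamicInFilter::default();
    assert!(filter.snapshot().is_none()); // nothing known at plan time

    filter.update(vec![1, 5, 9]); // hash table built at runtime
    let pred = filter.snapshot().expect("state available");
    assert!(pred(5) && !pred(2)); // scan can now prune with a concrete IN list
}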

§Example: Push TopK filters into Scans

Another form of dynamic filter is pushing down the state of a TopK operator for queries like SELECT * FROM t ORDER BY id LIMIT 10:

┌──────────────────────┐
│       TopK           │
│     limit = 10       │
│   order by = [id]    │
└──────────────────────┘
           │
           ▼
┌──────────────────────┐
│    DataSourceExec    │
│    projection = *    │
└──────────────────────┘

We can avoid large amounts of data processing by transforming this into:

┌──────────────────────┐
│       TopK           │
│     limit = 10       │
│   order by = [id]    │
└──────────────────────┘
           │
           ▼
┌──────────────────────┐
│    DataSourceExec    │
│    projection = *    │
│ filters =            │
│    [id < @ TopKHeap] │
└──────────────────────┘

Now, as we fill our TopK heap, we can push down the state of the heap to the DataSourceExec node to avoid reading files / row groups / pages / rows that could not possibly be in the top 10.

This is not yet implemented in DataFusion. See https://github.com/apache/datafusion/issues/15037
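
The heap side of the idea can be sketched with plain standard-library types (this is not DataFusion's TopK operator): once the heap is full, its largest value is the bound, and id < bound is exactly the predicate that would be handed to the scan.

use std::collections::BinaryHeap;

// Track the k smallest ids seen so far; the heap's maximum is the pushdown bound.
struct TopKIds {
    k: usize,
    heap: BinaryHeap<i64>, // max-heap of the current best (smallest) k ids
}

impl TopKIds {
    fn new(k: usize) -> Self {
        Self { k, heap: BinaryHeap::new() }
    }

    fn insert(&mut self, id: i64) {
        if self.heap.len() < self.k {
            self.heap.push(id);
        } else if id < *self.heap.peek().unwrap() {
            // Strictly smaller than the current worst entry: it displaces it.
            self.heap.pop();
            self.heap.push(id);
        }
    }

    // Current bound to push into the scan as `id < bound`, or no filter at all
    // while the heap is not yet full.
    fn bound(&self) -> Option<i64> {
        (self.heap.len() == self.k).then(|| *self.heap.peek().unwrap())
    }
}

fn main() {
    let mut topk = TopKIds::new(10);
    for id in (0..1_000).rev() {
        topk.insert(id);
    }
    // The bound has tightened to 9: no remaining row with id >= 9 can displace
    // a heap entry, so the scan may skip files / row groups whose minimum id
    // is at or above it.
    assert_eq!(topk.bound(), Some(9));
}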

Fields§

§phase: FilterPushdownPhase
§name: String

Implementations§

Source§

impl FilterPushdown

Source

fn new_with_phase(phase: FilterPushdownPhase) -> Self

Source

pub fn new() -> Self

Create a new FilterPushdown optimizer rule that runs in the pre-optimization phase. See FilterPushdownPhase for more details.

Source

pub fn new_post_optimization() -> Self

Create a new FilterPushdown optimizer rule that runs in the post-optimization phase. See FilterPushdownPhase for more details.
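
Both constructors produce a rule that can be registered with a session’s physical optimizer rules or, as in the hedged sketch below, applied to an existing physical plan by hand via PhysicalOptimizerRule::optimize. The import paths follow the datafusion-physical-optimizer crate and may differ slightly between versions; the input plan is assumed to be built elsewhere (e.g. by the physical planner).

use std::sync::Arc;

use datafusion_common::config::ConfigOptions;
use datafusion_common::Result;
use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::ExecutionPlan;

fn apply_filter_pushdown(
    plan: Arc<dyn ExecutionPlan>,
    config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
    // Pre-optimization pass: runs before the other physical optimizations.
    let plan = FilterPushdown::new().optimize(plan, config)?;
    // Post-optimization pass: intended for filters (e.g. dynamic ones) that
    // only exist once the rest of the physical optimizer has run.
    FilterPushdown::new_post_optimization().optimize(plan, config)
}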

Trait Implementations§

Source§

impl Debug for FilterPushdown

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
Source§

impl Default for FilterPushdown

Source§

fn default() -> Self

Returns the “default value” for a type. Read more
Source§

impl PhysicalOptimizerRule for FilterPushdown

Source§

fn optimize(&self, plan: Arc<dyn ExecutionPlan>, config: &ConfigOptions) -> Result<Arc<dyn ExecutionPlan>>

Rewrite plan to an optimized form
Source§

fn name(&self) -> &str

A human readable name for this optimizer rule
Source§

fn schema_check(&self) -> bool

A flag to indicate whether the physical planner should validate that the rule will not change the schema of the plan after the rewriting. Some of the optimization rules might change the nullable properties of the schema and should disable the schema check.

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

§

impl<T> Instrument for T

§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided [Span], returning an Instrumented wrapper. Read more
§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
§

impl<T> PolicyExt for T
where T: ?Sized,

§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns [Action::Follow] only if self and other return Action::Follow. Read more
§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns [Action::Follow] if either self or other returns Action::Follow. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

§

fn vzip(self) -> V

§

impl<T> WithSubscriber for T

§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a [WithDispatch] wrapper. Read more
§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a [WithDispatch] wrapper. Read more
§

impl<T> Allocation for T
where T: RefUnwindSafe + Send + Sync,

§

impl<T> ErasedDestructor for T
where T: 'static,