pub struct FilterPushdown {
    phase: FilterPushdownPhase,
    name: String,
}
Attempts to recursively push given filters from the top of the tree into leaves.
§Default Implementation
The default implementations of ExecutionPlan::gather_filters_for_pushdown
and ExecutionPlan::handle_child_pushdown_result assume that:
- Parent filters can't be passed on to children (determined by ExecutionPlan::gather_filters_for_pushdown).
- This node has no filters to contribute (determined by ExecutionPlan::gather_filters_for_pushdown).
- Any filters that could not be pushed down to the children are marked as unsupported (determined by ExecutionPlan::handle_child_pushdown_result).
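As a rough illustration of these defaults, the sketch below models the two hooks as a trait with default methods. It is a hypothetical, heavily simplified stand-in: filters are plain strings, and the names PushdownHooks, PushedDown and OpaqueNode are invented here. The real ExecutionPlan methods take different arguments and return richer types.

```rust
/// Whether a parent filter was absorbed below a node (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PushedDown {
    Yes,
    No,
}

/// Hypothetical stand-in for the two pushdown hooks on ExecutionPlan.
/// Filters are modeled as plain strings.
pub trait PushdownHooks {
    /// Returns (filters forwarded to each child, filters this node adds).
    /// Default: forward nothing to any child and contribute nothing.
    fn gather_filters_for_pushdown(
        &self,
        _parent_filters: &[String],
        num_children: usize,
    ) -> (Vec<Vec<String>>, Vec<String>) {
        (vec![Vec::new(); num_children], Vec::new())
    }

    /// Reports, per parent filter, whether it was pushed down.
    /// Default: since nothing was forwarded, every filter is unsupported
    /// and must stay above this node.
    fn handle_child_pushdown_result(&self, parent_filters: &[String]) -> Vec<PushedDown> {
        vec![PushedDown::No; parent_filters.len()]
    }
}

/// A node that relies entirely on the default behavior.
pub struct OpaqueNode;

impl PushdownHooks for OpaqueNode {}
```

A node that overrides neither method therefore acts as a barrier: every filter above it stays above it.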
§Example: Push filter into a DataSourceExec
For example, consider the following plan:
┌──────────────────────┐
│ CoalesceBatchesExec │
└──────────────────────┘
│
▼
┌──────────────────────┐
│ FilterExec │
│ filters = [ id=1] │
└──────────────────────┘
│
▼
┌──────────────────────┐
│ DataSourceExec │
│ projection = * │
└──────────────────────┘
Our goal is to move the id = 1 filter from the FilterExec node to the DataSourceExec node.
If this filter is selective, pushing it into the scan can avoid reading large
amounts of data from the source (the projection is *, so every column is read).
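To make the rewrite concrete, here is a toy model of the plan as a Rust enum, together with a function that moves a FilterExec's predicates into the scan directly beneath it. This is a hypothetical sketch, not DataFusion's ExecutionPlan API. The Plan enum is invented, predicates are strings, and the scan is assumed to accept every filter it is offered.

```rust
/// Toy physical plan (hypothetical; not DataFusion's ExecutionPlan).
#[derive(Debug, Clone, PartialEq)]
pub enum Plan {
    CoalesceBatches(Box<Plan>),
    Filter { predicates: Vec<String>, input: Box<Plan> },
    Scan { filters: Vec<String> },
}

/// Moves the predicates of a Filter that sits directly on a Scan into the
/// Scan and drops the Filter. Assumes the scan accepts every predicate.
pub fn push_into_scan(plan: Plan) -> Plan {
    match plan {
        // Pass-through node: recurse into its input.
        Plan::CoalesceBatches(input) => {
            Plan::CoalesceBatches(Box::new(push_into_scan(*input)))
        }
        Plan::Filter { predicates, input } => match push_into_scan(*input) {
            // The child is a scan: absorb the predicates, removing the Filter.
            Plan::Scan { mut filters } => {
                filters.extend(predicates);
                Plan::Scan { filters }
            }
            // Anything else: keep the Filter where it is.
            other => Plan::Filter { predicates, input: Box::new(other) },
        },
        scan @ Plan::Scan { .. } => scan,
    }
}
```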
The new plan looks like:
┌──────────────────────┐
│ CoalesceBatchesExec │
└──────────────────────┘
│
▼
┌──────────────────────┐
│ DataSourceExec │
│ projection = * │
│ filters = [ id=1] │
└──────────────────────┘
§Example: Push filters with ProjectionExec
Let’s consider a more complex example involving a ProjectionExec
node in between the FilterExec and DataSourceExec nodes that
creates a new column that the filter depends on.
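The key step in this example is rewriting a predicate over a projected column in terms of the projection's input columns. A minimal sketch follows. It assumes a tiny hypothetical Expr type (not DataFusion's PhysicalExpr) and a projection given as a map from output column name to defining expression. Columns not in the map are assumed to pass through unchanged under the same name.

```rust
use std::collections::HashMap;

/// Tiny hypothetical expression type.
#[derive(Debug, Clone, PartialEq)]
pub enum Expr {
    Col(String),
    Lit(f64),
    Mul(Box<Expr>, Box<Expr>),
    Gt(Box<Expr>, Box<Expr>),
    Eq(Box<Expr>, Box<Expr>),
}

pub fn col(name: &str) -> Expr {
    Expr::Col(name.to_string())
}

pub fn lit(v: f64) -> Expr {
    Expr::Lit(v)
}

/// Replaces each column produced by the projection with the expression
/// that computes it, so the result only references the projection's input.
pub fn substitute(expr: &Expr, projection: &HashMap<String, Expr>) -> Expr {
    match expr {
        // A projected column is replaced by its definition; any other
        // column is assumed to pass through unchanged.
        Expr::Col(name) => projection.get(name).cloned().unwrap_or_else(|| expr.clone()),
        Expr::Lit(v) => Expr::Lit(*v),
        Expr::Mul(l, r) => Expr::Mul(
            Box::new(substitute(l, projection)),
            Box::new(substitute(r, projection)),
        ),
        Expr::Gt(l, r) => Expr::Gt(
            Box::new(substitute(l, projection)),
            Box::new(substitute(r, projection)),
        ),
        Expr::Eq(l, r) => Expr::Eq(
            Box::new(substitute(l, projection)),
            Box::new(substitute(r, projection)),
        ),
    }
}
```

With cost mapped to price * 1.2, the predicate cost > 50 becomes price * 1.2 > 50, while id = 1 is returned unchanged. Both then reference only columns the scan produces.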
┌──────────────────────┐
│ CoalesceBatchesExec │
└──────────────────────┘
│
▼
┌──────────────────────┐
│ FilterExec │
│ filters = │
│ [cost>50,id=1] │
└──────────────────────┘
│
▼
┌──────────────────────┐
│ ProjectionExec │
│ cost = price * 1.2 │
└──────────────────────┘
│
▼
┌──────────────────────┐
│ DataSourceExec │
│ projection = * │
└──────────────────────┘
We want to push down the filter id=1 to the DataSourceExec node,
but we can't push down cost>50 because cost is computed by the ProjectionExec
node, which must execute first. A simple approach is to split the
filter into two separate filters and push down only id=1:
┌──────────────────────┐
│ CoalesceBatchesExec │
└──────────────────────┘
│
▼
┌──────────────────────┐
│ FilterExec │
│ filters = │
│ [cost>50] │
└──────────────────────┘
│
▼
┌──────────────────────┐
│ ProjectionExec │
│ cost = price * 1.2 │
└──────────────────────┘
│
▼
┌──────────────────────┐
│ DataSourceExec │
│ projection = * │
│ filters = [ id=1] │
└──────────────────────┘
However, we can do better by rewriting the filter in terms of the projection's input,
pushing down price * 1.2 > 50 instead of cost > 50:
┌──────────────────────┐
│ CoalesceBatchesExec │
└──────────────────────┘
│
▼
┌──────────────────────┐
│ ProjectionExec │
│ cost = price * 1.2 │
└──────────────────────┘
│
▼
┌──────────────────────┐
│ DataSourceExec │
│ projection = * │
│ filters = [id=1, │
│ price * 1.2 > 50] │
└──────────────────────┘
§Example: Push filters within a subtree
There are also cases where we may be able to push down filters within a subtree but not the entire tree. A good example of this is aggregation nodes:
┌──────────────────────┐
│ ProjectionExec │
│ projection = * │
└──────────────────────┘
│
▼
┌──────────────────────┐
│ FilterExec │
│ filters = [sum > 10] │
└──────────────────────┘
│
▼
┌───────────────────────┐
│ AggregateExec │
│ group by = [id] │
│ aggregate = │
│ [sum(price)] │
└───────────────────────┘
│
▼
┌──────────────────────┐
│ FilterExec │
│ filters = [id=1] │
└──────────────────────┘
│
▼
┌──────────────────────┐
│ DataSourceExec │
│ projection = * │
└──────────────────────┘
The transformation here is to push down the id=1 filter to the
DataSourceExec node:
┌──────────────────────┐
│ ProjectionExec │
│ projection = * │
└──────────────────────┘
│
▼
┌──────────────────────┐
│ FilterExec │
│ filters = [sum > 10] │
└──────────────────────┘
│
▼
┌───────────────────────┐
│ AggregateExec │
│ group by = [id] │
│ aggregate = │
│ [sum(price)] │
└───────────────────────┘
│
▼
┌──────────────────────┐
│ DataSourceExec │
│ projection = * │
│ filters = [id=1] │
└──────────────────────┘
The point here is that:
- We cannot push down sum > 10 through the AggregateExec node into the DataSourceExec node, because sum only exists after aggregation. Any filters above the AggregateExec node are not pushed down. This is determined by calling ExecutionPlan::gather_filters_for_pushdown on the AggregateExec node.
- We need to keep recursing into the tree so that we can discover the other FilterExec node and push down the id=1 filter.
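The two points above amount to a recursive walk. It carries a set of pending filters downward, stops that set at an aggregation boundary, and restarts with an empty set below it. The sketch assumes a hypothetical Plan enum with string predicates. It treats the ProjectionExec as a pass-through that renames nothing and hard-codes the aggregate as a barrier, whereas DataFusion decides this per node through ExecutionPlan::gather_filters_for_pushdown.

```rust
/// Toy plan for the aggregation example (hypothetical).
#[derive(Debug, Clone, PartialEq)]
pub enum Plan {
    Projection(Box<Plan>),
    Filter { predicates: Vec<String>, input: Box<Plan> },
    Aggregate(Box<Plan>),
    Scan { filters: Vec<String> },
}

/// Wraps `input` in a Filter unless there is nothing left to filter.
fn keep_above(predicates: Vec<String>, input: Plan) -> Plan {
    if predicates.is_empty() {
        input
    } else {
        Plan::Filter { predicates, input: Box::new(input) }
    }
}

/// Pushes `pending` (filters collected from ancestors) as far down as possible.
pub fn push_down(plan: Plan, pending: Vec<String>) -> Plan {
    match plan {
        // Assumed pass-through: pending filters continue to the input.
        Plan::Projection(input) => Plan::Projection(Box::new(push_down(*input, pending))),
        // Collect this node's predicates and drop the node itself.
        Plan::Filter { predicates, input } => {
            let mut all = pending;
            all.extend(predicates);
            push_down(*input, all)
        }
        // Barrier: pending filters stay above the aggregate, but recursion
        // continues below it with an empty set so inner filters still move.
        Plan::Aggregate(input) => {
            let below = push_down(*input, Vec::new());
            keep_above(pending, Plan::Aggregate(Box::new(below)))
        }
        // Leaf: the scan absorbs whatever reached it.
        Plan::Scan { mut filters } => {
            filters.extend(pending);
            Plan::Scan { filters }
        }
    }
}
```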
§Example: Push filters through Joins
It is also possible to push down filters through joins, as well as filters that originate from joins. For example, consider a hash join that builds a hash table from the departments table (d) and probes it with rows from the users table (u). (We ignore here why this order would be chosen; typically it depends on the size of each table.)
┌─────────────────────┐
│ FilterExec │
│ filters = │
│ [d.size > 100] │
└─────────────────────┘
│
│
┌──────────▼──────────┐
│ │
│ HashJoinExec │
│ [u.dept@hash(d.id)] │
│ │
└─────────────────────┘
│
┌────────────┴────────────┐
┌──────────▼──────────┐ ┌──────────▼──────────┐
│ DataSourceExec │ │ DataSourceExec │
│ alias [users as u] │ │ alias [dept as d] │
│ │ │ │
└─────────────────────┘ └─────────────────────┘
There are two pushdowns we can do here:
- Push down the d.size > 100 filter through the HashJoinExec node to the DataSourceExec node for the departments table.
- Push down the hash table state from the HashJoinExec node to the DataSourceExec node to avoid reading rows from the users table that will be eliminated by the join. This can be done via a bloom filter or similar and is not (yet) supported in DataFusion. See https://github.com/apache/datafusion/issues/7955.
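For the first pushdown, a join can route each filter from above to the input whose columns it references. The sketch below decides placement from table-alias prefixes such as u. and d. It is hypothetical and assumes an inner join, since for outer joins pushing a filter into the null-supplying side can change the result.

```rust
/// Where a filter sitting above an inner join can be evaluated (hypothetical).
#[derive(Debug, PartialEq, Eq)]
pub enum Placement {
    LeftInput,
    RightInput,
    AboveJoin,
}

/// Decides placement from the table aliases of the referenced columns,
/// e.g. "u.dept" belongs to alias "u". Assumes an inner join.
pub fn place_filter(columns: &[&str], left_alias: &str, right_alias: &str) -> Placement {
    // True if any referenced column carries the given alias prefix.
    let uses = |alias: &str| columns.iter().any(|c| c.split('.').next() == Some(alias));
    match (uses(left_alias), uses(right_alias)) {
        (true, false) => Placement::LeftInput,
        (false, true) => Placement::RightInput,
        // References both sides (or neither): evaluate it above the join.
        _ => Placement::AboveJoin,
    }
}
```

Here d.size > 100 references only d, so it is routed to the departments scan, while a predicate over both u and d would have to stay above the join.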
┌─────────────────────┐
│ │
│ HashJoinExec │
│ [u.dept@hash(d.id)] │
│ │
└─────────────────────┘
│
┌────────────┴────────────┐
┌──────────▼──────────┐ ┌──────────▼──────────┐
│ DataSourceExec │ │ DataSourceExec │
│ alias [users as u] │ │ alias [dept as d] │
│ filters = │ │ filters = │
│ [u.dept@hash(d.id)] │ │ [ d.size > 100] │
└─────────────────────┘ └─────────────────────┘
You may notice in this case that the filter is dynamic: the hash table
is built at runtime, after the departments table has been read, so there
is no concrete InList filter or similar to push down at
optimization time. These sorts of dynamic filters are handled by
building a specialized PhysicalExpr that can be evaluated at runtime
and internally maintains a reference to the hash table or other state.
To make these dynamic filters more tractable, the PhysicalExpr::snapshot method
attempts to simplify a dynamic filter into a “basic” non-dynamic filter.
For a join, this could mean converting it into an InList filter or a min/max filter, for example.
See datafusion/physical-plan/src/dynamic_filters.rs for more details.
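A dynamic filter can be sketched as an expression that shares mutable state with the operator producing it. The example below is a hypothetical model, not DataFusion's implementation. DynamicInFilter, StaticFilter and their methods are invented names, the join's hash table is reduced to a set of i64 keys, and snapshot only mirrors the idea behind PhysicalExpr::snapshot by freezing the current state into an InList-style filter.

```rust
use std::collections::BTreeSet;
use std::sync::{Arc, RwLock};

/// A static (non-dynamic) filter produced by `snapshot` (hypothetical).
#[derive(Debug, PartialEq, Eq)]
pub enum StaticFilter {
    /// The build side has not finished yet, so nothing can be pruned.
    AlwaysTrue,
    InList { column: String, values: Vec<i64> },
}

/// Hypothetical dynamic filter `column IN <build-side keys>`. Cloning it
/// shares the same state, so the join and the scan can each hold a handle.
#[derive(Clone)]
pub struct DynamicInFilter {
    column: String,
    keys: Arc<RwLock<Option<BTreeSet<i64>>>>,
}

impl DynamicInFilter {
    pub fn new(column: &str) -> Self {
        Self { column: column.to_string(), keys: Arc::new(RwLock::new(None)) }
    }

    /// Called by the join once its hash table has been built.
    pub fn update(&self, keys: impl IntoIterator<Item = i64>) {
        *self.keys.write().unwrap() = Some(keys.into_iter().collect());
    }

    /// Runtime evaluation for a single value from the probe side.
    pub fn evaluate(&self, value: i64) -> bool {
        match &*self.keys.read().unwrap() {
            None => true,
            Some(keys) => keys.contains(&value),
        }
    }

    /// Freezes the current state into a plain, non-dynamic filter.
    pub fn snapshot(&self) -> StaticFilter {
        match &*self.keys.read().unwrap() {
            None => StaticFilter::AlwaysTrue,
            Some(keys) => StaticFilter::InList {
                column: self.column.clone(),
                values: keys.iter().copied().collect(),
            },
        }
    }
}
```

Because both handles point to the same state, the scan observes the join's update without the plan being rewritten.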
§Example: Push TopK filters into Scans
Another form of dynamic filter is pushing down the state of a TopK
operator for queries like SELECT * FROM t ORDER BY id LIMIT 10:
┌──────────────────────┐
│ TopK │
│ limit = 10 │
│ order by = [id] │
└──────────────────────┘
│
▼
┌──────────────────────┐
│ DataSourceExec │
│ projection = * │
└──────────────────────┘
We can avoid large amounts of data processing by transforming this into:
┌──────────────────────┐
│ TopK │
│ limit = 10 │
│ order by = [id] │
└──────────────────────┘
│
▼
┌──────────────────────┐
│ DataSourceExec │
│ projection = * │
│ filters = │
│ [id < @ TopKHeap] │
└──────────────────────┘
Now, as we fill our TopK heap, we can push down the state of the heap to
the DataSourceExec node to avoid reading files / row groups / pages /
rows that could not possibly be in the top 10.
This is not yet implemented in DataFusion. See https://github.com/apache/datafusion/issues/15037.
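The heap-to-filter idea can be sketched for an ascending ORDER BY id LIMIT k. This is a hypothetical model, not DataFusion's TopK operator. It keeps the k smallest ids in a max-heap, and once the heap is full the largest kept id becomes the bound of a dynamic filter id < threshold. A scan could compare that bound against a row group's minimum id to skip the whole group.

```rust
use std::collections::BinaryHeap;

/// Hypothetical TopK state for `ORDER BY id LIMIT k` (ascending).
pub struct TopK {
    k: usize,
    /// Max-heap holding the k smallest ids seen so far.
    heap: BinaryHeap<i64>,
}

impl TopK {
    pub fn new(k: usize) -> Self {
        Self { k, heap: BinaryHeap::new() }
    }

    /// Offers one id to the heap, evicting the largest kept id if needed.
    pub fn insert(&mut self, id: i64) {
        if self.heap.len() < self.k {
            self.heap.push(id);
        } else if let Some(&largest) = self.heap.peek() {
            if id < largest {
                self.heap.pop();
                self.heap.push(id);
            }
        }
    }

    /// Current dynamic filter bound: only ids strictly below it can still
    /// enter the top k. `None` until the heap is full.
    pub fn threshold(&self) -> Option<i64> {
        if self.heap.len() == self.k {
            self.heap.peek().copied()
        } else {
            None
        }
    }

    /// Whether a row group whose smallest id is `min_id` may still contribute.
    pub fn row_group_may_match(&self, min_id: i64) -> bool {
        self.threshold().map_or(true, |t| min_id < t)
    }
}
```

The bound only tightens as more rows arrive, so a scan that re-reads it periodically can prune progressively more data.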
Fields§
phase: FilterPushdownPhase
name: String
Implementations§
impl FilterPushdown
pub fn new() -> FilterPushdown
Create a new FilterPushdown optimizer rule that runs in the pre-optimization phase.
See FilterPushdownPhase for more details.
pub fn new_post_optimization() -> FilterPushdown
Create a new FilterPushdown optimizer rule that runs in the post-optimization phase.
See FilterPushdownPhase for more details.
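The two constructors differ only in the phase they record. A hypothetical mirror of that pattern is sketched below. The Phase enum, the private with_phase helper, and the name strings are assumptions for illustration, not DataFusion's source, and FilterPushdownPhase remains the authoritative description of what each phase allows.

```rust
/// Hypothetical mirror of FilterPushdownPhase.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Phase {
    Pre,
    Post,
}

/// Hypothetical mirror of the FilterPushdown struct's two fields.
pub struct FilterPushdownSketch {
    pub phase: Phase,
    pub name: String,
}

impl FilterPushdownSketch {
    /// Shared constructor; the name strings are illustrative only.
    fn with_phase(phase: Phase) -> Self {
        let name = match phase {
            Phase::Pre => "FilterPushdown",
            Phase::Post => "FilterPushdown(Post)",
        };
        Self { phase, name: name.to_string() }
    }

    /// Counterpart of FilterPushdown::new (pre-optimization phase).
    pub fn new() -> Self {
        Self::with_phase(Phase::Pre)
    }

    /// Counterpart of FilterPushdown::new_post_optimization.
    pub fn new_post_optimization() -> Self {
        Self::with_phase(Phase::Post)
    }
}
```

Keeping one type with a phase field presumably lets an optimizer register the same rule twice, once early and once late in its rule list.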
Trait Implementations§
impl Debug for FilterPushdown
impl Default for FilterPushdown
fn default() -> FilterPushdown
impl PhysicalOptimizerRule for FilterPushdown
fn optimize(
    &self,
    plan: Arc<dyn ExecutionPlan>,
    config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError>
Rewrite plan to an optimized form.
fn schema_check(&self) -> bool
Auto Trait Implementations§
impl Freeze for FilterPushdown
impl RefUnwindSafe for FilterPushdown
impl Send for FilterPushdown
impl Sync for FilterPushdown
impl Unpin for FilterPushdown
impl UnwindSafe for FilterPushdown
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise.