datafusion_physical_optimizer/
coalesce_batches.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! CoalesceBatches optimizer rule that groups rows together into bigger
//! batches to avoid the overhead of processing many small batches
20
21use crate::PhysicalOptimizerRule;
22
23use std::sync::Arc;
24
25use datafusion_common::config::ConfigOptions;
26use datafusion_common::error::Result;
27use datafusion_physical_expr::Partitioning;
28use datafusion_physical_plan::{
29    coalesce_batches::CoalesceBatchesExec, filter::FilterExec, joins::HashJoinExec,
30    repartition::RepartitionExec, ExecutionPlan,
31};
32
33use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
34
/// Physical optimizer rule that inserts `CoalesceBatchesExec` above operators
/// likely to emit small batches (filters, hash joins, and repartitions that are
/// not round-robin), so downstream operators do not pay per-batch overhead on
/// many tiny batches.
///
/// The rule carries no state of its own; it is driven entirely by the
/// configuration passed to the optimizer.
#[derive(Debug, Default)]
pub struct CoalesceBatches {}
39
40impl CoalesceBatches {
41    #[allow(missing_docs)]
42    pub fn new() -> Self {
43        Self::default()
44    }
45}
46impl PhysicalOptimizerRule for CoalesceBatches {
47    fn optimize(
48        &self,
49        plan: Arc<dyn ExecutionPlan>,
50        config: &ConfigOptions,
51    ) -> Result<Arc<dyn ExecutionPlan>> {
52        if !config.execution.coalesce_batches {
53            return Ok(plan);
54        }
55
56        let target_batch_size = config.execution.batch_size;
57        plan.transform_up(|plan| {
58            let plan_any = plan.as_any();
59            // The goal here is to detect operators that could produce small batches and only
60            // wrap those ones with a CoalesceBatchesExec operator. An alternate approach here
61            // would be to build the coalescing logic directly into the operators
62            // See https://github.com/apache/datafusion/issues/139
63            let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
64                || plan_any.downcast_ref::<HashJoinExec>().is_some()
65                // Don't need to add CoalesceBatchesExec after a round robin RepartitionExec
66                || plan_any
67                    .downcast_ref::<RepartitionExec>()
68                    .map(|repart_exec| {
69                        !matches!(
70                            repart_exec.partitioning().clone(),
71                            Partitioning::RoundRobinBatch(_)
72                        )
73                    })
74                    .unwrap_or(false);
75            if wrap_in_coalesce {
76                Ok(Transformed::yes(Arc::new(CoalesceBatchesExec::new(
77                    plan,
78                    target_batch_size,
79                ))))
80            } else {
81                Ok(Transformed::no(plan))
82            }
83        })
84        .data()
85    }
86
87    fn name(&self) -> &str {
88        "coalesce_batches"
89    }
90
91    fn schema_check(&self) -> bool {
92        true
93    }
94}