datafusion_physical_optimizer/coalesce_batches.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! CoalesceBatches optimizer that wraps operators which may produce small
//! batches, so that their output rows are grouped into larger batches and the
//! per-batch overhead of processing many small batches is avoided.
20
21use crate::PhysicalOptimizerRule;
22
23use std::sync::Arc;
24
25use datafusion_common::config::ConfigOptions;
26use datafusion_common::error::Result;
27use datafusion_physical_expr::Partitioning;
28use datafusion_physical_plan::{
29 coalesce_batches::CoalesceBatchesExec, filter::FilterExec, joins::HashJoinExec,
30 repartition::RepartitionExec, ExecutionPlan,
31};
32
33use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
34
/// Optimizer rule that wraps operators which may produce small batches in a
/// [`CoalesceBatchesExec`], avoiding the overhead of processing many small batches.
///
/// The wrapped operators are [`FilterExec`] (e.g. highly selective filters),
/// [`HashJoinExec`], and [`RepartitionExec`] nodes whose partitioning is not
/// round-robin. The rule leaves the plan untouched when the
/// `execution.coalesce_batches` config option is disabled.
#[derive(Default, Debug)]
pub struct CoalesceBatches {}
39
40impl CoalesceBatches {
41 #[allow(missing_docs)]
42 pub fn new() -> Self {
43 Self::default()
44 }
45}
46impl PhysicalOptimizerRule for CoalesceBatches {
47 fn optimize(
48 &self,
49 plan: Arc<dyn ExecutionPlan>,
50 config: &ConfigOptions,
51 ) -> Result<Arc<dyn ExecutionPlan>> {
52 if !config.execution.coalesce_batches {
53 return Ok(plan);
54 }
55
56 let target_batch_size = config.execution.batch_size;
57 plan.transform_up(|plan| {
58 let plan_any = plan.as_any();
59 // The goal here is to detect operators that could produce small batches and only
60 // wrap those ones with a CoalesceBatchesExec operator. An alternate approach here
61 // would be to build the coalescing logic directly into the operators
62 // See https://github.com/apache/datafusion/issues/139
63 let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
64 || plan_any.downcast_ref::<HashJoinExec>().is_some()
65 // Don't need to add CoalesceBatchesExec after a round robin RepartitionExec
66 || plan_any
67 .downcast_ref::<RepartitionExec>()
68 .map(|repart_exec| {
69 !matches!(
70 repart_exec.partitioning().clone(),
71 Partitioning::RoundRobinBatch(_)
72 )
73 })
74 .unwrap_or(false);
75 if wrap_in_coalesce {
76 Ok(Transformed::yes(Arc::new(CoalesceBatchesExec::new(
77 plan,
78 target_batch_size,
79 ))))
80 } else {
81 Ok(Transformed::no(plan))
82 }
83 })
84 .data()
85 }
86
87 fn name(&self) -> &str {
88 "coalesce_batches"
89 }
90
91 fn schema_check(&self) -> bool {
92 true
93 }
94}