datafusion_physical_optimizer/
ensure_coop.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! The [`EnsureCooperative`] optimizer rule inspects the physical plan to find all
19//! portions of the plan that will not yield cooperatively.
20//! It will insert `CooperativeExec` nodes where appropriate to ensure execution plans
21//! always yield cooperatively.
22
23use std::fmt::{Debug, Formatter};
24use std::sync::Arc;
25
26use crate::PhysicalOptimizerRule;
27
28use datafusion_common::config::ConfigOptions;
29use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
30use datafusion_common::Result;
31use datafusion_physical_plan::coop::CooperativeExec;
32use datafusion_physical_plan::execution_plan::{EvaluationType, SchedulingType};
33use datafusion_physical_plan::ExecutionPlan;
34
/// `EnsureCooperative` is a [`PhysicalOptimizerRule`] that inspects the physical plan for
/// sub plans that do not participate in cooperative scheduling. The plan is subdivided into sub
/// plans on eager evaluation boundaries. Leaf nodes and eager evaluation roots are checked
/// to see if they participate in cooperative scheduling. Those that do not are wrapped in
/// a [`CooperativeExec`] parent.
pub struct EnsureCooperative {}
41
42impl EnsureCooperative {
43    pub fn new() -> Self {
44        Self {}
45    }
46}
47
48impl Default for EnsureCooperative {
49    fn default() -> Self {
50        Self::new()
51    }
52}
53
54impl Debug for EnsureCooperative {
55    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
56        f.debug_struct(self.name()).finish()
57    }
58}
59
60impl PhysicalOptimizerRule for EnsureCooperative {
61    fn name(&self) -> &str {
62        "EnsureCooperative"
63    }
64
65    fn optimize(
66        &self,
67        plan: Arc<dyn ExecutionPlan>,
68        _config: &ConfigOptions,
69    ) -> Result<Arc<dyn ExecutionPlan>> {
70        plan.transform_up(|plan| {
71            let is_leaf = plan.children().is_empty();
72            let is_exchange = plan.properties().evaluation_type == EvaluationType::Eager;
73            if (is_leaf || is_exchange)
74                && plan.properties().scheduling_type != SchedulingType::Cooperative
75            {
76                // Wrap non-cooperative leaves or eager evaluation roots in a cooperative exec to
77                // ensure the plans they participate in are properly cooperative.
78                Ok(Transformed::new(
79                    Arc::new(CooperativeExec::new(Arc::clone(&plan))),
80                    true,
81                    TreeNodeRecursion::Continue,
82                ))
83            } else {
84                Ok(Transformed::no(plan))
85            }
86        })
87        .map(|t| t.data)
88    }
89
90    fn schema_check(&self) -> bool {
91        // Wrapping a leaf in YieldStreamExec preserves the schema, so it is safe.
92        true
93    }
94}
95
#[cfg(test)]
mod tests {
    use super::*;
    use datafusion_common::config::ConfigOptions;
    use datafusion_physical_plan::{displayable, test::scan_partitioned};
    use insta::assert_snapshot;

    /// Runs the rule over a single-partition scan and pins the resulting plan: the snapshot
    /// expects a `CooperativeExec` directly above the `DataSourceExec` leaf.
    #[tokio::test]
    async fn test_cooperative_exec_for_custom_exec() {
        let rule = EnsureCooperative::new();
        let options = ConfigOptions::new();
        let input_plan = scan_partitioned(1);

        let rewritten = rule.optimize(input_plan, &options).unwrap();
        let display = displayable(rewritten.as_ref()).indent(true).to_string();

        // Inline insta snapshot covering the complete plan structure after the rewrite.
        assert_snapshot!(display, @r###"
            CooperativeExec
              DataSourceExec: partitions=1, partition_sizes=[1]
            "###);
    }
}