datafusion_physical_optimizer/ensure_coop.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! The [`EnsureCooperative`] optimizer rule inspects the physical plan to find all
19//! portions of the plan that will not yield cooperatively.
20//! It will insert `CooperativeExec` nodes where appropriate to ensure execution plans
21//! always yield cooperatively.
22
23use std::fmt::{Debug, Formatter};
24use std::sync::Arc;
25
26use crate::PhysicalOptimizerRule;
27
28use datafusion_common::config::ConfigOptions;
29use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
30use datafusion_common::Result;
31use datafusion_physical_plan::coop::CooperativeExec;
32use datafusion_physical_plan::execution_plan::{EvaluationType, SchedulingType};
33use datafusion_physical_plan::ExecutionPlan;
34
/// `EnsureCooperative` is a [`PhysicalOptimizerRule`] that inspects the physical plan for
/// sub plans that do not participate in cooperative scheduling. The plan is subdivided into sub
/// plans on eager evaluation boundaries. Leaf nodes and eager evaluation roots are checked
/// to see if they participate in cooperative scheduling. Those that do not are wrapped in
/// a [`CooperativeExec`] parent.
///
/// The rule is stateless: the struct has no fields.
pub struct EnsureCooperative {}
41
42impl EnsureCooperative {
43 pub fn new() -> Self {
44 Self {}
45 }
46}
47
48impl Default for EnsureCooperative {
49 fn default() -> Self {
50 Self::new()
51 }
52}
53
54impl Debug for EnsureCooperative {
55 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
56 f.debug_struct(self.name()).finish()
57 }
58}
59
60impl PhysicalOptimizerRule for EnsureCooperative {
61 fn name(&self) -> &str {
62 "EnsureCooperative"
63 }
64
65 fn optimize(
66 &self,
67 plan: Arc<dyn ExecutionPlan>,
68 _config: &ConfigOptions,
69 ) -> Result<Arc<dyn ExecutionPlan>> {
70 plan.transform_up(|plan| {
71 let is_leaf = plan.children().is_empty();
72 let is_exchange = plan.properties().evaluation_type == EvaluationType::Eager;
73 if (is_leaf || is_exchange)
74 && plan.properties().scheduling_type != SchedulingType::Cooperative
75 {
76 // Wrap non-cooperative leaves or eager evaluation roots in a cooperative exec to
77 // ensure the plans they participate in are properly cooperative.
78 Ok(Transformed::new(
79 Arc::new(CooperativeExec::new(Arc::clone(&plan))),
80 true,
81 TreeNodeRecursion::Continue,
82 ))
83 } else {
84 Ok(Transformed::no(plan))
85 }
86 })
87 .map(|t| t.data)
88 }
89
90 fn schema_check(&self) -> bool {
91 // Wrapping a leaf in YieldStreamExec preserves the schema, so it is safe.
92 true
93 }
94}
95
#[cfg(test)]
mod tests {
    use super::*;
    use datafusion_common::config::ConfigOptions;
    use datafusion_physical_plan::{displayable, test::scan_partitioned};
    use insta::assert_snapshot;

    /// Runs the rule on a single-partition test scan and checks, via an inline snapshot of
    /// the indented plan rendering, that the scan ends up wrapped in a `CooperativeExec`.
    #[tokio::test]
    async fn test_cooperative_exec_for_custom_exec() {
        // The plan under test is a single leaf node with one partition.
        let test_custom_exec = scan_partitioned(1);
        let config = ConfigOptions::new();
        let optimized = EnsureCooperative::new()
            .optimize(test_custom_exec, &config)
            .unwrap();

        let display = displayable(optimized.as_ref()).indent(true).to_string();
        // Use insta snapshot to ensure full plan structure
        // NOTE(review): the snapshot is whitespace-sensitive; the child line is expected to be
        // indented under its parent — confirm against the `cargo insta` output.
        assert_snapshot!(display, @r###"
        CooperativeExec
          DataSourceExec: partitions=1, partition_sizes=[1]
        "###);
    }
}