datafusion_physical_optimizer/
utils.rs1use std::sync::Arc;
19
20use datafusion_common::Result;
21use datafusion_physical_expr::{LexOrdering, LexRequirement};
22use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
23use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
24use datafusion_physical_plan::repartition::RepartitionExec;
25use datafusion_physical_plan::sorts::sort::SortExec;
26use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
27use datafusion_physical_plan::tree_node::PlanContext;
28use datafusion_physical_plan::union::UnionExec;
29use datafusion_physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
30use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
31
32pub fn add_sort_above<T: Clone + Default>(
39 node: PlanContext<T>,
40 sort_requirements: LexRequirement,
41 fetch: Option<usize>,
42) -> PlanContext<T> {
43 let mut sort_reqs: Vec<_> = sort_requirements.into();
44 sort_reqs.retain(|sort_expr| {
45 node.plan
46 .equivalence_properties()
47 .is_expr_constant(&sort_expr.expr)
48 .is_none()
49 });
50 let sort_exprs = sort_reqs.into_iter().map(Into::into).collect::<Vec<_>>();
51 let Some(ordering) = LexOrdering::new(sort_exprs) else {
52 return node;
53 };
54 let mut new_sort = SortExec::new(ordering, Arc::clone(&node.plan)).with_fetch(fetch);
55 if node.plan.output_partitioning().partition_count() > 1 {
56 new_sort = new_sort.with_preserve_partitioning(true);
57 }
58 PlanContext::new(Arc::new(new_sort), T::default(), vec![node])
59}
60
61pub fn add_sort_above_with_check<T: Clone + Default>(
65 node: PlanContext<T>,
66 sort_requirements: LexRequirement,
67 fetch: Option<usize>,
68) -> Result<PlanContext<T>> {
69 if !node
70 .plan
71 .equivalence_properties()
72 .ordering_satisfy_requirement(sort_requirements.clone())?
73 {
74 Ok(add_sort_above(node, sort_requirements, fetch))
75 } else {
76 Ok(node)
77 }
78}
79
80pub fn is_sort(plan: &Arc<dyn ExecutionPlan>) -> bool {
82 plan.as_any().is::<SortExec>()
83}
84
85pub fn is_window(plan: &Arc<dyn ExecutionPlan>) -> bool {
88 plan.as_any().is::<WindowAggExec>() || plan.as_any().is::<BoundedWindowAggExec>()
89}
90
91pub fn is_union(plan: &Arc<dyn ExecutionPlan>) -> bool {
93 plan.as_any().is::<UnionExec>()
94}
95
96pub fn is_sort_preserving_merge(plan: &Arc<dyn ExecutionPlan>) -> bool {
98 plan.as_any().is::<SortPreservingMergeExec>()
99}
100
101pub fn is_coalesce_partitions(plan: &Arc<dyn ExecutionPlan>) -> bool {
103 plan.as_any().is::<CoalescePartitionsExec>()
104}
105
106pub fn is_repartition(plan: &Arc<dyn ExecutionPlan>) -> bool {
108 plan.as_any().is::<RepartitionExec>()
109}
110
111pub fn is_limit(plan: &Arc<dyn ExecutionPlan>) -> bool {
114 plan.as_any().is::<GlobalLimitExec>() || plan.as_any().is::<LocalLimitExec>()
115}