datafusion_physical_plan/windows/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Physical expressions for window functions

mod bounded_window_agg_exec;
mod utils;
mod window_agg_exec;

use std::borrow::Borrow;
use std::sync::Arc;

use crate::{
    expressions::PhysicalSortExpr, ExecutionPlan, ExecutionPlanProperties,
    InputOrderMode, PhysicalExpr,
};

use arrow::datatypes::{Schema, SchemaRef};
use arrow_schema::{FieldRef, SortOptions};
use datafusion_common::{exec_err, Result};
use datafusion_expr::{
    LimitEffect, PartitionEvaluator, ReversedUDWF, SetMonotonicity, WindowFrame,
    WindowFunctionDefinition, WindowUDF,
};
use datafusion_functions_window_common::expr::ExpressionArgs;
use datafusion_functions_window_common::field::WindowUDFFieldArgs;
use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::window::{
    SlidingAggregateWindowExpr, StandardWindowFunctionExpr,
};
use datafusion_physical_expr::{ConstExpr, EquivalenceProperties};
use datafusion_physical_expr_common::sort_expr::{
    LexOrdering, LexRequirement, OrderingRequirements, PhysicalSortRequirement,
};

use itertools::Itertools;

// Public interface:
pub use bounded_window_agg_exec::BoundedWindowAggExec;
pub use datafusion_physical_expr::window::{
    PlainAggregateWindowExpr, StandardWindowExpr, WindowExpr,
};
pub use window_agg_exec::WindowAggExec;

/// Builds the return field of a window function and appends it to `schema`.
/// For aggregate UDFs the schema is returned unchanged; for other window
/// functions the new field is named `fn_name` and added at the end.
pub fn schema_add_window_field(
    args: &[Arc<dyn PhysicalExpr>],
    schema: &Schema,
    window_fn: &WindowFunctionDefinition,
    fn_name: &str,
) -> Result<Arc<Schema>> {
    let fields = args
        .iter()
        .map(|e| Arc::clone(e).as_ref().return_field(schema))
        .collect::<Result<Vec<_>>>()?;
    let window_expr_return_field = window_fn.return_field(&fields, fn_name)?;
    let mut window_fields = schema
        .fields()
        .iter()
        .map(|f| f.as_ref().clone())
        .collect_vec();
    // Skip extending schema for UDAF
    if let WindowFunctionDefinition::AggregateUDF(_) = window_fn {
        Ok(Arc::new(Schema::new(window_fields)))
    } else {
        window_fields.extend_from_slice(&[window_expr_return_field
            .as_ref()
            .clone()
            .with_name(fn_name)]);
        Ok(Arc::new(Schema::new(window_fields)))
    }
}

/// Creates a physical expression for a window function.
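///
/// # Example
///
/// A rough sketch (not compiled here); it mirrors the `test_drop_cancel` test
/// at the bottom of this module and assumes `schema` is a [`SchemaRef`] with an
/// `Int32` column named "a":
///
/// ```ignore
/// let count_over_a = create_window_expr(
///     &WindowFunctionDefinition::AggregateUDF(count_udaf()),
///     "count".to_owned(),
///     &[col("a", &schema)?],
///     &[],                              // no PARTITION BY
///     &[],                              // no ORDER BY
///     Arc::new(WindowFrame::new(None)), // default window frame
///     Arc::clone(&schema),
///     false, // ignore_nulls
///     false, // distinct
///     None,  // filter
/// )?;
/// ```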
#[allow(clippy::too_many_arguments)]
pub fn create_window_expr(
    fun: &WindowFunctionDefinition,
    name: String,
    args: &[Arc<dyn PhysicalExpr>],
    partition_by: &[Arc<dyn PhysicalExpr>],
    order_by: &[PhysicalSortExpr],
    window_frame: Arc<WindowFrame>,
    input_schema: SchemaRef,
    ignore_nulls: bool,
    distinct: bool,
    filter: Option<Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn WindowExpr>> {
    Ok(match fun {
        WindowFunctionDefinition::AggregateUDF(fun) => {
            let aggregate = if distinct {
                AggregateExprBuilder::new(Arc::clone(fun), args.to_vec())
                    .schema(input_schema)
                    .alias(name)
                    .with_ignore_nulls(ignore_nulls)
                    .distinct()
                    .build()
                    .map(Arc::new)?
            } else {
                AggregateExprBuilder::new(Arc::clone(fun), args.to_vec())
                    .schema(input_schema)
                    .alias(name)
                    .with_ignore_nulls(ignore_nulls)
                    .build()
                    .map(Arc::new)?
            };
            window_expr_from_aggregate_expr(
                partition_by,
                order_by,
                window_frame,
                aggregate,
                filter,
            )
        }
        WindowFunctionDefinition::WindowUDF(fun) => Arc::new(StandardWindowExpr::new(
            create_udwf_window_expr(fun, args, &input_schema, name, ignore_nulls)?,
            partition_by,
            order_by,
            window_frame,
        )),
    })
}

/// Creates an appropriate [`WindowExpr`] based on the window frame: a
/// [`SlidingAggregateWindowExpr`] for bounded frames, or a
/// [`PlainAggregateWindowExpr`] for ever-expanding (unbounded-start) frames.
fn window_expr_from_aggregate_expr(
    partition_by: &[Arc<dyn PhysicalExpr>],
    order_by: &[PhysicalSortExpr],
    window_frame: Arc<WindowFrame>,
    aggregate: Arc<AggregateFunctionExpr>,
    filter: Option<Arc<dyn PhysicalExpr>>,
) -> Arc<dyn WindowExpr> {
    // Is this a potentially unbounded (ever-expanding) window frame?
    let unbounded_window = window_frame.is_ever_expanding();

    if !unbounded_window {
        Arc::new(SlidingAggregateWindowExpr::new(
            aggregate,
            partition_by,
            order_by,
            window_frame,
            filter,
        ))
    } else {
        Arc::new(PlainAggregateWindowExpr::new(
            aggregate,
            partition_by,
            order_by,
            window_frame,
            filter,
        ))
    }
}

/// Creates a `StandardWindowFunctionExpr` suitable for a user-defined window function
pub fn create_udwf_window_expr(
    fun: &Arc<WindowUDF>,
    args: &[Arc<dyn PhysicalExpr>],
    input_schema: &Schema,
    name: String,
    ignore_nulls: bool,
) -> Result<Arc<dyn StandardWindowFunctionExpr>> {
    // Collect the return fields of the input expressions into an owned vector
    let input_fields: Vec<_> = args
        .iter()
        .map(|arg| arg.return_field(input_schema))
        .collect::<Result<_>>()?;

    let udwf_expr = Arc::new(WindowUDFExpr {
        fun: Arc::clone(fun),
        args: args.to_vec(),
        input_fields,
        name,
        is_reversed: false,
        ignore_nulls,
    });

    // Early validation of input expressions
    // We create a partition evaluator because this is where user-defined
    // window implementations parse their input expressions. The benefits are:
    // - If any of the input expressions are invalid, we catch them early
    //   in the planning phase, rather than during execution.
    // - Maintains compatibility with the validation behavior of the built-in
    //   (now removed) window functions.
    // - Predictable and reliable error handling.
    // See discussion here:
    // https://github.com/apache/datafusion/pull/13201#issuecomment-2454209975
    let _ = udwf_expr.create_evaluator()?;

    Ok(udwf_expr)
}

/// Implements [`StandardWindowFunctionExpr`] for [`WindowUDF`]
#[derive(Clone, Debug)]
pub struct WindowUDFExpr {
    fun: Arc<WindowUDF>,
    args: Vec<Arc<dyn PhysicalExpr>>,
    /// Display name
    name: String,
    /// Fields of input expressions
    input_fields: Vec<FieldRef>,
    /// This is set to `true` only if the user-defined window function
    /// expression supports evaluation in reverse order, and the
    /// evaluation order is reversed.
    is_reversed: bool,
    /// Set to `true` if `IGNORE NULLS` is defined, `false` otherwise.
    ignore_nulls: bool,
}

impl WindowUDFExpr {
    pub fn fun(&self) -> &Arc<WindowUDF> {
        &self.fun
    }
}

impl StandardWindowFunctionExpr for WindowUDFExpr {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn field(&self) -> Result<FieldRef> {
        self.fun
            .field(WindowUDFFieldArgs::new(&self.input_fields, &self.name))
    }

    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
        self.fun
            .expressions(ExpressionArgs::new(&self.args, &self.input_fields))
    }

    fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
        self.fun
            .partition_evaluator_factory(PartitionEvaluatorArgs::new(
                &self.args,
                &self.input_fields,
                self.is_reversed,
                self.ignore_nulls,
            ))
    }

    fn name(&self) -> &str {
        &self.name
    }

    fn reverse_expr(&self) -> Option<Arc<dyn StandardWindowFunctionExpr>> {
        match self.fun.reverse_expr() {
            ReversedUDWF::Identical => Some(Arc::new(self.clone())),
            ReversedUDWF::NotSupported => None,
            ReversedUDWF::Reversed(fun) => Some(Arc::new(WindowUDFExpr {
                fun,
                args: self.args.clone(),
                name: self.name.clone(),
                input_fields: self.input_fields.clone(),
                is_reversed: !self.is_reversed,
                ignore_nulls: self.ignore_nulls,
            })),
        }
    }

    fn get_result_ordering(&self, schema: &SchemaRef) -> Option<PhysicalSortExpr> {
        self.fun
            .sort_options()
            .zip(schema.column_with_name(self.name()))
            .map(|(options, (idx, field))| {
                let expr = Arc::new(Column::new(field.name(), idx));
                PhysicalSortExpr { expr, options }
            })
    }

    fn limit_effect(&self) -> LimitEffect {
        self.fun.inner().limit_effect(self.args.as_slice())
    }
}

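/// Derives the ordering requirement alternatives of a window operator from its
/// PARTITION BY and ORDER BY expressions: the partition-by expressions (with
/// unspecified sort options) followed by the order-by expressions form one
/// alternative, and the order-by expressions alone form another.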
pub(crate) fn calc_requirements<
    T: Borrow<Arc<dyn PhysicalExpr>>,
    S: Borrow<PhysicalSortExpr>,
>(
    partition_by_exprs: impl IntoIterator<Item = T>,
    orderby_sort_exprs: impl IntoIterator<Item = S>,
) -> Option<OrderingRequirements> {
    let mut sort_reqs_with_partition = partition_by_exprs
        .into_iter()
        .map(|partition_by| {
            PhysicalSortRequirement::new(Arc::clone(partition_by.borrow()), None)
        })
        .collect::<Vec<_>>();
    let mut sort_reqs = vec![];
    for element in orderby_sort_exprs.into_iter() {
        let PhysicalSortExpr { expr, options } = element.borrow();
        let sort_req = PhysicalSortRequirement::new(Arc::clone(expr), Some(*options));
        if !sort_reqs_with_partition.iter().any(|e| e.expr.eq(expr)) {
            sort_reqs_with_partition.push(sort_req.clone());
        }
        if !sort_reqs
            .iter()
            .any(|e: &PhysicalSortRequirement| e.expr.eq(expr))
        {
            sort_reqs.push(sort_req);
        }
    }

    let mut alternatives = vec![];
    alternatives.extend(LexRequirement::new(sort_reqs_with_partition));
    alternatives.extend(LexRequirement::new(sort_reqs));

    OrderingRequirements::new_alternatives(alternatives, false)
}

/// Calculates the indices such that, when the partition by expressions are
/// reordered according to these indices, the resulting expressions form a
/// prefix of the existing ordering.
/// For instance, if the input is ordered by a, b, c and PARTITION BY b, a is
/// used, this vector will be [1, 0]: iterating over the b, a columns in the
/// order [1, 0] yields (a, b), which is a prefix of the existing ordering
/// (a, b, c).
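///
/// # Example
///
/// A rough sketch (not compiled here); `col_a`, `col_b` and `input` are
/// illustrative placeholders for an input plan ordered by a, b, c:
///
/// ```ignore
/// // PARTITION BY b, a  ->  indices [1, 0], since iterating (b, a) in that
/// // order yields (a, b), a prefix of the existing ordering (a, b, c).
/// let indices = get_ordered_partition_by_indices(&[col_b, col_a], &input)?;
/// assert_eq!(indices, vec![1, 0]);
/// ```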
pub fn get_ordered_partition_by_indices(
    partition_by_exprs: &[Arc<dyn PhysicalExpr>],
    input: &Arc<dyn ExecutionPlan>,
) -> Result<Vec<usize>> {
    let (_, indices) = input
        .equivalence_properties()
        .find_longest_permutation(partition_by_exprs)?;
    Ok(indices)
}

pub(crate) fn get_partition_by_sort_exprs(
    input: &Arc<dyn ExecutionPlan>,
    partition_by_exprs: &[Arc<dyn PhysicalExpr>],
    ordered_partition_by_indices: &[usize],
) -> Result<Vec<PhysicalSortExpr>> {
    let ordered_partition_exprs = ordered_partition_by_indices
        .iter()
        .map(|idx| Arc::clone(&partition_by_exprs[*idx]))
        .collect::<Vec<_>>();
    // The ordered section cannot be longer than the partition by expressions:
    assert!(ordered_partition_by_indices.len() <= partition_by_exprs.len());
    let (ordering, _) = input
        .equivalence_properties()
        .find_longest_permutation(&ordered_partition_exprs)?;
    if ordering.len() == ordered_partition_exprs.len() {
        Ok(ordering)
    } else {
        exec_err!("Expects PARTITION BY expression to be ordered")
    }
}

pub(crate) fn window_equivalence_properties(
    schema: &SchemaRef,
    input: &Arc<dyn ExecutionPlan>,
    window_exprs: &[Arc<dyn WindowExpr>],
) -> Result<EquivalenceProperties> {
    // We need to update the schema, so we can't directly use input's equivalence
    // properties.
    let mut window_eq_properties = EquivalenceProperties::new(Arc::clone(schema))
        .extend(input.equivalence_properties().clone())?;

    let window_schema_len = schema.fields.len();
    let input_schema_len = window_schema_len - window_exprs.len();
    let window_expr_indices = (input_schema_len..window_schema_len).collect::<Vec<_>>();

    for (i, expr) in window_exprs.iter().enumerate() {
        let partitioning_exprs = expr.partition_by();
        let no_partitioning = partitioning_exprs.is_empty();

        // Find "one" valid ordering for partition columns to avoid exponential complexity.
        // see https://github.com/apache/datafusion/issues/17401
        let mut all_satisfied_lexs = vec![];
        let mut candidate_ordering = vec![];

        for partition_expr in partitioning_exprs.iter() {
            let sort_options =
                sort_options_resolving_constant(Arc::clone(partition_expr), true);

            // Try each sort option and pick the first one that works
            let mut found = false;
            for sort_expr in sort_options.into_iter() {
                candidate_ordering.push(sort_expr);
                if let Some(lex) = LexOrdering::new(candidate_ordering.clone()) {
                    if window_eq_properties.ordering_satisfy(lex)? {
                        found = true;
                        break;
                    }
                }
                // This option didn't work, remove it and try the next one
                candidate_ordering.pop();
            }
            // If no sort option works for this column, we can't build a valid ordering
            if !found {
                candidate_ordering.clear();
                break;
            }
        }

        // If we successfully built an ordering for all columns, use it.
        // When there are no partition expressions, candidate_ordering will be empty and won't be added.
        if candidate_ordering.len() == partitioning_exprs.len() {
            if let Some(lex) = LexOrdering::new(candidate_ordering) {
                all_satisfied_lexs.push(lex);
            }
        }
        // If there is a partitioning and no possible ordering can satisfy
        // the input plan's orderings, then we cannot further introduce any
        // new orderings for the window plan.
        if !no_partitioning && all_satisfied_lexs.is_empty() {
            return Ok(window_eq_properties);
        } else if let Some(std_expr) = expr.as_any().downcast_ref::<StandardWindowExpr>()
        {
            std_expr.add_equal_orderings(&mut window_eq_properties)?;
        } else if let Some(plain_expr) =
            expr.as_any().downcast_ref::<PlainAggregateWindowExpr>()
        {
            // We are dealing with plain window frames; i.e. frames having an
            // unbounded starting point.
            // First, check if the frame covers the whole table:
            if plain_expr.get_window_frame().end_bound.is_unbounded() {
                let window_col =
                    Arc::new(Column::new(expr.name(), i + input_schema_len)) as _;
                if no_partitioning {
                    // Window function has a constant result across the table:
                    window_eq_properties
                        .add_constants(std::iter::once(ConstExpr::from(window_col)))?
                } else {
                    // Window function results in a partial constant value in
                    // some ordering. Adjust the ordering equivalences accordingly:
                    let new_lexs = all_satisfied_lexs.into_iter().flat_map(|lex| {
                        let new_partial_consts = sort_options_resolving_constant(
                            Arc::clone(&window_col),
                            false,
                        );

                        new_partial_consts.into_iter().map(move |partial| {
                            let mut existing = lex.clone();
                            existing.push(partial);
                            existing
                        })
                    });
                    window_eq_properties.add_orderings(new_lexs);
                }
            } else {
                // The window frame is ever expanding, so set monotonicity comes
                // into play.
                plain_expr.add_equal_orderings(
                    &mut window_eq_properties,
                    window_expr_indices[i],
                )?;
            }
        } else if let Some(sliding_expr) =
            expr.as_any().downcast_ref::<SlidingAggregateWindowExpr>()
        {
            // We are dealing with sliding window frames; i.e. frames having an
            // advancing starting point. If we have a set-monotonic expression,
            // we might be able to leverage this property.
            let set_monotonicity = sliding_expr.get_aggregate_expr().set_monotonicity();
            if set_monotonicity.ne(&SetMonotonicity::NotMonotonic) {
                // If the window frame is ever-receding, and we have set
                // monotonicity, we can utilize it to introduce new orderings.
                let frame = sliding_expr.get_window_frame();
                if frame.end_bound.is_unbounded() {
                    let increasing = set_monotonicity.eq(&SetMonotonicity::Increasing);
                    let window_col = Column::new(expr.name(), i + input_schema_len);
                    if no_partitioning {
                        // Reverse set-monotonic cases with no partitioning:
                        window_eq_properties.add_ordering([PhysicalSortExpr::new(
                            Arc::new(window_col),
                            SortOptions::new(increasing, true),
                        )]);
                    } else {
                        // Reverse set-monotonic cases for all orderings:
                        for mut lex in all_satisfied_lexs.into_iter() {
                            lex.push(PhysicalSortExpr::new(
                                Arc::new(window_col.clone()),
                                SortOptions::new(increasing, true),
                            ));
                            window_eq_properties.add_ordering(lex);
                        }
                    }
                }
                // If we ensure that the elements entering the frame are greater
                // than the ones leaving, and we have increasing set-monotonicity,
                // then the window function result will be increasing. However,
                // we also need to check if the frame is causal. If not, we cannot
                // utilize set-monotonicity since the set shrinks as the frame
                // boundary starts "touching" the end of the table.
                else if frame.is_causal() {
                    // Find one valid ordering for aggregate arguments instead of
                    // checking all combinations
                    let aggregate_exprs = sliding_expr.get_aggregate_expr().expressions();
                    let mut candidate_order = vec![];
                    let mut asc = false;

                    for (idx, expr) in aggregate_exprs.iter().enumerate() {
                        let mut found = false;
                        let sort_options =
                            sort_options_resolving_constant(Arc::clone(expr), false);

                        // Try each option and pick the first that works
                        for sort_expr in sort_options.into_iter() {
                            let is_asc = !sort_expr.options.descending;
                            candidate_order.push(sort_expr);

                            if let Some(lex) = LexOrdering::new(candidate_order.clone()) {
                                if window_eq_properties.ordering_satisfy(lex)? {
                                    if idx == 0 {
                                        // The first column's ordering direction determines the overall
                                        // monotonicity behavior of the window result.
                                        // - If the aggregate has increasing set monotonicity (e.g., MAX, COUNT)
                                        //   and the first arg is ascending, the window result is increasing
                                        // - If the aggregate has decreasing set monotonicity (e.g., MIN)
                                        //   and the first arg is ascending, the window result is also increasing
                                        // This flag is used to determine the final window column ordering.
                                        asc = is_asc;
                                    }
                                    found = true;
                                    break;
                                }
                            }
                            // This option didn't work, remove it and try the next one
                            candidate_order.pop();
                        }

                        // If we couldn't extend the ordering, stop trying
                        if !found {
                            break;
                        }
                    }

                    // Check if we successfully built a complete ordering
                    let satisfied = candidate_order.len() == aggregate_exprs.len()
                        && !aggregate_exprs.is_empty();

                    if satisfied {
                        let increasing =
                            set_monotonicity.eq(&SetMonotonicity::Increasing);
                        let window_col = Column::new(expr.name(), i + input_schema_len);
                        if increasing && (asc || no_partitioning) {
                            window_eq_properties.add_ordering([PhysicalSortExpr::new(
                                Arc::new(window_col),
                                SortOptions::new(false, false),
                            )]);
                        } else if !increasing && (!asc || no_partitioning) {
                            window_eq_properties.add_ordering([PhysicalSortExpr::new(
                                Arc::new(window_col),
                                SortOptions::new(true, false),
                            )]);
                        };
                    }
                }
            }
        }
    }
    Ok(window_eq_properties)
}

/// Constructs the best-fitting windowing operator (a `WindowAggExec` or a
/// `BoundedWindowAggExec`) for the given `input` according to the specifications
/// of `window_exprs` and `physical_partition_keys`. Here, best-fitting means
/// not requiring additional sorting and/or partitioning for the given input.
/// - A return value of `None` means that there is no way to construct a
///   windowing operator that doesn't need additional sorting/partitioning for
///   the given input; the existing ordering must be changed to run the given
///   windowing operation.
/// - A `Some(window exec)` value contains the optimal windowing operator (a
///   `WindowAggExec` or a `BoundedWindowAggExec`) for the given input.
pub fn get_best_fitting_window(
    window_exprs: &[Arc<dyn WindowExpr>],
    input: &Arc<dyn ExecutionPlan>,
    // These are the partition keys used during repartitioning.
    // They are either the same as the `window_expr`s' PARTITION BY columns,
    // or empty if partitioning is not desirable for this windowing operator.
    physical_partition_keys: &[Arc<dyn PhysicalExpr>],
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
    // `window_exprs` contains at least one window expr, and all of its
    // partition by and order by sections are the same.
    let partitionby_exprs = window_exprs[0].partition_by();
    let orderby_keys = window_exprs[0].order_by();
    let (should_reverse, input_order_mode) =
        if let Some((should_reverse, input_order_mode)) =
            get_window_mode(partitionby_exprs, orderby_keys, input)?
        {
            (should_reverse, input_order_mode)
        } else {
            return Ok(None);
        };
    let is_unbounded = input.boundedness().is_unbounded();
    if !is_unbounded && input_order_mode != InputOrderMode::Sorted {
        // The executor has bounded input and `input_order_mode` is not
        // `InputOrderMode::Sorted`; removing the sort is not helpful in this case, so return:
        return Ok(None);
    };

    let window_expr = if should_reverse {
        if let Some(reversed_window_expr) = window_exprs
            .iter()
            .map(|e| e.get_reverse_expr())
            .collect::<Option<Vec<_>>>()
        {
            reversed_window_expr
        } else {
            // None of the window expressions can be reversed, so the window
            // cannot run with the existing ordering:
            return Ok(None);
        }
    } else {
        window_exprs.to_vec()
    };

    // If all window expressions can run with bounded memory, choose the
    // bounded window variant:
    if window_expr.iter().all(|e| e.uses_bounded_memory()) {
        Ok(Some(Arc::new(BoundedWindowAggExec::try_new(
            window_expr,
            Arc::clone(input),
            input_order_mode,
            !physical_partition_keys.is_empty(),
        )?) as _))
    } else if input_order_mode != InputOrderMode::Sorted {
        // For `WindowAggExec` to work correctly, PARTITION BY columns should be sorted.
        // Hence, if `input_order_mode` is not `Sorted`, we should convert the
        // input ordering such that it can work in `Sorted` mode (add a `SortExec`).
        // Effectively, `WindowAggExec` works only in `Sorted` mode.
        Ok(None)
    } else {
        Ok(Some(Arc::new(WindowAggExec::try_new(
            window_expr,
            Arc::clone(input),
            !physical_partition_keys.is_empty(),
        )?) as _))
    }
}

/// Compares the physical ordering (output ordering of the `input` operator) with
/// `partitionby_exprs` and `orderby_keys` to decide whether the existing ordering
/// is sufficient to run the current window operator.
/// - A `None` return value indicates that we cannot remove the sort in question
///   (the input ordering is not sufficient to run the current window executor).
/// - A `Some((bool, InputOrderMode))` value indicates that the window operator
///   can run with the existing input ordering, so we can remove the `SortExec` before it.
///
/// The `bool` field in the return value represents whether we should reverse the window
/// operator to remove the `SortExec` before it. The `InputOrderMode` field represents
/// the mode this window operator should work in to accommodate the existing ordering.
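///
/// # Example
///
/// A rough sketch (not compiled here) following the cases exercised in the
/// tests below; `col_a` and `sort_expr_b` are illustrative placeholders, and
/// `input` is assumed to be ordered by `a, b, c, d` ASC NULLS FIRST:
///
/// ```ignore
/// // PARTITION BY a ORDER BY b ASC NULLS FIRST: the existing ordering is
/// // sufficient, no reversal is needed, and the operator can run in Sorted mode.
/// let mode = get_window_mode(&[col_a], &[sort_expr_b], &input)?;
/// assert_eq!(mode, Some((false, InputOrderMode::Sorted)));
/// ```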
pub fn get_window_mode(
    partitionby_exprs: &[Arc<dyn PhysicalExpr>],
    orderby_keys: &[PhysicalSortExpr],
    input: &Arc<dyn ExecutionPlan>,
) -> Result<Option<(bool, InputOrderMode)>> {
    let mut input_eqs = input.equivalence_properties().clone();
    let (_, indices) = input_eqs.find_longest_permutation(partitionby_exprs)?;
    let partition_by_reqs = indices
        .iter()
        .map(|&idx| PhysicalSortRequirement {
            expr: Arc::clone(&partitionby_exprs[idx]),
            options: None,
        })
        .collect::<Vec<_>>();
    // Treat partition by exprs as constants while analyzing whether the requirements are satisfied.
    let const_exprs = partitionby_exprs.iter().cloned().map(ConstExpr::from);
    input_eqs.add_constants(const_exprs)?;
    let reverse_orderby_keys =
        orderby_keys.iter().map(|e| e.reverse()).collect::<Vec<_>>();
    for (should_swap, orderbys) in
        [(false, orderby_keys), (true, reverse_orderby_keys.as_ref())]
    {
        let mut req = partition_by_reqs.clone();
        req.extend(orderbys.iter().cloned().map(Into::into));
        if req.is_empty() || input_eqs.ordering_satisfy_requirement(req)? {
            // Window can be run with existing ordering
            let mode = if indices.len() == partitionby_exprs.len() {
                InputOrderMode::Sorted
            } else if indices.is_empty() {
                InputOrderMode::Linear
            } else {
                InputOrderMode::PartiallySorted(indices)
            };
            return Ok(Some((should_swap, mode)));
        }
    }
    Ok(None)
}

/// Generates sort option variations for a given expression.
///
/// This function is used to handle constant columns in window operations. Since constant
/// columns can be considered as having any ordering, we generate multiple sort options
/// to explore different ordering possibilities.
///
/// # Parameters
/// - `expr`: The physical expression to generate sort options for
/// - `only_monotonic`: If false, generates all 4 possible sort options (ASC/DESC × NULLS FIRST/LAST).
///   If true, generates only 2 options that preserve set monotonicity.
///
/// # When to use `only_monotonic = false`:
/// Use for PARTITION BY columns where we want to explore all possible orderings to find
/// one that matches the existing data ordering.
///
/// # When to use `only_monotonic = true`:
/// Use for aggregate/window function arguments where set monotonicity needs to be preserved.
/// Only generates ASC NULLS LAST and DESC NULLS FIRST because:
/// - Set monotonicity is broken if data has increasing order but nulls come first
/// - Set monotonicity is broken if data has decreasing order but nulls come last
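///
/// # Example
///
/// A rough sketch (not compiled here); `expr` stands for any physical column
/// expression:
///
/// ```ignore
/// // only_monotonic = true keeps only the two monotonicity-preserving
/// // variants: [expr ASC NULLS LAST, expr DESC NULLS FIRST].
/// let options = sort_options_resolving_constant(Arc::clone(&expr), true);
/// assert_eq!(options.len(), 2);
/// ```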
fn sort_options_resolving_constant(
    expr: Arc<dyn PhysicalExpr>,
    only_monotonic: bool,
) -> Vec<PhysicalSortExpr> {
    if only_monotonic {
        // Generate only the 2 options that preserve set monotonicity
        vec![
            PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, false)), // ASC NULLS LAST
            PhysicalSortExpr::new(expr, SortOptions::new(true, true)), // DESC NULLS FIRST
        ]
    } else {
        // Generate all 4 possible sort options for partition columns
        vec![
            PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, false)), // ASC NULLS LAST
            PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, true)), // ASC NULLS FIRST
            PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(true, false)), // DESC NULLS LAST
            PhysicalSortExpr::new(expr, SortOptions::new(true, true)), // DESC NULLS FIRST
        ]
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::collect;
    use crate::expressions::col;
    use crate::streaming::StreamingTableExec;
    use crate::test::assert_is_pending;
    use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};

    use arrow::compute::SortOptions;
    use arrow_schema::{DataType, Field};
    use datafusion_execution::TaskContext;
    use datafusion_functions_aggregate::count::count_udaf;
    use InputOrderMode::{Linear, PartiallySorted, Sorted};

    use futures::FutureExt;

    fn create_test_schema() -> Result<SchemaRef> {
        let nullable_column = Field::new("nullable_col", DataType::Int32, true);
        let non_nullable_column = Field::new("non_nullable_col", DataType::Int32, false);
        let schema = Arc::new(Schema::new(vec![nullable_column, non_nullable_column]));

        Ok(schema)
    }

    fn create_test_schema2() -> Result<SchemaRef> {
        let a = Field::new("a", DataType::Int32, true);
        let b = Field::new("b", DataType::Int32, true);
        let c = Field::new("c", DataType::Int32, true);
        let d = Field::new("d", DataType::Int32, true);
        let e = Field::new("e", DataType::Int32, true);
        let schema = Arc::new(Schema::new(vec![a, b, c, d, e]));
        Ok(schema)
    }

    // Generate a schema which consists of 5 columns (a, b, c, d, e)
    fn create_test_schema3() -> Result<SchemaRef> {
        let a = Field::new("a", DataType::Int32, true);
        let b = Field::new("b", DataType::Int32, false);
        let c = Field::new("c", DataType::Int32, true);
        let d = Field::new("d", DataType::Int32, false);
        let e = Field::new("e", DataType::Int32, false);
        let schema = Arc::new(Schema::new(vec![a, b, c, d, e]));
        Ok(schema)
    }

    /// make PhysicalSortExpr with default options
    pub fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr {
        sort_expr_options(name, schema, SortOptions::default())
    }

    /// PhysicalSortExpr with specified options
    pub fn sort_expr_options(
        name: &str,
        schema: &Schema,
        options: SortOptions,
    ) -> PhysicalSortExpr {
        PhysicalSortExpr {
            expr: col(name, schema).unwrap(),
            options,
        }
    }

    /// Creates a sorted `StreamingTableExec`
    pub fn streaming_table_exec(
        schema: &SchemaRef,
        ordering: LexOrdering,
        infinite_source: bool,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(StreamingTableExec::try_new(
            Arc::clone(schema),
            vec![],
            None,
            Some(ordering),
            infinite_source,
            None,
        )?))
    }

    #[tokio::test]
    async fn test_calc_requirements() -> Result<()> {
        let schema = create_test_schema2()?;
        let test_data = vec![
            // PARTITION BY a, ORDER BY b DESC NULLS FIRST
            (
                vec!["a"],
                vec![("b", true, true)],
                vec![
                    vec![("a", None), ("b", Some((true, true)))],
                    vec![("b", Some((true, true)))],
                ],
            ),
            // PARTITION BY a, ORDER BY a DESC NULLS FIRST
            (
                vec!["a"],
                vec![("a", true, true)],
                vec![vec![("a", None)], vec![("a", Some((true, true)))]],
            ),
            // PARTITION BY a, ORDER BY b DESC NULLS FIRST, c ASC NULLS LAST
            (
                vec!["a"],
                vec![("b", true, true), ("c", false, false)],
                vec![
                    vec![
                        ("a", None),
                        ("b", Some((true, true))),
                        ("c", Some((false, false))),
                    ],
                    vec![("b", Some((true, true))), ("c", Some((false, false)))],
                ],
            ),
            // PARTITION BY a, c, ORDER BY b DESC NULLS FIRST, c ASC NULLS LAST
            (
                vec!["a", "c"],
                vec![("b", true, true), ("c", false, false)],
                vec![
                    vec![("a", None), ("c", None), ("b", Some((true, true)))],
                    vec![("b", Some((true, true))), ("c", Some((false, false)))],
                ],
            ),
        ];
        for (pb_params, ob_params, expected_params) in test_data {
            let mut partitionbys = vec![];
            for col_name in pb_params {
                partitionbys.push(col(col_name, &schema)?);
            }

            let mut orderbys = vec![];
            for (col_name, descending, nulls_first) in ob_params {
                let expr = col(col_name, &schema)?;
                let options = SortOptions::new(descending, nulls_first);
                orderbys.push(PhysicalSortExpr::new(expr, options));
            }

            let mut expected: Option<OrderingRequirements> = None;
            for expected_param in expected_params.clone() {
                let mut requirements = vec![];
                for (col_name, reqs) in expected_param {
                    let options = reqs.map(|(descending, nulls_first)| {
                        SortOptions::new(descending, nulls_first)
                    });
                    let expr = col(col_name, &schema)?;
                    requirements.push(PhysicalSortRequirement::new(expr, options));
                }
                if let Some(requirements) = LexRequirement::new(requirements) {
                    if let Some(alts) = expected.as_mut() {
                        alts.add_alternative(requirements);
                    } else {
                        expected = Some(OrderingRequirements::new(requirements));
                    }
                }
            }
            assert_eq!(calc_requirements(partitionbys, orderbys), expected);
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_drop_cancel() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));

        let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
        let refs = blocking_exec.refs();
        let window_agg_exec = Arc::new(WindowAggExec::try_new(
            vec![create_window_expr(
                &WindowFunctionDefinition::AggregateUDF(count_udaf()),
                "count".to_owned(),
                &[col("a", &schema)?],
                &[],
                &[],
                Arc::new(WindowFrame::new(None)),
                schema,
                false,
                false,
                None,
            )?],
            blocking_exec,
            false,
        )?);

        let fut = collect(window_agg_exec, task_ctx);
        let mut fut = fut.boxed();

        assert_is_pending(&mut fut);
        drop(fut);
        assert_strong_count_converges_to_zero(refs).await;

        Ok(())
    }

    #[tokio::test]
    async fn test_satisfy_nullable() -> Result<()> {
        let schema = create_test_schema()?;
        let params = vec![
            ((true, true), (false, false), false),
            ((true, true), (false, true), false),
            ((true, true), (true, false), false),
            ((true, false), (false, true), false),
            ((true, false), (false, false), false),
            ((true, false), (true, true), false),
            ((true, false), (true, false), true),
        ];
        for (
            (physical_desc, physical_nulls_first),
            (req_desc, req_nulls_first),
            expected,
        ) in params
        {
            let physical_ordering = PhysicalSortExpr {
                expr: col("nullable_col", &schema)?,
                options: SortOptions {
                    descending: physical_desc,
                    nulls_first: physical_nulls_first,
                },
            };
            let required_ordering = PhysicalSortExpr {
                expr: col("nullable_col", &schema)?,
                options: SortOptions {
                    descending: req_desc,
                    nulls_first: req_nulls_first,
                },
            };
            let res = physical_ordering.satisfy(&required_ordering.into(), &schema);
            assert_eq!(res, expected);
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_satisfy_non_nullable() -> Result<()> {
        let schema = create_test_schema()?;

        let params = vec![
            ((true, true), (false, false), false),
            ((true, true), (false, true), false),
            ((true, true), (true, false), true),
            ((true, false), (false, true), false),
            ((true, false), (false, false), false),
            ((true, false), (true, true), true),
            ((true, false), (true, false), true),
        ];
        for (
            (physical_desc, physical_nulls_first),
            (req_desc, req_nulls_first),
            expected,
        ) in params
        {
            let physical_ordering = PhysicalSortExpr {
                expr: col("non_nullable_col", &schema)?,
                options: SortOptions {
                    descending: physical_desc,
                    nulls_first: physical_nulls_first,
                },
            };
            let required_ordering = PhysicalSortExpr {
                expr: col("non_nullable_col", &schema)?,
                options: SortOptions {
                    descending: req_desc,
                    nulls_first: req_nulls_first,
                },
            };
            let res = physical_ordering.satisfy(&required_ordering.into(), &schema);
            assert_eq!(res, expected);
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_get_window_mode_exhaustive() -> Result<()> {
        let test_schema = create_test_schema3()?;
        // Columns a,c are nullable whereas b,d are not nullable.
        // Source is sorted by a ASC NULLS FIRST, b ASC NULLS FIRST, c ASC NULLS FIRST, d ASC NULLS FIRST
        // Column e is not ordered.
        let ordering = [
            sort_expr("a", &test_schema),
            sort_expr("b", &test_schema),
            sort_expr("c", &test_schema),
            sort_expr("d", &test_schema),
        ]
        .into();
        let exec_unbounded = streaming_table_exec(&test_schema, ordering, true)?;

        // Test cases consist of a vector of tuples, where each tuple represents a single test case.
        // The first field in the tuple is a Vec<str> whose elements are the PARTITION BY columns.
        // For instance, `vec!["a", "b"]` corresponds to PARTITION BY a, b.
        // The second field in the tuple is a Vec<str> whose elements are the ORDER BY columns.
        // For instance, vec!["c"] corresponds to ORDER BY c ASC NULLS FIRST (the default ordering;
        // we do not check for reversibility in this test).
        // The third field in the tuple is an Option<InputOrderMode>, which corresponds to the expected algorithm mode.
        // None means that the existing ordering is not sufficient to run the executor with any of the algorithms
        // (we need to add a SortExec to be able to run it).
        // Some(InputOrderMode) means we can run the algorithm with the existing ordering, and it should work in
        // the given InputOrderMode.
        let test_cases = vec![
            (vec!["a"], vec!["a"], Some(Sorted)),
            (vec!["a"], vec!["b"], Some(Sorted)),
            (vec!["a"], vec!["c"], None),
            (vec!["a"], vec!["a", "b"], Some(Sorted)),
            (vec!["a"], vec!["b", "c"], Some(Sorted)),
            (vec!["a"], vec!["a", "c"], None),
            (vec!["a"], vec!["a", "b", "c"], Some(Sorted)),
            (vec!["b"], vec!["a"], Some(Linear)),
            (vec!["b"], vec!["b"], Some(Linear)),
            (vec!["b"], vec!["c"], None),
            (vec!["b"], vec!["a", "b"], Some(Linear)),
            (vec!["b"], vec!["b", "c"], None),
            (vec!["b"], vec!["a", "c"], Some(Linear)),
            (vec!["b"], vec!["a", "b", "c"], Some(Linear)),
            (vec!["c"], vec!["a"], Some(Linear)),
            (vec!["c"], vec!["b"], None),
            (vec!["c"], vec!["c"], Some(Linear)),
            (vec!["c"], vec!["a", "b"], Some(Linear)),
            (vec!["c"], vec!["b", "c"], None),
            (vec!["c"], vec!["a", "c"], Some(Linear)),
            (vec!["c"], vec!["a", "b", "c"], Some(Linear)),
            (vec!["b", "a"], vec!["a"], Some(Sorted)),
            (vec!["b", "a"], vec!["b"], Some(Sorted)),
            (vec!["b", "a"], vec!["c"], Some(Sorted)),
            (vec!["b", "a"], vec!["a", "b"], Some(Sorted)),
            (vec!["b", "a"], vec!["b", "c"], Some(Sorted)),
            (vec!["b", "a"], vec!["a", "c"], Some(Sorted)),
            (vec!["b", "a"], vec!["a", "b", "c"], Some(Sorted)),
            (vec!["c", "b"], vec!["a"], Some(Linear)),
            (vec!["c", "b"], vec!["b"], Some(Linear)),
            (vec!["c", "b"], vec!["c"], Some(Linear)),
            (vec!["c", "b"], vec!["a", "b"], Some(Linear)),
            (vec!["c", "b"], vec!["b", "c"], Some(Linear)),
            (vec!["c", "b"], vec!["a", "c"], Some(Linear)),
            (vec!["c", "b"], vec!["a", "b", "c"], Some(Linear)),
            (vec!["c", "a"], vec!["a"], Some(PartiallySorted(vec![1]))),
            (vec!["c", "a"], vec!["b"], Some(PartiallySorted(vec![1]))),
            (vec!["c", "a"], vec!["c"], Some(PartiallySorted(vec![1]))),
            (
                vec!["c", "a"],
                vec!["a", "b"],
                Some(PartiallySorted(vec![1])),
            ),
            (
                vec!["c", "a"],
                vec!["b", "c"],
                Some(PartiallySorted(vec![1])),
            ),
            (
                vec!["c", "a"],
                vec!["a", "c"],
                Some(PartiallySorted(vec![1])),
            ),
            (
                vec!["c", "a"],
                vec!["a", "b", "c"],
                Some(PartiallySorted(vec![1])),
            ),
            (vec!["c", "b", "a"], vec!["a"], Some(Sorted)),
            (vec!["c", "b", "a"], vec!["b"], Some(Sorted)),
            (vec!["c", "b", "a"], vec!["c"], Some(Sorted)),
            (vec!["c", "b", "a"], vec!["a", "b"], Some(Sorted)),
            (vec!["c", "b", "a"], vec!["b", "c"], Some(Sorted)),
            (vec!["c", "b", "a"], vec!["a", "c"], Some(Sorted)),
            (vec!["c", "b", "a"], vec!["a", "b", "c"], Some(Sorted)),
        ];
        for (case_idx, test_case) in test_cases.iter().enumerate() {
            let (partition_by_columns, order_by_params, expected) = &test_case;
            let mut partition_by_exprs = vec![];
            for col_name in partition_by_columns {
                partition_by_exprs.push(col(col_name, &test_schema)?);
            }

            let mut order_by_exprs = vec![];
            for col_name in order_by_params {
                let expr = col(col_name, &test_schema)?;
                // Use the default ordering, which matches the input ordering direction.
                // We do not check for reversibility in this test.
                let options = SortOptions::default();
                order_by_exprs.push(PhysicalSortExpr { expr, options });
            }
            let res =
                get_window_mode(&partition_by_exprs, &order_by_exprs, &exec_unbounded)?;
            // Since reversibility is not important in this test, convert Option<(bool, InputOrderMode)> to Option<InputOrderMode>
            let res = res.map(|(_, mode)| mode);
            assert_eq!(
                res, *expected,
                "Unexpected result in unbounded test case #{case_idx:?}, case: {test_case:?}"
            );
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_get_window_mode() -> Result<()> {
        let test_schema = create_test_schema3()?;
        // Columns a,c are nullable whereas b,d are not nullable.
        // Source is sorted by a ASC NULLS FIRST, b ASC NULLS FIRST, c ASC NULLS FIRST, d ASC NULLS FIRST
        // Column e is not ordered.
        let ordering = [
            sort_expr("a", &test_schema),
            sort_expr("b", &test_schema),
            sort_expr("c", &test_schema),
            sort_expr("d", &test_schema),
        ]
        .into();
        let exec_unbounded = streaming_table_exec(&test_schema, ordering, true)?;

        // Test cases consist of a vector of tuples, where each tuple represents a single test case.
        // The first field in the tuple is a Vec<str> whose elements are the PARTITION BY columns.
        // For instance, `vec!["a", "b"]` corresponds to PARTITION BY a, b.
        // The second field in the tuple is a Vec<(str, bool, bool)> whose elements are the ORDER BY columns.
        // For instance, vec![("c", false, false)] corresponds to ORDER BY c ASC NULLS LAST,
        // and vec![("c", true, true)] corresponds to ORDER BY c DESC NULLS FIRST.
        // The third field in the tuple is an Option<(bool, InputOrderMode)>, which corresponds to the expected result.
        // None means that the existing ordering is not sufficient to run the executor with any of the algorithms
        // (we need to add a SortExec to be able to run it).
        // Some((bool, InputOrderMode)) means we can run the algorithm with the existing ordering. The algorithm should
        // work in the given InputOrderMode, and the bool field represents whether we should reverse the window
        // expressions to run the executor with the existing ordering.
        // For instance, `Some((false, InputOrderMode::Sorted))` means that we shouldn't reverse the window expressions,
        // and the algorithm should work in Sorted mode with the existing ordering.
        let test_cases = vec![
            // PARTITION BY a, b ORDER BY c ASC NULLS LAST
            (vec!["a", "b"], vec![("c", false, false)], None),
            // ORDER BY c ASC NULLS FIRST
            (vec![], vec![("c", false, true)], None),
            // PARTITION BY b, ORDER BY c ASC NULLS FIRST
            (vec!["b"], vec![("c", false, true)], None),
            // PARTITION BY a, ORDER BY c ASC NULLS FIRST
            (vec!["a"], vec![("c", false, true)], None),
            // PARTITION BY a, b ORDER BY c ASC NULLS FIRST, e ASC NULLS FIRST
            (
                vec!["a", "b"],
                vec![("c", false, true), ("e", false, true)],
                None,
            ),
            // PARTITION BY a, ORDER BY b ASC NULLS FIRST
            (vec!["a"], vec![("b", false, true)], Some((false, Sorted))),
            // PARTITION BY a, ORDER BY a ASC NULLS FIRST
            (vec!["a"], vec![("a", false, true)], Some((false, Sorted))),
            // PARTITION BY a, ORDER BY a ASC NULLS LAST
            (vec!["a"], vec![("a", false, false)], Some((false, Sorted))),
            // PARTITION BY a, ORDER BY a DESC NULLS FIRST
            (vec!["a"], vec![("a", true, true)], Some((false, Sorted))),
            // PARTITION BY a, ORDER BY a DESC NULLS LAST
            (vec!["a"], vec![("a", true, false)], Some((false, Sorted))),
            // PARTITION BY a, ORDER BY b ASC NULLS LAST
            (vec!["a"], vec![("b", false, false)], Some((false, Sorted))),
            // PARTITION BY a, ORDER BY b DESC NULLS LAST
            (vec!["a"], vec![("b", true, false)], Some((true, Sorted))),
            // PARTITION BY a, b ORDER BY c ASC NULLS FIRST
            (
                vec!["a", "b"],
                vec![("c", false, true)],
                Some((false, Sorted)),
            ),
            // PARTITION BY b, a ORDER BY c ASC NULLS FIRST
            (
                vec!["b", "a"],
                vec![("c", false, true)],
                Some((false, Sorted)),
            ),
            // PARTITION BY a, b ORDER BY c DESC NULLS LAST
            (
                vec!["a", "b"],
                vec![("c", true, false)],
                Some((true, Sorted)),
            ),
            // PARTITION BY e ORDER BY a ASC NULLS FIRST
            (
                vec!["e"],
                vec![("a", false, true)],
                // For unbounded, expects to work in Linear mode. Shouldn't reverse window function.
                Some((false, Linear)),
            ),
            // PARTITION BY b, c ORDER BY a ASC NULLS FIRST, c ASC NULLS FIRST
            (
                vec!["b", "c"],
                vec![("a", false, true), ("c", false, true)],
                Some((false, Linear)),
            ),
            // PARTITION BY b ORDER BY a ASC NULLS FIRST
            (vec!["b"], vec![("a", false, true)], Some((false, Linear))),
            // PARTITION BY a, e ORDER BY b ASC NULLS FIRST
            (
                vec!["a", "e"],
                vec![("b", false, true)],
                Some((false, PartiallySorted(vec![0]))),
            ),
            // PARTITION BY a, c ORDER BY b ASC NULLS FIRST
            (
                vec!["a", "c"],
                vec![("b", false, true)],
                Some((false, PartiallySorted(vec![0]))),
            ),
            // PARTITION BY c, a ORDER BY b ASC NULLS FIRST
            (
                vec!["c", "a"],
                vec![("b", false, true)],
                Some((false, PartiallySorted(vec![1]))),
            ),
            // PARTITION BY d, b, a ORDER BY c ASC NULLS FIRST
            (
                vec!["d", "b", "a"],
                vec![("c", false, true)],
                Some((false, PartiallySorted(vec![2, 1]))),
            ),
            // PARTITION BY e, b, a ORDER BY c ASC NULLS FIRST
            (
                vec!["e", "b", "a"],
                vec![("c", false, true)],
                Some((false, PartiallySorted(vec![2, 1]))),
            ),
            // PARTITION BY d, a ORDER BY b ASC NULLS FIRST
            (
                vec!["d", "a"],
                vec![("b", false, true)],
                Some((false, PartiallySorted(vec![1]))),
            ),
            // PARTITION BY a, ORDER BY b ASC NULLS FIRST, a ASC NULLS FIRST
            (
                vec!["a"],
                vec![("b", false, true), ("a", false, true)],
                Some((false, Sorted)),
            ),
            // ORDER BY b, a ASC NULLS FIRST
            (vec![], vec![("b", false, true), ("a", false, true)], None),
        ];
        for (case_idx, test_case) in test_cases.iter().enumerate() {
            let (partition_by_columns, order_by_params, expected) = &test_case;
            let mut partition_by_exprs = vec![];
            for col_name in partition_by_columns {
                partition_by_exprs.push(col(col_name, &test_schema)?);
            }

            let mut order_by_exprs = vec![];
            for (col_name, descending, nulls_first) in order_by_params {
                let expr = col(col_name, &test_schema)?;
                let options = SortOptions {
                    descending: *descending,
                    nulls_first: *nulls_first,
                };
                order_by_exprs.push(PhysicalSortExpr { expr, options });
            }

            assert_eq!(
                get_window_mode(&partition_by_exprs, &order_by_exprs, &exec_unbounded)?,
                *expected,
                "Unexpected result in unbounded test case #{case_idx:?}, case: {test_case:?}"
            );
        }

        Ok(())
    }
}