1mod bounded_window_agg_exec;
21mod utils;
22mod window_agg_exec;
23
24use std::borrow::Borrow;
25use std::sync::Arc;
26
27use crate::{
28 expressions::PhysicalSortExpr, ExecutionPlan, ExecutionPlanProperties,
29 InputOrderMode, PhysicalExpr,
30};
31
32use arrow::datatypes::{Schema, SchemaRef};
33use arrow_schema::{FieldRef, SortOptions};
34use datafusion_common::{exec_err, Result};
35use datafusion_expr::{
36 LimitEffect, PartitionEvaluator, ReversedUDWF, SetMonotonicity, WindowFrame,
37 WindowFunctionDefinition, WindowUDF,
38};
39use datafusion_functions_window_common::expr::ExpressionArgs;
40use datafusion_functions_window_common::field::WindowUDFFieldArgs;
41use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
42use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
43use datafusion_physical_expr::expressions::Column;
44use datafusion_physical_expr::window::{
45 SlidingAggregateWindowExpr, StandardWindowFunctionExpr,
46};
47use datafusion_physical_expr::{ConstExpr, EquivalenceProperties};
48use datafusion_physical_expr_common::sort_expr::{
49 LexOrdering, LexRequirement, OrderingRequirements, PhysicalSortRequirement,
50};
51
52use itertools::Itertools;
53
54pub use bounded_window_agg_exec::BoundedWindowAggExec;
56pub use datafusion_physical_expr::window::{
57 PlainAggregateWindowExpr, StandardWindowExpr, WindowExpr,
58};
59pub use window_agg_exec::WindowAggExec;
60
61pub fn schema_add_window_field(
63 args: &[Arc<dyn PhysicalExpr>],
64 schema: &Schema,
65 window_fn: &WindowFunctionDefinition,
66 fn_name: &str,
67) -> Result<Arc<Schema>> {
68 let fields = args
69 .iter()
70 .map(|e| Arc::clone(e).as_ref().return_field(schema))
71 .collect::<Result<Vec<_>>>()?;
72 let window_expr_return_field = window_fn.return_field(&fields, fn_name)?;
73 let mut window_fields = schema
74 .fields()
75 .iter()
76 .map(|f| f.as_ref().clone())
77 .collect_vec();
78 if let WindowFunctionDefinition::AggregateUDF(_) = window_fn {
80 Ok(Arc::new(Schema::new(window_fields)))
81 } else {
82 window_fields.extend_from_slice(&[window_expr_return_field
83 .as_ref()
84 .clone()
85 .with_name(fn_name)]);
86 Ok(Arc::new(Schema::new(window_fields)))
87 }
88}
89
90#[allow(clippy::too_many_arguments)]
92pub fn create_window_expr(
93 fun: &WindowFunctionDefinition,
94 name: String,
95 args: &[Arc<dyn PhysicalExpr>],
96 partition_by: &[Arc<dyn PhysicalExpr>],
97 order_by: &[PhysicalSortExpr],
98 window_frame: Arc<WindowFrame>,
99 input_schema: SchemaRef,
100 ignore_nulls: bool,
101 distinct: bool,
102 filter: Option<Arc<dyn PhysicalExpr>>,
103) -> Result<Arc<dyn WindowExpr>> {
104 Ok(match fun {
105 WindowFunctionDefinition::AggregateUDF(fun) => {
106 let aggregate = if distinct {
107 AggregateExprBuilder::new(Arc::clone(fun), args.to_vec())
108 .schema(input_schema)
109 .alias(name)
110 .with_ignore_nulls(ignore_nulls)
111 .distinct()
112 .build()
113 .map(Arc::new)?
114 } else {
115 AggregateExprBuilder::new(Arc::clone(fun), args.to_vec())
116 .schema(input_schema)
117 .alias(name)
118 .with_ignore_nulls(ignore_nulls)
119 .build()
120 .map(Arc::new)?
121 };
122 window_expr_from_aggregate_expr(
123 partition_by,
124 order_by,
125 window_frame,
126 aggregate,
127 filter,
128 )
129 }
130 WindowFunctionDefinition::WindowUDF(fun) => Arc::new(StandardWindowExpr::new(
131 create_udwf_window_expr(fun, args, &input_schema, name, ignore_nulls)?,
132 partition_by,
133 order_by,
134 window_frame,
135 )),
136 })
137}
138
139fn window_expr_from_aggregate_expr(
141 partition_by: &[Arc<dyn PhysicalExpr>],
142 order_by: &[PhysicalSortExpr],
143 window_frame: Arc<WindowFrame>,
144 aggregate: Arc<AggregateFunctionExpr>,
145 filter: Option<Arc<dyn PhysicalExpr>>,
146) -> Arc<dyn WindowExpr> {
147 let unbounded_window = window_frame.is_ever_expanding();
149
150 if !unbounded_window {
151 Arc::new(SlidingAggregateWindowExpr::new(
152 aggregate,
153 partition_by,
154 order_by,
155 window_frame,
156 filter,
157 ))
158 } else {
159 Arc::new(PlainAggregateWindowExpr::new(
160 aggregate,
161 partition_by,
162 order_by,
163 window_frame,
164 filter,
165 ))
166 }
167}
168
169pub fn create_udwf_window_expr(
171 fun: &Arc<WindowUDF>,
172 args: &[Arc<dyn PhysicalExpr>],
173 input_schema: &Schema,
174 name: String,
175 ignore_nulls: bool,
176) -> Result<Arc<dyn StandardWindowFunctionExpr>> {
177 let input_fields: Vec<_> = args
179 .iter()
180 .map(|arg| arg.return_field(input_schema))
181 .collect::<Result<_>>()?;
182
183 let udwf_expr = Arc::new(WindowUDFExpr {
184 fun: Arc::clone(fun),
185 args: args.to_vec(),
186 input_fields,
187 name,
188 is_reversed: false,
189 ignore_nulls,
190 });
191
192 let _ = udwf_expr.create_evaluator()?;
204
205 Ok(udwf_expr)
206}
207
/// A [`StandardWindowFunctionExpr`] backed by a user-defined window function
/// ([`WindowUDF`]).
#[derive(Clone, Debug)]
pub struct WindowUDFExpr {
    /// The user-defined window function being evaluated.
    fun: Arc<WindowUDF>,
    /// Physical argument expressions passed to the function.
    args: Vec<Arc<dyn PhysicalExpr>>,
    /// Display name of the expression. It is also used to look up the result
    /// column by name when reporting the result ordering.
    name: String,
    /// Fields of the argument expressions, resolved against the input schema
    /// when the expression is created.
    input_fields: Vec<FieldRef>,
    /// Whether this instance evaluates the function in reversed order.
    /// Starts `false` and is toggled when a reversed variant is produced.
    is_reversed: bool,
    /// Null-handling flag forwarded to the partition evaluator factory
    /// (presumably set for `IGNORE NULLS` — confirm with the planner).
    ignore_nulls: bool,
}
224
225impl WindowUDFExpr {
226 pub fn fun(&self) -> &Arc<WindowUDF> {
227 &self.fun
228 }
229}
230
231impl StandardWindowFunctionExpr for WindowUDFExpr {
232 fn as_any(&self) -> &dyn std::any::Any {
233 self
234 }
235
236 fn field(&self) -> Result<FieldRef> {
237 self.fun
238 .field(WindowUDFFieldArgs::new(&self.input_fields, &self.name))
239 }
240
241 fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
242 self.fun
243 .expressions(ExpressionArgs::new(&self.args, &self.input_fields))
244 }
245
246 fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
247 self.fun
248 .partition_evaluator_factory(PartitionEvaluatorArgs::new(
249 &self.args,
250 &self.input_fields,
251 self.is_reversed,
252 self.ignore_nulls,
253 ))
254 }
255
256 fn name(&self) -> &str {
257 &self.name
258 }
259
260 fn reverse_expr(&self) -> Option<Arc<dyn StandardWindowFunctionExpr>> {
261 match self.fun.reverse_expr() {
262 ReversedUDWF::Identical => Some(Arc::new(self.clone())),
263 ReversedUDWF::NotSupported => None,
264 ReversedUDWF::Reversed(fun) => Some(Arc::new(WindowUDFExpr {
265 fun,
266 args: self.args.clone(),
267 name: self.name.clone(),
268 input_fields: self.input_fields.clone(),
269 is_reversed: !self.is_reversed,
270 ignore_nulls: self.ignore_nulls,
271 })),
272 }
273 }
274
275 fn get_result_ordering(&self, schema: &SchemaRef) -> Option<PhysicalSortExpr> {
276 self.fun
277 .sort_options()
278 .zip(schema.column_with_name(self.name()))
279 .map(|(options, (idx, field))| {
280 let expr = Arc::new(Column::new(field.name(), idx));
281 PhysicalSortExpr { expr, options }
282 })
283 }
284
285 fn limit_effect(&self) -> LimitEffect {
286 self.fun.inner().limit_effect(self.args.as_slice())
287 }
288}
289
290pub(crate) fn calc_requirements<
291 T: Borrow<Arc<dyn PhysicalExpr>>,
292 S: Borrow<PhysicalSortExpr>,
293>(
294 partition_by_exprs: impl IntoIterator<Item = T>,
295 orderby_sort_exprs: impl IntoIterator<Item = S>,
296) -> Option<OrderingRequirements> {
297 let mut sort_reqs_with_partition = partition_by_exprs
298 .into_iter()
299 .map(|partition_by| {
300 PhysicalSortRequirement::new(Arc::clone(partition_by.borrow()), None)
301 })
302 .collect::<Vec<_>>();
303 let mut sort_reqs = vec![];
304 for element in orderby_sort_exprs.into_iter() {
305 let PhysicalSortExpr { expr, options } = element.borrow();
306 let sort_req = PhysicalSortRequirement::new(Arc::clone(expr), Some(*options));
307 if !sort_reqs_with_partition.iter().any(|e| e.expr.eq(expr)) {
308 sort_reqs_with_partition.push(sort_req.clone());
309 }
310 if !sort_reqs
311 .iter()
312 .any(|e: &PhysicalSortRequirement| e.expr.eq(expr))
313 {
314 sort_reqs.push(sort_req);
315 }
316 }
317
318 let mut alternatives = vec![];
319 alternatives.extend(LexRequirement::new(sort_reqs_with_partition));
320 alternatives.extend(LexRequirement::new(sort_reqs));
321
322 OrderingRequirements::new_alternatives(alternatives, false)
323}
324
325pub fn get_ordered_partition_by_indices(
331 partition_by_exprs: &[Arc<dyn PhysicalExpr>],
332 input: &Arc<dyn ExecutionPlan>,
333) -> Result<Vec<usize>> {
334 let (_, indices) = input
335 .equivalence_properties()
336 .find_longest_permutation(partition_by_exprs)?;
337 Ok(indices)
338}
339
340pub(crate) fn get_partition_by_sort_exprs(
341 input: &Arc<dyn ExecutionPlan>,
342 partition_by_exprs: &[Arc<dyn PhysicalExpr>],
343 ordered_partition_by_indices: &[usize],
344) -> Result<Vec<PhysicalSortExpr>> {
345 let ordered_partition_exprs = ordered_partition_by_indices
346 .iter()
347 .map(|idx| Arc::clone(&partition_by_exprs[*idx]))
348 .collect::<Vec<_>>();
349 assert!(ordered_partition_by_indices.len() <= partition_by_exprs.len());
351 let (ordering, _) = input
352 .equivalence_properties()
353 .find_longest_permutation(&ordered_partition_exprs)?;
354 if ordering.len() == ordered_partition_exprs.len() {
355 Ok(ordering)
356 } else {
357 exec_err!("Expects PARTITION BY expression to be ordered")
358 }
359}
360
/// Constructs the equivalence properties of a window operator's output.
///
/// `schema` is the operator's output schema. The last `window_exprs.len()`
/// columns are the window results, in the same order as `window_exprs`. The
/// input plan's equivalence properties are imported first. Then, for each
/// window expression, orderings or constants are added for its result column:
///
/// - [`StandardWindowExpr`]: delegates to its `add_equal_orderings`.
/// - [`PlainAggregateWindowExpr`]: if the frame end is unbounded, the result
///   is a table-wide constant (no PARTITION BY) or constant within each
///   satisfied partition ordering. Otherwise it delegates to the
///   expression's `add_equal_orderings`.
/// - [`SlidingAggregateWindowExpr`] with set monotonicity:
///   - if the frame end is unbounded, the result column is ordered opposite
///     to the monotonicity;
///   - if the frame is causal and the aggregate arguments are ordered, the
///     result column gets an ordering derived from the monotonicity.
///
/// Suppose a window expression has PARTITION BY expressions but no ordering
/// of them is satisfied. The function then returns early with the properties
/// accumulated so far; later window expressions are not examined.
///
/// # Errors
/// Propagates errors from extending the properties, from ordering checks,
/// and from adding constants or delegating to `add_equal_orderings`.
pub(crate) fn window_equivalence_properties(
    schema: &SchemaRef,
    input: &Arc<dyn ExecutionPlan>,
    window_exprs: &[Arc<dyn WindowExpr>],
) -> Result<EquivalenceProperties> {
    // Start from properties over the (wider) output schema, then import
    // everything known about the input plan.
    let mut window_eq_properties = EquivalenceProperties::new(Arc::clone(schema))
        .extend(input.equivalence_properties().clone())?;

    // Window result columns are appended after the input columns.
    let window_schema_len = schema.fields.len();
    let input_schema_len = window_schema_len - window_exprs.len();
    let window_expr_indices = (input_schema_len..window_schema_len).collect::<Vec<_>>();

    for (i, expr) in window_exprs.iter().enumerate() {
        let partitioning_exprs = expr.partition_by();
        let no_partitioning = partitioning_exprs.is_empty();

        // Greedily build ONE ordering of the partition expressions that the
        // current properties satisfy. For each expression, keep the first
        // candidate sort option that extends the prefix built so far; the
        // combinations of options are not enumerated.
        let mut all_satisfied_lexs = vec![];
        let mut candidate_ordering = vec![];

        for partition_expr in partitioning_exprs.iter() {
            // NOTE(review): partition expressions are tried with the two
            // options produced for `only_monotonic = true` (ASC NULLS LAST,
            // DESC NULLS FIRST), while aggregate arguments further below are
            // tried with all four. Grouping by a partition key is unaffected
            // by direction or null placement, whereas null placement can
            // matter for argument monotonicity. Confirm this assignment is
            // intended and not swapped.
            let sort_options =
                sort_options_resolving_constant(Arc::clone(partition_expr), true);

            let mut found = false;
            for sort_expr in sort_options.into_iter() {
                candidate_ordering.push(sort_expr);
                if let Some(lex) = LexOrdering::new(candidate_ordering.clone()) {
                    if window_eq_properties.ordering_satisfy(lex)? {
                        found = true;
                        break;
                    }
                }
                // Not satisfied: drop this option and try the next one.
                candidate_ordering.pop();
            }
            if !found {
                // No option works for this expression, so no complete
                // ordering exists; discard the partial prefix.
                candidate_ordering.clear();
                break;
            }
        }

        // Keep the ordering only if it covers every partition expression.
        // (With no partition expressions the vector is empty; `LexOrdering::new`
        // presumably yields `None` for it, so nothing is recorded.)
        if candidate_ordering.len() == partitioning_exprs.len() {
            if let Some(lex) = LexOrdering::new(candidate_ordering) {
                all_satisfied_lexs.push(lex);
            }
        }
        // PARTITION BY exists but no ordering of it is satisfied: no new
        // orderings can be derived, so return what has been built so far.
        if !no_partitioning && all_satisfied_lexs.is_empty() {
            return Ok(window_eq_properties);
        } else if let Some(std_expr) = expr.as_any().downcast_ref::<StandardWindowExpr>()
        {
            std_expr.add_equal_orderings(&mut window_eq_properties)?;
        } else if let Some(plain_expr) =
            expr.as_any().downcast_ref::<PlainAggregateWindowExpr>()
        {
            // `PlainAggregateWindowExpr` is created for ever-expanding frames.
            // With an unbounded frame end as well, every row of a partition
            // sees the same frame, so the result is constant per partition.
            if plain_expr.get_window_frame().end_bound.is_unbounded() {
                let window_col =
                    Arc::new(Column::new(expr.name(), i + input_schema_len)) as _;
                if no_partitioning {
                    // A single partition: the result column is a constant.
                    window_eq_properties
                        .add_constants(std::iter::once(ConstExpr::from(window_col)))?
                } else {
                    // Constant within each partition: extend every satisfied
                    // partition ordering with the result column, once per
                    // sort option (all four).
                    let new_lexs = all_satisfied_lexs.into_iter().flat_map(|lex| {
                        let new_partial_consts = sort_options_resolving_constant(
                            Arc::clone(&window_col),
                            false,
                        );

                        new_partial_consts.into_iter().map(move |partial| {
                            let mut existing = lex.clone();
                            existing.push(partial);
                            existing
                        })
                    });
                    window_eq_properties.add_orderings(new_lexs);
                }
            } else {
                plain_expr.add_equal_orderings(
                    &mut window_eq_properties,
                    window_expr_indices[i],
                )?;
            }
        } else if let Some(sliding_expr) =
            expr.as_any().downcast_ref::<SlidingAggregateWindowExpr>()
        {
            // Only set-monotonic aggregates can yield an ordered result here.
            let set_monotonicity = sliding_expr.get_aggregate_expr().set_monotonicity();
            if set_monotonicity.ne(&SetMonotonicity::NotMonotonic) {
                let frame = sliding_expr.get_window_frame();
                if frame.end_bound.is_unbounded() {
                    // Frame end unbounded: order the result column opposite
                    // to the monotonicity (descending for `Increasing`), with
                    // nulls first.
                    let increasing = set_monotonicity.eq(&SetMonotonicity::Increasing);
                    let window_col = Column::new(expr.name(), i + input_schema_len);
                    if no_partitioning {
                        window_eq_properties.add_ordering([PhysicalSortExpr::new(
                            Arc::new(window_col),
                            SortOptions::new(increasing, true),
                        )]);
                    } else {
                        // Same ordering, but only within each satisfied
                        // partition ordering.
                        for mut lex in all_satisfied_lexs.into_iter() {
                            lex.push(PhysicalSortExpr::new(
                                Arc::new(window_col.clone()),
                                SortOptions::new(increasing, true),
                            ));
                            window_eq_properties.add_ordering(lex);
                        }
                    }
                }
                // Causal frame: check whether the aggregate's arguments are
                // ordered, greedily building one satisfied ordering as above.
                else if frame.is_causal() {
                    let aggregate_exprs = sliding_expr.get_aggregate_expr().expressions();
                    let mut candidate_order = vec![];
                    // Direction of the first argument's satisfied ordering.
                    let mut asc = false;

                    for (idx, expr) in aggregate_exprs.iter().enumerate() {
                        let mut found = false;
                        let sort_options =
                            sort_options_resolving_constant(Arc::clone(expr), false);

                        for sort_expr in sort_options.into_iter() {
                            let is_asc = !sort_expr.options.descending;
                            candidate_order.push(sort_expr);

                            if let Some(lex) = LexOrdering::new(candidate_order.clone()) {
                                if window_eq_properties.ordering_satisfy(lex)? {
                                    if idx == 0 {
                                        // The first argument's direction
                                        // selects the result ordering below.
                                        asc = is_asc;
                                    }
                                    found = true;
                                    break;
                                }
                            }
                            // Not satisfied: drop this option, try the next.
                            candidate_order.pop();
                        }

                        // This argument cannot extend the ordering; stop.
                        if !found {
                            break;
                        }
                    }

                    // Every argument (and at least one) must be covered.
                    let satisfied = candidate_order.len() == aggregate_exprs.len()
                        && !aggregate_exprs.is_empty();

                    if satisfied {
                        let increasing =
                            set_monotonicity.eq(&SetMonotonicity::Increasing);
                        // `expr` is the window expression again here; the
                        // loop variable above only shadowed it inside the loop.
                        let window_col = Column::new(expr.name(), i + input_schema_len);
                        if increasing && (asc || no_partitioning) {
                            window_eq_properties.add_ordering([PhysicalSortExpr::new(
                                Arc::new(window_col),
                                SortOptions::new(false, false),
                            )]);
                        } else if !increasing && (!asc || no_partitioning) {
                            window_eq_properties.add_ordering([PhysicalSortExpr::new(
                                Arc::new(window_col),
                                SortOptions::new(true, false),
                            )]);
                        };
                    }
                }
            }
        }
    }
    Ok(window_eq_properties)
}
567
568pub fn get_best_fitting_window(
579 window_exprs: &[Arc<dyn WindowExpr>],
580 input: &Arc<dyn ExecutionPlan>,
581 physical_partition_keys: &[Arc<dyn PhysicalExpr>],
585) -> Result<Option<Arc<dyn ExecutionPlan>>> {
586 let partitionby_exprs = window_exprs[0].partition_by();
589 let orderby_keys = window_exprs[0].order_by();
590 let (should_reverse, input_order_mode) =
591 if let Some((should_reverse, input_order_mode)) =
592 get_window_mode(partitionby_exprs, orderby_keys, input)?
593 {
594 (should_reverse, input_order_mode)
595 } else {
596 return Ok(None);
597 };
598 let is_unbounded = input.boundedness().is_unbounded();
599 if !is_unbounded && input_order_mode != InputOrderMode::Sorted {
600 return Ok(None);
603 };
604
605 let window_expr = if should_reverse {
606 if let Some(reversed_window_expr) = window_exprs
607 .iter()
608 .map(|e| e.get_reverse_expr())
609 .collect::<Option<Vec<_>>>()
610 {
611 reversed_window_expr
612 } else {
613 return Ok(None);
616 }
617 } else {
618 window_exprs.to_vec()
619 };
620
621 if window_expr.iter().all(|e| e.uses_bounded_memory()) {
624 Ok(Some(Arc::new(BoundedWindowAggExec::try_new(
625 window_expr,
626 Arc::clone(input),
627 input_order_mode,
628 !physical_partition_keys.is_empty(),
629 )?) as _))
630 } else if input_order_mode != InputOrderMode::Sorted {
631 Ok(None)
636 } else {
637 Ok(Some(Arc::new(WindowAggExec::try_new(
638 window_expr,
639 Arc::clone(input),
640 !physical_partition_keys.is_empty(),
641 )?) as _))
642 }
643}
644
645pub fn get_window_mode(
657 partitionby_exprs: &[Arc<dyn PhysicalExpr>],
658 orderby_keys: &[PhysicalSortExpr],
659 input: &Arc<dyn ExecutionPlan>,
660) -> Result<Option<(bool, InputOrderMode)>> {
661 let mut input_eqs = input.equivalence_properties().clone();
662 let (_, indices) = input_eqs.find_longest_permutation(partitionby_exprs)?;
663 let partition_by_reqs = indices
664 .iter()
665 .map(|&idx| PhysicalSortRequirement {
666 expr: Arc::clone(&partitionby_exprs[idx]),
667 options: None,
668 })
669 .collect::<Vec<_>>();
670 let const_exprs = partitionby_exprs.iter().cloned().map(ConstExpr::from);
672 input_eqs.add_constants(const_exprs)?;
673 let reverse_orderby_keys =
674 orderby_keys.iter().map(|e| e.reverse()).collect::<Vec<_>>();
675 for (should_swap, orderbys) in
676 [(false, orderby_keys), (true, reverse_orderby_keys.as_ref())]
677 {
678 let mut req = partition_by_reqs.clone();
679 req.extend(orderbys.iter().cloned().map(Into::into));
680 if req.is_empty() || input_eqs.ordering_satisfy_requirement(req)? {
681 let mode = if indices.len() == partitionby_exprs.len() {
683 InputOrderMode::Sorted
684 } else if indices.is_empty() {
685 InputOrderMode::Linear
686 } else {
687 InputOrderMode::PartiallySorted(indices)
688 };
689 return Ok(Some((should_swap, mode)));
690 }
691 }
692 Ok(None)
693}
694
695fn sort_options_resolving_constant(
716 expr: Arc<dyn PhysicalExpr>,
717 only_monotonic: bool,
718) -> Vec<PhysicalSortExpr> {
719 if only_monotonic {
720 vec![
722 PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, false)), PhysicalSortExpr::new(expr, SortOptions::new(true, true)), ]
725 } else {
726 vec![
728 PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, false)), PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, true)), PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(true, false)), PhysicalSortExpr::new(expr, SortOptions::new(true, true)), ]
733 }
734}
735
#[cfg(test)]
mod tests {
    use super::*;
    use crate::collect;
    use crate::expressions::col;
    use crate::streaming::StreamingTableExec;
    use crate::test::assert_is_pending;
    use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};

    use arrow::compute::SortOptions;
    use arrow_schema::{DataType, Field};
    use datafusion_execution::TaskContext;
    use datafusion_functions_aggregate::count::count_udaf;
    use InputOrderMode::{Linear, PartiallySorted, Sorted};

    use futures::FutureExt;

    /// Schema with one nullable and one non-nullable `Int32` column.
    fn create_test_schema() -> Result<SchemaRef> {
        let nullable_column = Field::new("nullable_col", DataType::Int32, true);
        let non_nullable_column = Field::new("non_nullable_col", DataType::Int32, false);
        let schema = Arc::new(Schema::new(vec![nullable_column, non_nullable_column]));

        Ok(schema)
    }

    /// Schema with five nullable `Int32` columns `a` through `e`.
    fn create_test_schema2() -> Result<SchemaRef> {
        let a = Field::new("a", DataType::Int32, true);
        let b = Field::new("b", DataType::Int32, true);
        let c = Field::new("c", DataType::Int32, true);
        let d = Field::new("d", DataType::Int32, true);
        let e = Field::new("e", DataType::Int32, true);
        let schema = Arc::new(Schema::new(vec![a, b, c, d, e]));
        Ok(schema)
    }

    /// Schema with `Int32` columns `a` through `e`. Only `a` and `c` are
    /// nullable.
    fn create_test_schema3() -> Result<SchemaRef> {
        let a = Field::new("a", DataType::Int32, true);
        let b = Field::new("b", DataType::Int32, false);
        let c = Field::new("c", DataType::Int32, true);
        let d = Field::new("d", DataType::Int32, false);
        let e = Field::new("e", DataType::Int32, false);
        let schema = Arc::new(Schema::new(vec![a, b, c, d, e]));
        Ok(schema)
    }

    /// Sort expression on column `name` with `SortOptions::default()`.
    pub fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr {
        sort_expr_options(name, schema, SortOptions::default())
    }

    /// Sort expression on column `name` with `options`. Panics (via
    /// `unwrap`) if `schema` has no such column.
    pub fn sort_expr_options(
        name: &str,
        schema: &Schema,
        options: SortOptions,
    ) -> PhysicalSortExpr {
        PhysicalSortExpr {
            expr: col(name, schema).unwrap(),
            options,
        }
    }

    /// `StreamingTableExec` over `schema` that declares `ordering` as its
    /// output ordering. `infinite_source` is forwarded as the boundedness flag.
    pub fn streaming_table_exec(
        schema: &SchemaRef,
        ordering: LexOrdering,
        infinite_source: bool,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(StreamingTableExec::try_new(
            Arc::clone(schema),
            vec![],
            None,
            Some(ordering),
            infinite_source,
            None,
        )?))
    }

    #[tokio::test]
    async fn test_calc_requirements() -> Result<()> {
        let schema = create_test_schema2()?;
        // Each case is (PARTITION BY columns,
        //               ORDER BY as (column, descending, nulls_first),
        //               expected alternatives as lists of
        //               (column, Option<(descending, nulls_first)>)).
        let test_data = vec![
            (
                vec!["a"],
                vec![("b", true, true)],
                vec![
                    vec![("a", None), ("b", Some((true, true)))],
                    vec![("b", Some((true, true)))],
                ],
            ),
            // An ORDER BY column that is also a PARTITION BY column is not
            // repeated in the first alternative.
            (
                vec!["a"],
                vec![("a", true, true)],
                vec![vec![("a", None)], vec![("a", Some((true, true)))]],
            ),
            (
                vec!["a"],
                vec![("b", true, true), ("c", false, false)],
                vec![
                    vec![
                        ("a", None),
                        ("b", Some((true, true))),
                        ("c", Some((false, false))),
                    ],
                    vec![("b", Some((true, true))), ("c", Some((false, false)))],
                ],
            ),
            (
                vec!["a", "c"],
                vec![("b", true, true), ("c", false, false)],
                vec![
                    vec![("a", None), ("c", None), ("b", Some((true, true)))],
                    vec![("b", Some((true, true))), ("c", Some((false, false)))],
                ],
            ),
        ];
        for (pb_params, ob_params, expected_params) in test_data {
            let mut partitionbys = vec![];
            for col_name in pb_params {
                partitionbys.push(col(col_name, &schema)?);
            }

            let mut orderbys = vec![];
            for (col_name, descending, nulls_first) in ob_params {
                let expr = col(col_name, &schema)?;
                let options = SortOptions::new(descending, nulls_first);
                orderbys.push(PhysicalSortExpr::new(expr, options));
            }

            // Build the expected `OrderingRequirements`, one alternative per
            // entry of `expected_params`.
            let mut expected: Option<OrderingRequirements> = None;
            for expected_param in expected_params.clone() {
                let mut requirements = vec![];
                for (col_name, reqs) in expected_param {
                    let options = reqs.map(|(descending, nulls_first)| {
                        SortOptions::new(descending, nulls_first)
                    });
                    let expr = col(col_name, &schema)?;
                    requirements.push(PhysicalSortRequirement::new(expr, options));
                }
                if let Some(requirements) = LexRequirement::new(requirements) {
                    if let Some(alts) = expected.as_mut() {
                        alts.add_alternative(requirements);
                    } else {
                        expected = Some(OrderingRequirements::new(requirements));
                    }
                }
            }
            assert_eq!(calc_requirements(partitionbys, orderbys), expected);
        }
        Ok(())
    }

    /// Dropping the pending future of a `WindowAggExec` over a blocking
    /// input must release every reference to that input.
    #[tokio::test]
    async fn test_drop_cancel() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));

        let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
        let refs = blocking_exec.refs();
        let window_agg_exec = Arc::new(WindowAggExec::try_new(
            vec![create_window_expr(
                &WindowFunctionDefinition::AggregateUDF(count_udaf()),
                "count".to_owned(),
                &[col("a", &schema)?],
                &[],
                &[],
                Arc::new(WindowFrame::new(None)),
                schema,
                false,
                false,
                None,
            )?],
            blocking_exec,
            false,
        )?);

        let fut = collect(window_agg_exec, task_ctx);
        let mut fut = fut.boxed();

        assert_is_pending(&mut fut);
        drop(fut);
        assert_strong_count_converges_to_zero(refs).await;

        Ok(())
    }

    #[tokio::test]
    async fn test_satisfy_nullable() -> Result<()> {
        let schema = create_test_schema()?;
        // Each case is ((physical descending, nulls_first),
        //               (required descending, nulls_first),
        //               whether the physical ordering satisfies the requirement)
        // for the nullable column.
        let params = vec![
            ((true, true), (false, false), false),
            ((true, true), (false, true), false),
            ((true, true), (true, false), false),
            ((true, false), (false, true), false),
            ((true, false), (false, false), false),
            ((true, false), (true, true), false),
            ((true, false), (true, false), true),
        ];
        for (
            (physical_desc, physical_nulls_first),
            (req_desc, req_nulls_first),
            expected,
        ) in params
        {
            let physical_ordering = PhysicalSortExpr {
                expr: col("nullable_col", &schema)?,
                options: SortOptions {
                    descending: physical_desc,
                    nulls_first: physical_nulls_first,
                },
            };
            let required_ordering = PhysicalSortExpr {
                expr: col("nullable_col", &schema)?,
                options: SortOptions {
                    descending: req_desc,
                    nulls_first: req_nulls_first,
                },
            };
            let res = physical_ordering.satisfy(&required_ordering.into(), &schema);
            assert_eq!(res, expected);
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_satisfy_non_nullable() -> Result<()> {
        let schema = create_test_schema()?;

        // Same case layout as `test_satisfy_nullable`, but for the
        // non-nullable column. Here a mismatch only in `nulls_first` is
        // still expected to satisfy the requirement.
        let params = vec![
            ((true, true), (false, false), false),
            ((true, true), (false, true), false),
            ((true, true), (true, false), true),
            ((true, false), (false, true), false),
            ((true, false), (false, false), false),
            ((true, false), (true, true), true),
            ((true, false), (true, false), true),
        ];
        for (
            (physical_desc, physical_nulls_first),
            (req_desc, req_nulls_first),
            expected,
        ) in params
        {
            let physical_ordering = PhysicalSortExpr {
                expr: col("non_nullable_col", &schema)?,
                options: SortOptions {
                    descending: physical_desc,
                    nulls_first: physical_nulls_first,
                },
            };
            let required_ordering = PhysicalSortExpr {
                expr: col("non_nullable_col", &schema)?,
                options: SortOptions {
                    descending: req_desc,
                    nulls_first: req_nulls_first,
                },
            };
            let res = physical_ordering.satisfy(&required_ordering.into(), &schema);
            assert_eq!(res, expected);
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_get_window_mode_exhaustive() -> Result<()> {
        let test_schema = create_test_schema3()?;
        // The unbounded source is ordered by a, b, c, d
        // (each with `SortOptions::default()`).
        let ordering = [
            sort_expr("a", &test_schema),
            sort_expr("b", &test_schema),
            sort_expr("c", &test_schema),
            sort_expr("d", &test_schema),
        ]
        .into();
        let exec_unbounded = streaming_table_exec(&test_schema, ordering, true)?;

        // Each case is (PARTITION BY columns,
        //               ORDER BY columns with `SortOptions::default()`,
        //               expected mode, or `None` if the ordering is unusable).
        let test_cases = vec![
            (vec!["a"], vec!["a"], Some(Sorted)),
            (vec!["a"], vec!["b"], Some(Sorted)),
            (vec!["a"], vec!["c"], None),
            (vec!["a"], vec!["a", "b"], Some(Sorted)),
            (vec!["a"], vec!["b", "c"], Some(Sorted)),
            (vec!["a"], vec!["a", "c"], None),
            (vec!["a"], vec!["a", "b", "c"], Some(Sorted)),
            (vec!["b"], vec!["a"], Some(Linear)),
            (vec!["b"], vec!["b"], Some(Linear)),
            (vec!["b"], vec!["c"], None),
            (vec!["b"], vec!["a", "b"], Some(Linear)),
            (vec!["b"], vec!["b", "c"], None),
            (vec!["b"], vec!["a", "c"], Some(Linear)),
            (vec!["b"], vec!["a", "b", "c"], Some(Linear)),
            (vec!["c"], vec!["a"], Some(Linear)),
            (vec!["c"], vec!["b"], None),
            (vec!["c"], vec!["c"], Some(Linear)),
            (vec!["c"], vec!["a", "b"], Some(Linear)),
            (vec!["c"], vec!["b", "c"], None),
            (vec!["c"], vec!["a", "c"], Some(Linear)),
            (vec!["c"], vec!["a", "b", "c"], Some(Linear)),
            (vec!["b", "a"], vec!["a"], Some(Sorted)),
            (vec!["b", "a"], vec!["b"], Some(Sorted)),
            (vec!["b", "a"], vec!["c"], Some(Sorted)),
            (vec!["b", "a"], vec!["a", "b"], Some(Sorted)),
            (vec!["b", "a"], vec!["b", "c"], Some(Sorted)),
            (vec!["b", "a"], vec!["a", "c"], Some(Sorted)),
            (vec!["b", "a"], vec!["a", "b", "c"], Some(Sorted)),
            (vec!["c", "b"], vec!["a"], Some(Linear)),
            (vec!["c", "b"], vec!["b"], Some(Linear)),
            (vec!["c", "b"], vec!["c"], Some(Linear)),
            (vec!["c", "b"], vec!["a", "b"], Some(Linear)),
            (vec!["c", "b"], vec!["b", "c"], Some(Linear)),
            (vec!["c", "b"], vec!["a", "c"], Some(Linear)),
            (vec!["c", "b"], vec!["a", "b", "c"], Some(Linear)),
            (vec!["c", "a"], vec!["a"], Some(PartiallySorted(vec![1]))),
            (vec!["c", "a"], vec!["b"], Some(PartiallySorted(vec![1]))),
            (vec!["c", "a"], vec!["c"], Some(PartiallySorted(vec![1]))),
            (
                vec!["c", "a"],
                vec!["a", "b"],
                Some(PartiallySorted(vec![1])),
            ),
            (
                vec!["c", "a"],
                vec!["b", "c"],
                Some(PartiallySorted(vec![1])),
            ),
            (
                vec!["c", "a"],
                vec!["a", "c"],
                Some(PartiallySorted(vec![1])),
            ),
            (
                vec!["c", "a"],
                vec!["a", "b", "c"],
                Some(PartiallySorted(vec![1])),
            ),
            (vec!["c", "b", "a"], vec!["a"], Some(Sorted)),
            (vec!["c", "b", "a"], vec!["b"], Some(Sorted)),
            (vec!["c", "b", "a"], vec!["c"], Some(Sorted)),
            (vec!["c", "b", "a"], vec!["a", "b"], Some(Sorted)),
            (vec!["c", "b", "a"], vec!["b", "c"], Some(Sorted)),
            (vec!["c", "b", "a"], vec!["a", "c"], Some(Sorted)),
            (vec!["c", "b", "a"], vec!["a", "b", "c"], Some(Sorted)),
        ];
        for (case_idx, test_case) in test_cases.iter().enumerate() {
            let (partition_by_columns, order_by_params, expected) = &test_case;
            let mut partition_by_exprs = vec![];
            for col_name in partition_by_columns {
                partition_by_exprs.push(col(col_name, &test_schema)?);
            }

            let mut order_by_exprs = vec![];
            for col_name in order_by_params {
                let expr = col(col_name, &test_schema)?;
                let options = SortOptions::default();
                order_by_exprs.push(PhysicalSortExpr { expr, options });
            }
            let res =
                get_window_mode(&partition_by_exprs, &order_by_exprs, &exec_unbounded)?;
            // Only the mode is checked here; the reverse flag is ignored.
            let res = res.map(|(_, mode)| mode);
            assert_eq!(
                res, *expected,
                "Unexpected result for in unbounded test case#: {case_idx:?}, case: {test_case:?}"
            );
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_get_window_mode() -> Result<()> {
        let test_schema = create_test_schema3()?;
        // The unbounded source is ordered by a, b, c, d
        // (each with `SortOptions::default()`).
        let ordering = [
            sort_expr("a", &test_schema),
            sort_expr("b", &test_schema),
            sort_expr("c", &test_schema),
            sort_expr("d", &test_schema),
        ]
        .into();
        let exec_unbounded = streaming_table_exec(&test_schema, ordering, true)?;

        // Each case is (PARTITION BY columns,
        //               ORDER BY as (column, descending, nulls_first),
        //               expected Option<(should_reverse, mode)>).
        let test_cases = vec![
            // ORDER BY not satisfiable in either direction:
            (vec!["a", "b"], vec![("c", false, false)], None),
            (vec![], vec![("c", false, true)], None),
            (vec!["b"], vec![("c", false, true)], None),
            (vec!["a"], vec![("c", false, true)], None),
            (
                vec!["a", "b"],
                vec![("c", false, true), ("e", false, true)],
                None,
            ),
            // Every PARTITION BY column ordered by the input -> Sorted. The
            // flag is `true` when the ORDER BY only matches in reverse.
            (vec!["a"], vec![("b", false, true)], Some((false, Sorted))),
            (vec!["a"], vec![("a", false, true)], Some((false, Sorted))),
            (vec!["a"], vec![("a", false, false)], Some((false, Sorted))),
            (vec!["a"], vec![("a", true, true)], Some((false, Sorted))),
            (vec!["a"], vec![("a", true, false)], Some((false, Sorted))),
            (vec!["a"], vec![("b", false, false)], Some((false, Sorted))),
            (vec!["a"], vec![("b", true, false)], Some((true, Sorted))),
            (
                vec!["a", "b"],
                vec![("c", false, true)],
                Some((false, Sorted)),
            ),
            (
                vec!["b", "a"],
                vec![("c", false, true)],
                Some((false, Sorted)),
            ),
            (
                vec!["a", "b"],
                vec![("c", true, false)],
                Some((true, Sorted)),
            ),
            // No PARTITION BY column ordered by the input -> Linear:
            (
                vec!["e"],
                vec![("a", false, true)],
                Some((false, Linear)),
            ),
            (
                vec!["b", "c"],
                vec![("a", false, true), ("c", false, true)],
                Some((false, Linear)),
            ),
            (vec!["b"], vec![("a", false, true)], Some((false, Linear))),
            // Some PARTITION BY columns ordered -> PartiallySorted with their
            // indices:
            (
                vec!["a", "e"],
                vec![("b", false, true)],
                Some((false, PartiallySorted(vec![0]))),
            ),
            (
                vec!["a", "c"],
                vec![("b", false, true)],
                Some((false, PartiallySorted(vec![0]))),
            ),
            (
                vec!["c", "a"],
                vec![("b", false, true)],
                Some((false, PartiallySorted(vec![1]))),
            ),
            (
                vec!["d", "b", "a"],
                vec![("c", false, true)],
                Some((false, PartiallySorted(vec![2, 1]))),
            ),
            (
                vec!["e", "b", "a"],
                vec![("c", false, true)],
                Some((false, PartiallySorted(vec![2, 1]))),
            ),
            (
                vec!["d", "a"],
                vec![("b", false, true)],
                Some((false, PartiallySorted(vec![1]))),
            ),
            // ORDER BY repeating a PARTITION BY column, and the same ORDER BY
            // without any PARTITION BY:
            (
                vec!["a"],
                vec![("b", false, true), ("a", false, true)],
                Some((false, Sorted)),
            ),
            (vec![], vec![("b", false, true), ("a", false, true)], None),
        ];
        for (case_idx, test_case) in test_cases.iter().enumerate() {
            let (partition_by_columns, order_by_params, expected) = &test_case;
            let mut partition_by_exprs = vec![];
            for col_name in partition_by_columns {
                partition_by_exprs.push(col(col_name, &test_schema)?);
            }

            let mut order_by_exprs = vec![];
            for (col_name, descending, nulls_first) in order_by_params {
                let expr = col(col_name, &test_schema)?;
                let options = SortOptions {
                    descending: *descending,
                    nulls_first: *nulls_first,
                };
                order_by_exprs.push(PhysicalSortExpr { expr, options });
            }

            assert_eq!(
                get_window_mode(&partition_by_exprs, &order_by_exprs, &exec_unbounded)?,
                *expected,
                "Unexpected result for in unbounded test case#: {case_idx:?}, case: {test_case:?}"
            );
        }

        Ok(())
    }
}