datafusion_physical_plan/sorts/streaming_merge.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Merge that handles an arbitrary number of streaming inputs.
//! This is an order-preserving merge.

use crate::metrics::BaselineMetrics;
use crate::sorts::multi_level_merge::MultiLevelMergeBuilder;
use crate::sorts::{
    merge::SortPreservingMergeStream,
    stream::{FieldCursorStream, RowCursorStream},
};
use crate::{SendableRecordBatchStream, SpillManager};
use arrow::array::*;
use arrow::datatypes::{DataType, SchemaRef};
use datafusion_common::{internal_err, Result};
use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::memory_pool::{
    human_readable_size, MemoryConsumer, MemoryPool, MemoryReservation,
    UnboundedMemoryPool,
};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use std::sync::Arc;

/// Forwards a downcast primitive `DataType` to `merge_helper!` with the
/// matching `PrimitiveArray` type.
macro_rules! primitive_merge_helper {
    ($t:ty, $($v:ident),+) => {
        merge_helper!(PrimitiveArray<$t>, $($v),+)
    };
}

/// Builds a `SortPreservingMergeStream` over a `FieldCursorStream` specialized
/// for the given array type and returns it from the enclosing function.
macro_rules! merge_helper {
    ($t:ty, $sort:ident, $streams:ident, $schema:ident, $tracking_metrics:ident, $batch_size:ident, $fetch:ident, $reservation:ident, $enable_round_robin_tie_breaker:ident) => {{
        let streams =
            FieldCursorStream::<$t>::new($sort, $streams, $reservation.new_empty());
        return Ok(Box::pin(SortPreservingMergeStream::new(
            Box::new(streams),
            $schema,
            $tracking_metrics,
            $batch_size,
            $fetch,
            $reservation,
            $enable_round_robin_tie_breaker,
        )));
    }};
}
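
// Illustrative expansion sketch (comments only, not generated code), assuming a
// single-column sort on a `DataType::Utf8` key: the
// `merge_helper!(StringArray, ...)` arm invoked in `build()` below expands to
// roughly the following:
//
//     let streams =
//         FieldCursorStream::<StringArray>::new(sort, streams, reservation.new_empty());
//     return Ok(Box::pin(SortPreservingMergeStream::new(
//         Box::new(streams),
//         schema,
//         metrics,
//         batch_size,
//         fetch,
//         reservation,
//         enable_round_robin_tie_breaker,
//     )));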

/// A spill file of already-sorted record batches, together with the memory
/// footprint of its largest batch.
pub struct SortedSpillFile {
    pub file: RefCountedTempFile,

    /// How much memory the largest record batch in this file occupies
    pub max_record_batch_memory: usize,
}

impl std::fmt::Debug for SortedSpillFile {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "SortedSpillFile({:?}) takes {}",
            self.file.path(),
            human_readable_size(self.max_record_batch_memory)
        )
    }
}

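/// Builder for an order-preserving streaming merge over multiple sorted inputs
/// (in-memory streams and/or sorted spill files).
///
/// A minimal usage sketch; `sorted_streams`, `schema`, `ordering`, `metrics`,
/// `batch_size`, and `reservation` are assumed to be created by the caller and
/// are not shown here:
///
/// ```text
/// let merged: SendableRecordBatchStream = StreamingMergeBuilder::new()
///     .with_streams(sorted_streams)   // each stream already sorted by `ordering`
///     .with_schema(schema)            // common SchemaRef of all inputs
///     .with_expressions(&ordering)    // LexOrdering to merge by
///     .with_metrics(metrics)          // BaselineMetrics of the operator
///     .with_batch_size(batch_size)    // target output batch size
///     .with_fetch(None)               // optional row limit
///     .with_reservation(reservation)  // MemoryReservation for merge state
///     .build()?;
/// ```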
#[derive(Default)]
pub struct StreamingMergeBuilder<'a> {
    streams: Vec<SendableRecordBatchStream>,
    sorted_spill_files: Vec<SortedSpillFile>,
    spill_manager: Option<SpillManager>,
    schema: Option<SchemaRef>,
    expressions: Option<&'a LexOrdering>,
    metrics: Option<BaselineMetrics>,
    batch_size: Option<usize>,
    fetch: Option<usize>,
    reservation: Option<MemoryReservation>,
    enable_round_robin_tie_breaker: bool,
}

impl<'a> StreamingMergeBuilder<'a> {
    pub fn new() -> Self {
        Self {
            enable_round_robin_tie_breaker: true,
            ..Default::default()
        }
    }

    pub fn with_streams(mut self, streams: Vec<SendableRecordBatchStream>) -> Self {
        self.streams = streams;
        self
    }

    pub fn with_sorted_spill_files(
        mut self,
        sorted_spill_files: Vec<SortedSpillFile>,
    ) -> Self {
        self.sorted_spill_files = sorted_spill_files;
        self
    }

    pub fn with_spill_manager(mut self, spill_manager: SpillManager) -> Self {
        self.spill_manager = Some(spill_manager);
        self
    }

    pub fn with_schema(mut self, schema: SchemaRef) -> Self {
        self.schema = Some(schema);
        self
    }

    pub fn with_expressions(mut self, expressions: &'a LexOrdering) -> Self {
        self.expressions = Some(expressions);
        self
    }

    pub fn with_metrics(mut self, metrics: BaselineMetrics) -> Self {
        self.metrics = Some(metrics);
        self
    }

    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = Some(batch_size);
        self
    }

    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
        self.fetch = fetch;
        self
    }

    pub fn with_reservation(mut self, reservation: MemoryReservation) -> Self {
        self.reservation = Some(reservation);
        self
    }

    /// See [SortPreservingMergeExec::with_round_robin_repartition] for more
    /// information.
    ///
    /// [SortPreservingMergeExec::with_round_robin_repartition]: crate::sorts::sort_preserving_merge::SortPreservingMergeExec::with_round_robin_repartition
    pub fn with_round_robin_tie_breaker(
        mut self,
        enable_round_robin_tie_breaker: bool,
    ) -> Self {
        self.enable_round_robin_tie_breaker = enable_round_robin_tie_breaker;
        self
    }

    /// Bypass the memory pool by registering the internal reservation against an
    /// unbounded pool, so no real memory accounting takes place.
    ///
    /// This is not marked as `pub` because using it is not recommended.
    pub(super) fn with_bypass_mempool(self) -> Self {
        let mem_pool: Arc<dyn MemoryPool> = Arc::new(UnboundedMemoryPool::default());

        self.with_reservation(
            MemoryConsumer::new("merge stream mock memory").register(&mem_pool),
        )
    }

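    /// Builds the merged stream.
    ///
    /// Dispatch, as implemented below: if any sorted spill files were provided,
    /// the merge is delegated to `MultiLevelMergeBuilder`; otherwise a
    /// single-column sort key on a primitive, string, or binary type takes the
    /// specialized `FieldCursorStream` fast path, and all remaining cases fall
    /// back to a `RowCursorStream` based `SortPreservingMergeStream`.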
    pub fn build(self) -> Result<SendableRecordBatchStream> {
        let Self {
            streams,
            sorted_spill_files,
            spill_manager,
            schema,
            metrics,
            batch_size,
            reservation,
            fetch,
            expressions,
            enable_round_robin_tie_breaker,
        } = self;

        // Early return if no sort expressions were provided:
        let Some(expressions) = expressions else {
            return internal_err!("Sort expressions cannot be empty for streaming merge");
        };

        if !sorted_spill_files.is_empty() {
            // Unwrapping mandatory fields
            let schema = schema.expect("Schema cannot be empty for streaming merge");
            let metrics = metrics.expect("Metrics cannot be empty for streaming merge");
            let batch_size =
                batch_size.expect("Batch size cannot be empty for streaming merge");
            let reservation =
                reservation.expect("Reservation cannot be empty for streaming merge");

            return Ok(MultiLevelMergeBuilder::new(
                spill_manager.expect("spill_manager should exist"),
                schema,
                sorted_spill_files,
                streams,
                expressions.clone(),
                metrics,
                batch_size,
                reservation,
                fetch,
                enable_round_robin_tie_breaker,
            )
            .create_spillable_merge_stream());
        }

        // Early return if streams are empty:
        if streams.is_empty() {
            return internal_err!(
                "Streams/sorted spill files cannot be empty for streaming merge"
            );
        }

        // Unwrapping mandatory fields
        let schema = schema.expect("Schema cannot be empty for streaming merge");
        let metrics = metrics.expect("Metrics cannot be empty for streaming merge");
        let batch_size =
            batch_size.expect("Batch size cannot be empty for streaming merge");
        let reservation =
            reservation.expect("Reservation cannot be empty for streaming merge");

        // Special case single column comparisons with optimized cursor implementations
        if expressions.len() == 1 {
            let sort = expressions[0].clone();
            let data_type = sort.expr.data_type(schema.as_ref())?;
            downcast_primitive! {
                data_type => (primitive_merge_helper, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker),
                DataType::Utf8 => merge_helper!(StringArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
                DataType::Utf8View => merge_helper!(StringViewArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
                DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
                DataType::Binary => merge_helper!(BinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
                DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
                _ => {}
            }
        }

        let streams = RowCursorStream::try_new(
            schema.as_ref(),
            expressions,
            streams,
            reservation.new_empty(),
        )?;
        Ok(Box::pin(SortPreservingMergeStream::new(
            Box::new(streams),
            schema,
            metrics,
            batch_size,
            fetch,
            reservation,
            enable_round_robin_tie_breaker,
        )))
    }
}