datafusion_physical_plan/sorts/builder.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::spill::get_record_batch_memory_size;
19use arrow::compute::interleave;
20use arrow::datatypes::SchemaRef;
21use arrow::record_batch::RecordBatch;
22use datafusion_common::Result;
23use datafusion_execution::memory_pool::MemoryReservation;
24use std::sync::Arc;
25
/// Tracks how far a single input stream has been consumed: which entry of
/// [`BatchBuilder::batches`] the stream's current batch lives at, and the
/// next row to take from it.
#[derive(Debug, Copy, Clone, Default)]
struct BatchCursor {
    /// The index into BatchBuilder::batches
    batch_idx: usize,
    /// The row index within the given batch
    row_idx: usize,
}
33
/// Provides an API to incrementally build a [`RecordBatch`] from partitioned [`RecordBatch`]
#[derive(Debug)]
pub struct BatchBuilder {
    /// The schema of the RecordBatches yielded by this stream
    schema: SchemaRef,

    /// Maintain a list of [`RecordBatch`] and their corresponding stream index
    /// (the `usize` is the stream the batch was pushed from)
    batches: Vec<(usize, RecordBatch)>,

    /// Accounts for memory used by buffered batches; grown on `push_batch`
    /// and shrunk when exhausted batches are dropped in `build_record_batch`
    reservation: MemoryReservation,

    /// The current [`BatchCursor`] for each stream
    cursors: Vec<BatchCursor>,

    /// The accumulated stream indexes from which to pull rows
    /// Consists of a tuple of `(batch_idx, row_idx)`, where `batch_idx`
    /// indexes into `batches` — drained each time a batch is built
    indices: Vec<(usize, usize)>,
}
53
54impl BatchBuilder {
55    /// Create a new [`BatchBuilder`] with the provided `stream_count` and `batch_size`
56    pub fn new(
57        schema: SchemaRef,
58        stream_count: usize,
59        batch_size: usize,
60        reservation: MemoryReservation,
61    ) -> Self {
62        Self {
63            schema,
64            batches: Vec::with_capacity(stream_count * 2),
65            cursors: vec![BatchCursor::default(); stream_count],
66            indices: Vec::with_capacity(batch_size),
67            reservation,
68        }
69    }
70
71    /// Append a new batch in `stream_idx`
72    pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) -> Result<()> {
73        self.reservation
74            .try_grow(get_record_batch_memory_size(&batch))?;
75        let batch_idx = self.batches.len();
76        self.batches.push((stream_idx, batch));
77        self.cursors[stream_idx] = BatchCursor {
78            batch_idx,
79            row_idx: 0,
80        };
81        Ok(())
82    }
83
84    /// Append the next row from `stream_idx`
85    pub fn push_row(&mut self, stream_idx: usize) {
86        let cursor = &mut self.cursors[stream_idx];
87        let row_idx = cursor.row_idx;
88        cursor.row_idx += 1;
89        self.indices.push((cursor.batch_idx, row_idx));
90    }
91
92    /// Returns the number of in-progress rows in this [`BatchBuilder`]
93    pub fn len(&self) -> usize {
94        self.indices.len()
95    }
96
97    /// Returns `true` if this [`BatchBuilder`] contains no in-progress rows
98    pub fn is_empty(&self) -> bool {
99        self.indices.is_empty()
100    }
101
102    /// Returns the schema of this [`BatchBuilder`]
103    pub fn schema(&self) -> &SchemaRef {
104        &self.schema
105    }
106
107    /// Drains the in_progress row indexes, and builds a new RecordBatch from them
108    ///
109    /// Will then drop any batches for which all rows have been yielded to the output
110    ///
111    /// Returns `None` if no pending rows
112    pub fn build_record_batch(&mut self) -> Result<Option<RecordBatch>> {
113        if self.is_empty() {
114            return Ok(None);
115        }
116
117        let columns = (0..self.schema.fields.len())
118            .map(|column_idx| {
119                let arrays: Vec<_> = self
120                    .batches
121                    .iter()
122                    .map(|(_, batch)| batch.column(column_idx).as_ref())
123                    .collect();
124                Ok(interleave(&arrays, &self.indices)?)
125            })
126            .collect::<Result<Vec<_>>>()?;
127
128        self.indices.clear();
129
130        // New cursors are only created once the previous cursor for the stream
131        // is finished. This means all remaining rows from all but the last batch
132        // for each stream have been yielded to the newly created record batch
133        //
134        // We can therefore drop all but the last batch for each stream
135        let mut batch_idx = 0;
136        let mut retained = 0;
137        self.batches.retain(|(stream_idx, batch)| {
138            let stream_cursor = &mut self.cursors[*stream_idx];
139            let retain = stream_cursor.batch_idx == batch_idx;
140            batch_idx += 1;
141
142            if retain {
143                stream_cursor.batch_idx = retained;
144                retained += 1;
145            } else {
146                self.reservation.shrink(get_record_batch_memory_size(batch));
147            }
148            retain
149        });
150
151        Ok(Some(RecordBatch::try_new(
152            Arc::clone(&self.schema),
153            columns,
154        )?))
155    }
156}