datafusion_physical_plan/sorts/builder.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::spill::get_record_batch_memory_size;
use arrow::compute::interleave;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_execution::memory_pool::MemoryReservation;
use std::sync::Arc;

#[derive(Debug, Copy, Clone, Default)]
struct BatchCursor {
    /// The index into BatchBuilder::batches
    batch_idx: usize,
    /// The row index within the given batch
    row_idx: usize,
}

/// Provides an API to incrementally build a [`RecordBatch`] from partitioned [`RecordBatch`]es
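///
/// Rows are accumulated by repeatedly calling [`Self::push_batch`] to buffer
/// input from a stream and [`Self::push_row`] to select the next row to emit;
/// calling [`Self::build_record_batch`] then interleaves the selected rows
/// into a single output [`RecordBatch`]. See the usage sketch in the test
/// module at the bottom of this file.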
#[derive(Debug)]
pub struct BatchBuilder {
    /// The schema of the RecordBatches yielded by this builder
    schema: SchemaRef,

    /// The buffered [`RecordBatch`]es, each paired with the index of the stream it was read from
    batches: Vec<(usize, RecordBatch)>,

    /// Accounts for memory used by buffered batches
    reservation: MemoryReservation,

    /// The current [`BatchCursor`] for each stream
    cursors: Vec<BatchCursor>,

    /// The accumulated `(batch_idx, row_idx)` pairs identifying, in output order,
    /// the rows to pull from `batches`
    indices: Vec<(usize, usize)>,
}

impl BatchBuilder {
    /// Create a new [`BatchBuilder`] with the provided `stream_count` and `batch_size`
    pub fn new(
        schema: SchemaRef,
        stream_count: usize,
        batch_size: usize,
        reservation: MemoryReservation,
    ) -> Self {
        Self {
            schema,
            batches: Vec::with_capacity(stream_count * 2),
            cursors: vec![BatchCursor::default(); stream_count],
            indices: Vec::with_capacity(batch_size),
            reservation,
        }
    }

    /// Append a new batch for the stream with index `stream_idx`
    pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) -> Result<()> {
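        // Account for the memory of the new batch before buffering it; fail
        // early if the reservation cannot grow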
        self.reservation
            .try_grow(get_record_batch_memory_size(&batch))?;
        let batch_idx = self.batches.len();
        self.batches.push((stream_idx, batch));
        self.cursors[stream_idx] = BatchCursor {
            batch_idx,
            row_idx: 0,
        };
        Ok(())
    }

    /// Append the next row from `stream_idx`
    pub fn push_row(&mut self, stream_idx: usize) {
        let cursor = &mut self.cursors[stream_idx];
        let row_idx = cursor.row_idx;
        cursor.row_idx += 1;
        self.indices.push((cursor.batch_idx, row_idx));
    }

    /// Returns the number of in-progress rows in this [`BatchBuilder`]
    pub fn len(&self) -> usize {
        self.indices.len()
    }

    /// Returns `true` if this [`BatchBuilder`] contains no in-progress rows
    pub fn is_empty(&self) -> bool {
        self.indices.is_empty()
    }

    /// Returns the schema of this [`BatchBuilder`]
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Drains the in-progress row indices and builds a new RecordBatch from them
    ///
    /// Will then drop any batches for which all rows have been yielded to the output
    ///
    /// Returns `None` if there are no pending rows
    pub fn build_record_batch(&mut self) -> Result<Option<RecordBatch>> {
        if self.is_empty() {
            return Ok(None);
        }

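        // For each output column, gather the selected rows: `interleave` takes
        // one array per buffered batch and copies out the values addressed by
        // the accumulated `(batch_idx, row_idx)` pairs, in order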
        let columns = (0..self.schema.fields.len())
            .map(|column_idx| {
                let arrays: Vec<_> = self
                    .batches
                    .iter()
                    .map(|(_, batch)| batch.column(column_idx).as_ref())
                    .collect();
                Ok(interleave(&arrays, &self.indices)?)
            })
            .collect::<Result<Vec<_>>>()?;

        self.indices.clear();

        // New cursors are only created once the previous cursor for the stream
        // is finished. This means all remaining rows from all but the last batch
        // for each stream have been yielded to the newly created record batch
        //
        // We can therefore drop all but the last batch for each stream
        let mut batch_idx = 0;
        let mut retained = 0;
        self.batches.retain(|(stream_idx, batch)| {
            let stream_cursor = &mut self.cursors[*stream_idx];
            let retain = stream_cursor.batch_idx == batch_idx;
            batch_idx += 1;

            if retain {
                stream_cursor.batch_idx = retained;
                retained += 1;
            } else {
                self.reservation.shrink(get_record_batch_memory_size(batch));
            }
            retain
        });

        Ok(Some(RecordBatch::try_new(
            Arc::clone(&self.schema),
            columns,
        )?))
    }
}
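
// A minimal usage sketch of `BatchBuilder`, exercising push_batch / push_row /
// build_record_batch end-to-end. The use of `UnboundedMemoryPool` and the names
// below are illustrative choices for a test context, not part of the builder's API.
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{ArrayRef, Int32Array};
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, UnboundedMemoryPool};

    #[test]
    fn interleaves_rows_from_two_streams() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

        // An unconstrained pool keeps the memory accounting out of the way here
        let pool: Arc<dyn MemoryPool> = Arc::new(UnboundedMemoryPool::default());
        let reservation = MemoryConsumer::new("BatchBuilderTest").register(&pool);

        let mut builder = BatchBuilder::new(Arc::clone(&schema), 2, 8, reservation);

        // Buffer one single-column batch per stream
        let batch = |values: Vec<i32>| {
            RecordBatch::try_new(
                Arc::clone(&schema),
                vec![Arc::new(Int32Array::from(values)) as ArrayRef],
            )
            .unwrap()
        };
        builder.push_batch(0, batch(vec![1, 3]))?;
        builder.push_batch(1, batch(vec![2, 4]))?;

        // Emulate a merge that alternates between the two streams
        builder.push_row(0);
        builder.push_row(1);
        builder.push_row(0);
        builder.push_row(1);

        let output = builder.build_record_batch()?.expect("rows were pushed");
        assert_eq!(output.num_rows(), 4);

        // The in-progress indices are drained by building the output batch
        assert!(builder.is_empty());
        Ok(())
    }
}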