datafusion_physical_plan/spill/spill_manager.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Define the `SpillManager` struct, which is responsible for reading and writing `RecordBatch`es to raw files based on the provided configurations.

use arrow::array::StringViewArray;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_execution::runtime_env::RuntimeEnv;
use std::sync::Arc;

use datafusion_common::{config::SpillCompression, Result};
use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::SendableRecordBatchStream;

use super::{in_progress_spill_file::InProgressSpillFile, SpillReaderStream};
use crate::coop::cooperative;
use crate::{common::spawn_buffered, metrics::SpillMetrics};

/// The `SpillManager` is responsible for the following tasks:
/// - Reading and writing `RecordBatch`es to raw files based on the provided configurations.
/// - Updating the associated metrics.
///
/// Note: The caller (external operators such as `SortExec`) is responsible for interpreting the spilled files.
/// For example, the caller keeps track of the fact that all records within the same spill file follow a specific sort order.
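///
/// A minimal illustrative sketch of the typical flow (the runtime environment,
/// metrics, schema, and input batches below are assumed to be provided by the
/// caller):
///
/// ```ignore
/// let spill_manager = SpillManager::new(env, metrics, Arc::clone(&schema))
///     .with_compression_type(SpillCompression::default());
///
/// // Spill a set of in-memory batches into a single temporary file ...
/// if let Some(spill_file) =
///     spill_manager.spill_record_batch_and_finish(&batches, "ExampleOperator")?
/// {
///     // ... and later read them back, in FIFO order, as a stream
///     let _stream = spill_manager.read_spill_as_stream(spill_file, None)?;
/// }
/// ```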
#[derive(Debug, Clone)]
pub struct SpillManager {
    env: Arc<RuntimeEnv>,
    pub(crate) metrics: SpillMetrics,
    schema: SchemaRef,
    /// Number of batches to buffer in memory during disk reads
    batch_read_buffer_capacity: usize,
    /// general-purpose compression options
    pub(crate) compression: SpillCompression,
}

impl SpillManager {
    pub fn new(env: Arc<RuntimeEnv>, metrics: SpillMetrics, schema: SchemaRef) -> Self {
        Self {
            env,
            metrics,
            schema,
            batch_read_buffer_capacity: 2,
            compression: SpillCompression::default(),
        }
    }

    pub fn with_batch_read_buffer_capacity(
        mut self,
        batch_read_buffer_capacity: usize,
    ) -> Self {
        self.batch_read_buffer_capacity = batch_read_buffer_capacity;
        self
    }

    pub fn with_compression_type(mut self, spill_compression: SpillCompression) -> Self {
        self.compression = spill_compression;
        self
    }

    /// Creates a temporary file for in-progress operations, returning an error
    /// if file creation fails. The file can be used to append batches
    /// incrementally and then finish the file when done.
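    ///
    /// Illustrative sketch of the incremental flow (the `spill_manager` and
    /// `batches` below are assumed to be provided by the caller):
    ///
    /// ```ignore
    /// let mut in_progress_file =
    ///     spill_manager.create_in_progress_file("ExampleOperator")?;
    /// for batch in &batches {
    ///     // Batches are appended one at a time and read back in the same order
    ///     in_progress_file.append_batch(batch)?;
    /// }
    /// // `finish` returns the spill file, or `None` if nothing was appended
    /// let maybe_file = in_progress_file.finish()?;
    /// ```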
    pub fn create_in_progress_file(
        &self,
        request_msg: &str,
    ) -> Result<InProgressSpillFile> {
        let temp_file = self.env.disk_manager.create_tmp_file(request_msg)?;
        Ok(InProgressSpillFile::new(Arc::new(self.clone()), temp_file))
    }

    /// Spill input `batches` into a single file in an atomic operation. If the
    /// intent is to incrementally write in-memory batches into the same spill file,
    /// use [`Self::create_in_progress_file`] instead.
    /// `None` is returned if no batches are spilled.
    ///
    /// # Errors
    /// - Returns an error if spilling would exceed the disk usage limit configured
    ///   by `max_temp_directory_size` in `DiskManager`
    pub fn spill_record_batch_and_finish(
        &self,
        batches: &[RecordBatch],
        request_msg: &str,
    ) -> Result<Option<RefCountedTempFile>> {
        let mut in_progress_file = self.create_in_progress_file(request_msg)?;

        for batch in batches {
            in_progress_file.append_batch(batch)?;
        }

        in_progress_file.finish()
    }

    /// Refer to the documentation for [`Self::spill_record_batch_and_finish`]. This method
    /// additionally splits the `RecordBatch` into smaller batches of at most `row_limit` rows
    /// before spilling, and also returns the memory size (in bytes) of the largest sliced batch.
    ///
    /// # Errors
    /// - Returns an error if spilling would exceed the disk usage limit configured
    ///   by `max_temp_directory_size` in `DiskManager`
    pub(crate) fn spill_record_batch_by_size_and_return_max_batch_memory(
        &self,
        batch: &RecordBatch,
        request_description: &str,
        row_limit: usize,
    ) -> Result<Option<(RefCountedTempFile, usize)>> {
        let total_rows = batch.num_rows();
        let mut batches = Vec::new();
        let mut offset = 0;

        // It's ok to calculate all slices first, because slicing is zero-copy.
        while offset < total_rows {
            let length = std::cmp::min(total_rows - offset, row_limit);
            let sliced_batch = batch.slice(offset, length);
            batches.push(sliced_batch);
            offset += length;
        }

        let mut in_progress_file = self.create_in_progress_file(request_description)?;

        let mut max_record_batch_size = 0;

        for batch in batches {
            in_progress_file.append_batch(&batch)?;

            max_record_batch_size = max_record_batch_size.max(batch.get_sliced_size()?);
        }

        let file = in_progress_file.finish()?;

        Ok(file.map(|f| (f, max_record_batch_size)))
    }

    /// Spill a stream of `RecordBatch`es to disk and return the spill file and the size of the largest batch in memory
    pub(crate) async fn spill_record_batch_stream_and_return_max_batch_memory(
        &self,
        stream: &mut SendableRecordBatchStream,
        request_description: &str,
    ) -> Result<Option<(RefCountedTempFile, usize)>> {
        use futures::StreamExt;

        let mut in_progress_file = self.create_in_progress_file(request_description)?;

        let mut max_record_batch_size = 0;

        while let Some(batch) = stream.next().await {
            let batch = batch?;
            in_progress_file.append_batch(&batch)?;

            max_record_batch_size = max_record_batch_size.max(batch.get_sliced_size()?);
        }

        let file = in_progress_file.finish()?;

        Ok(file.map(|f| (f, max_record_batch_size)))
    }

    /// Reads a spill file as a stream. The file must be created by the current `SpillManager`.
    /// This method will generate output in FIFO order: the batch appended first
    /// will be read first.
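    ///
    /// Illustrative sketch of consuming the returned stream (assumes an async
    /// context and a `spill_file` previously produced by this `SpillManager`):
    ///
    /// ```ignore
    /// use futures::StreamExt;
    ///
    /// let mut stream = spill_manager.read_spill_as_stream(spill_file, None)?;
    /// while let Some(batch) = stream.next().await {
    ///     let batch = batch?;
    ///     // ... process the spilled batch
    /// }
    /// ```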
    pub fn read_spill_as_stream(
        &self,
        spill_file_path: RefCountedTempFile,
        max_record_batch_memory: Option<usize>,
    ) -> Result<SendableRecordBatchStream> {
        let stream = Box::pin(cooperative(SpillReaderStream::new(
            Arc::clone(&self.schema),
            spill_file_path,
            max_record_batch_memory,
        )));

        Ok(spawn_buffered(stream, self.batch_read_buffer_capacity))
    }
}

pub(crate) trait GetSlicedSize {
    /// Returns the size of the `RecordBatch` when sliced.
    /// Note: if multiple arrays (or even a single array) share the same data buffers, each buffer may be double counted.
    /// Therefore, make sure to call gc() or organize_stringview_arrays() before using this method.
    fn get_sliced_size(&self) -> Result<usize>;
}

impl GetSlicedSize for RecordBatch {
    fn get_sliced_size(&self) -> Result<usize> {
        let mut total = 0;
        for array in self.columns() {
            let data = array.to_data();
            total += data.get_slice_memory_size()?;

            // While a StringViewArray holds large data buffers for non-inlined strings, the Arrow layout (BufferSpec)
            // does not include any data buffers. Currently, ArrayData::get_slice_memory_size()
            // under-counts the memory size by accounting only for the views buffer, even though the data buffers are cloned during slice().
            //
            // Therefore, we manually add the capacities of the data buffers (which hold the non-inlined values)
            // on top of the sliced size of the views buffer. This matches the intended semantics of
            // "bytes needed if we materialized exactly this slice into fresh buffers".
            // This is a workaround until https://github.com/apache/arrow-rs/issues/8230
            if let Some(sv) = array.as_any().downcast_ref::<StringViewArray>() {
                for buffer in sv.data_buffers() {
                    total += buffer.capacity();
                }
            }
        }
        Ok(total)
    }
}

#[cfg(test)]
mod tests {
    use crate::spill::{get_record_batch_memory_size, spill_manager::GetSlicedSize};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::{
        array::{ArrayRef, StringViewArray},
        record_batch::RecordBatch,
    };
    use datafusion_common::Result;
    use std::sync::Arc;

    #[test]
    fn check_sliced_size_for_string_view_array() -> Result<()> {
        let array_length = 50;
        let short_len = 8;
        let long_len = 25;

        // Build a StringViewArray that includes both inlined and non-inlined strings
        let strings: Vec<String> = (0..array_length)
            .map(|i| {
                if i % 2 == 0 {
                    "a".repeat(short_len)
                } else {
                    "b".repeat(long_len)
                }
            })
            .collect();

        let string_array = StringViewArray::from(strings);
        let array_ref: ArrayRef = Arc::new(string_array);
        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![Field::new(
                "strings",
                DataType::Utf8View,
                false,
            )])),
            vec![array_ref],
        )
        .unwrap();

        // We did not slice the batch, so these two memory sizes should be equal
        assert_eq!(
            batch.get_sliced_size().unwrap(),
            get_record_batch_memory_size(&batch)
        );

        // Slice the batch in half
        let half_batch = batch.slice(0, array_length / 2);
        // Now the sliced size is smaller than the full memory size because the views buffer is sliced
        assert!(
            half_batch.get_sliced_size().unwrap()
                < get_record_batch_memory_size(&half_batch)
        );
        let data = arrow::array::Array::to_data(&half_batch.column(0));
        let views_sliced_size = data.get_slice_memory_size()?;
        // The sliced size should be larger than the sliced views buffer size
        assert!(views_sliced_size < half_batch.get_sliced_size().unwrap());

        Ok(())
    }
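
    // Illustrative companion test: for fixed-width arrays (which have no separate
    // variable-length data buffers), `get_sliced_size` should track only the bytes
    // needed by the sliced rows. The exact-size assertion assumes the column has no
    // validity (null) buffer, so the size is `rows * size_of::<i32>()`.
    #[test]
    fn check_sliced_size_for_primitive_array() -> Result<()> {
        use arrow::array::Int32Array;

        let array: ArrayRef = Arc::new(Int32Array::from_iter_values(0..64));
        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![Field::new(
                "ints",
                DataType::Int32,
                false,
            )])),
            vec![array],
        )
        .unwrap();

        let full_size = batch.get_sliced_size()?;

        // Slicing to a quarter of the rows should report a proportionally smaller size
        let quarter_batch = batch.slice(0, 16);
        let quarter_size = quarter_batch.get_sliced_size()?;
        assert!(quarter_size < full_size);
        assert_eq!(quarter_size, 16 * std::mem::size_of::<i32>());

        Ok(())
    }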
}