datafusion_physical_plan/spill/spill_manager.rs

use arrow::array::StringViewArray;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_execution::runtime_env::RuntimeEnv;
use std::sync::Arc;

use datafusion_common::{config::SpillCompression, Result};
use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::SendableRecordBatchStream;

use super::{in_progress_spill_file::InProgressSpillFile, SpillReaderStream};
use crate::coop::cooperative;
use crate::{common::spawn_buffered, metrics::SpillMetrics};

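/// The `SpillManager` is responsible for reading and writing `RecordBatch`es
/// to raw spill files on disk, based on the provided configuration, and for
/// updating the associated `SpillMetrics`.
///
/// Note: it is the caller's responsibility (e.g. an external operator such as
/// a sort) to interpret the contents of a spill file, for example whether the
/// batches inside one file follow a particular ordering.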
#[derive(Debug, Clone)]
pub struct SpillManager {
    env: Arc<RuntimeEnv>,
    pub(crate) metrics: SpillMetrics,
    schema: SchemaRef,
    /// Number of batches to buffer in memory during disk reads
    batch_read_buffer_capacity: usize,
    /// Compression codec applied to batches as they are written to the spill file
    pub(crate) compression: SpillCompression,
}

impl SpillManager {
    pub fn new(env: Arc<RuntimeEnv>, metrics: SpillMetrics, schema: SchemaRef) -> Self {
        Self {
            env,
            metrics,
            schema,
            batch_read_buffer_capacity: 2,
            compression: SpillCompression::default(),
        }
    }

    pub fn with_batch_read_buffer_capacity(
        mut self,
        batch_read_buffer_capacity: usize,
    ) -> Self {
        self.batch_read_buffer_capacity = batch_read_buffer_capacity;
        self
    }

    pub fn with_compression_type(mut self, spill_compression: SpillCompression) -> Self {
        self.compression = spill_compression;
        self
    }

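    /// Creates a temporary file for in-progress operations, returning an error
    /// message if file creation fails. The file can be used to append batches
    /// incrementally and then finish the file when done.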
    pub fn create_in_progress_file(
        &self,
        request_msg: &str,
    ) -> Result<InProgressSpillFile> {
        let temp_file = self.env.disk_manager.create_tmp_file(request_msg)?;
        Ok(InProgressSpillFile::new(Arc::new(self.clone()), temp_file))
    }

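    /// Spills `batches` into a single file in one atomic operation. If the
    /// intent is to incrementally write in-memory batches into the same spill
    /// file, use [`Self::create_in_progress_file`] instead. Returns `None` if
    /// no batch was actually spilled.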
    pub fn spill_record_batch_and_finish(
        &self,
        batches: &[RecordBatch],
        request_msg: &str,
    ) -> Result<Option<RefCountedTempFile>> {
        let mut in_progress_file = self.create_in_progress_file(request_msg)?;

        for batch in batches {
            in_progress_file.append_batch(batch)?;
        }

        in_progress_file.finish()
    }

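    /// Similar to [`Self::spill_record_batch_and_finish`], but slices `batch`
    /// into sub-batches of at most `row_limit` rows before writing. Along with
    /// the spill file, it returns the in-memory size of the largest written
    /// sub-batch, which can be passed back to [`Self::read_spill_as_stream`].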
    pub(crate) fn spill_record_batch_by_size_and_return_max_batch_memory(
        &self,
        batch: &RecordBatch,
        request_description: &str,
        row_limit: usize,
    ) -> Result<Option<(RefCountedTempFile, usize)>> {
        let total_rows = batch.num_rows();
        let mut batches = Vec::new();
        let mut offset = 0;

        // It's fine to compute all slices up front: slicing is zero-copy.
        while offset < total_rows {
            let length = std::cmp::min(total_rows - offset, row_limit);
            let sliced_batch = batch.slice(offset, length);
            batches.push(sliced_batch);
            offset += length;
        }

        let mut in_progress_file = self.create_in_progress_file(request_description)?;

        let mut max_record_batch_size = 0;

        for batch in batches {
            in_progress_file.append_batch(&batch)?;

            max_record_batch_size = max_record_batch_size.max(batch.get_sliced_size()?);
        }

        let file = in_progress_file.finish()?;

        Ok(file.map(|f| (f, max_record_batch_size)))
    }

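    /// Spills all batches produced by `stream` into a single file. Returns the
    /// spill file together with the in-memory size of the largest batch
    /// written, or `None` if the stream yielded no batches.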
    pub(crate) async fn spill_record_batch_stream_and_return_max_batch_memory(
        &self,
        stream: &mut SendableRecordBatchStream,
        request_description: &str,
    ) -> Result<Option<(RefCountedTempFile, usize)>> {
        use futures::StreamExt;

        let mut in_progress_file = self.create_in_progress_file(request_description)?;

        let mut max_record_batch_size = 0;

        while let Some(batch) = stream.next().await {
            let batch = batch?;
            in_progress_file.append_batch(&batch)?;

            max_record_batch_size = max_record_batch_size.max(batch.get_sliced_size()?);
        }

        let file = in_progress_file.finish()?;

        Ok(file.map(|f| (f, max_record_batch_size)))
    }

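    /// Reads a spill file back as a [`SendableRecordBatchStream`]. The file
    /// must have been created by this `SpillManager`. The returned stream is
    /// polled on a separate task, buffering up to `batch_read_buffer_capacity`
    /// batches ahead of the consumer. `max_record_batch_memory`, if known, is
    /// the largest batch size recorded while spilling (see the
    /// `_return_max_batch_memory` methods above).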
    pub fn read_spill_as_stream(
        &self,
        spill_file_path: RefCountedTempFile,
        max_record_batch_memory: Option<usize>,
    ) -> Result<SendableRecordBatchStream> {
        let stream = Box::pin(cooperative(SpillReaderStream::new(
            Arc::clone(&self.schema),
            spill_file_path,
            max_record_batch_memory,
        )));

        Ok(spawn_buffered(stream, self.batch_read_buffer_capacity))
    }
}

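/// Computes the in-memory size of a possibly sliced `RecordBatch`. Slicing is
/// zero-copy, so whole-buffer accounting would over-count: this trait counts
/// only the rows covered by the slice, while still charging shared
/// variable-length buffers (e.g. for `StringViewArray`) in full.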
pub(crate) trait GetSlicedSize {
    /// Returns the size of the `RecordBatch` when sliced.
    fn get_sliced_size(&self) -> Result<usize>;
}

impl GetSlicedSize for RecordBatch {
    fn get_sliced_size(&self) -> Result<usize> {
        let mut total = 0;
        for array in self.columns() {
            let data = array.to_data();
            total += data.get_slice_memory_size()?;

            if let Some(sv) = array.as_any().downcast_ref::<StringViewArray>() {
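                // `get_slice_memory_size()` counts only the view structs. For
                // strings longer than 12 bytes the actual data lives in shared
                // buffers referenced by the views, so the capacity of those
                // buffers must be added to avoid under-counting the memory
                // required to hold this batch.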
                for buffer in sv.data_buffers() {
                    total += buffer.capacity();
                }
            }
        }
        Ok(total)
    }
}

#[cfg(test)]
mod tests {
    use crate::spill::{get_record_batch_memory_size, spill_manager::GetSlicedSize};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::{
        array::{ArrayRef, StringViewArray},
        record_batch::RecordBatch,
    };
    use datafusion_common::Result;
    use std::sync::Arc;

    #[test]
    fn check_sliced_size_for_string_view_array() -> Result<()> {
        let array_length = 50;
        let short_len = 8;
        let long_len = 25;

        let strings: Vec<String> = (0..array_length)
            // Alternate short strings (inlined into the views) with long
            // strings (stored in the shared data buffers).
            .map(|i| {
                if i % 2 == 0 {
                    "a".repeat(short_len)
                } else {
                    "b".repeat(long_len)
                }
            })
            .collect();

        let string_array = StringViewArray::from(strings);
        let array_ref: ArrayRef = Arc::new(string_array);
        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![Field::new(
                "strings",
                DataType::Utf8View,
                false,
            )])),
            vec![array_ref],
        )
        .unwrap();

        // Unsliced: the sliced size must equal the batch's full in-memory size.
        assert_eq!(
            batch.get_sliced_size().unwrap(),
            get_record_batch_memory_size(&batch)
        );

        // Slicing is zero-copy: the half batch still references the full
        // buffers, so its sliced size is strictly smaller than the memory
        // reported for the whole allocation.
        let half_batch = batch.slice(0, array_length / 2);
        assert!(
            half_batch.get_sliced_size().unwrap()
                < get_record_batch_memory_size(&half_batch)
        );
        // The view structs alone under-count a StringViewArray: the shared
        // data buffers are charged in full, so `get_sliced_size` is larger.
        let data = half_batch.column(0).to_data();
        let views_sliced_size = data.get_slice_memory_size()?;
        assert!(views_sliced_size < half_batch.get_sliced_size().unwrap());

        Ok(())
    }
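
    // A minimal round-trip sketch added for illustration (not part of the
    // upstream tests): spill one batch through `SpillManager` and read it
    // back. It assumes `RuntimeEnv::default()` provides a usable
    // `DiskManager`, and uses the crate's `SpillMetrics` /
    // `ExecutionPlanMetricsSet` constructors as seen elsewhere in the crate.
    #[tokio::test]
    async fn spill_round_trip_sketch() -> Result<()> {
        use crate::metrics::{ExecutionPlanMetricsSet, SpillMetrics};
        use crate::spill::spill_manager::SpillManager;
        use arrow::array::Int32Array;
        use datafusion_execution::runtime_env::RuntimeEnv;
        use futures::TryStreamExt;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "v",
            DataType::Int32,
            false,
        )]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
        )?;

        let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
        let manager =
            SpillManager::new(Arc::new(RuntimeEnv::default()), metrics, schema);

        // Spill a single batch, then read the file back as a stream.
        let file = manager
            .spill_record_batch_and_finish(&[batch.clone()], "round_trip")?
            .expect("one batch was written, so a file is returned");
        let stream = manager.read_spill_as_stream(file, None)?;
        let batches: Vec<RecordBatch> = stream.try_collect().await?;

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0], batch);
        Ok(())
    }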
}