datafusion_physical_plan/spill/
in_progress_spill_file.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
//! Defines the `InProgressSpillFile` struct, which represents an in-progress spill
//! file used for writing `RecordBatch`es to disk. Instances are created by `SpillManager`.
19
20use datafusion_common::Result;
21use std::sync::Arc;
22
23use arrow::array::RecordBatch;
24use datafusion_common::exec_datafusion_err;
25use datafusion_execution::disk_manager::RefCountedTempFile;
26
27use super::{spill_manager::SpillManager, IPCStreamWriter};
28
29/// Represents an in-progress spill file used for writing `RecordBatch`es to disk, created by `SpillManager`.
30/// Caller is able to use this struct to incrementally append in-memory batches to
31/// the file, and then finalize the file by calling the `finish` method.
32pub struct InProgressSpillFile {
33    pub(crate) spill_writer: Arc<SpillManager>,
34    /// Lazily initialized writer
35    writer: Option<IPCStreamWriter>,
36    /// Lazily initialized in-progress file, it will be moved out when the `finish` method is invoked
37    in_progress_file: Option<RefCountedTempFile>,
38}
39
40impl InProgressSpillFile {
41    pub fn new(
42        spill_writer: Arc<SpillManager>,
43        in_progress_file: RefCountedTempFile,
44    ) -> Self {
45        Self {
46            spill_writer,
47            in_progress_file: Some(in_progress_file),
48            writer: None,
49        }
50    }
51
52    /// Appends a `RecordBatch` to the spill file, initializing the writer if necessary.
53    ///
54    /// # Errors
55    /// - Returns an error if the file is not active (has been finalized)
56    /// - Returns an error if appending would exceed the disk usage limit configured
57    ///   by `max_temp_directory_size` in `DiskManager`
58    pub fn append_batch(&mut self, batch: &RecordBatch) -> Result<()> {
59        if self.in_progress_file.is_none() {
60            return Err(exec_datafusion_err!(
61                "Append operation failed: No active in-progress file. The file may have already been finalized."
62            ));
63        }
64        if self.writer.is_none() {
65            let schema = batch.schema();
66            if let Some(ref in_progress_file) = self.in_progress_file {
67                self.writer = Some(IPCStreamWriter::new(
68                    in_progress_file.path(),
69                    schema.as_ref(),
70                    self.spill_writer.compression,
71                )?);
72
73                // Update metrics
74                self.spill_writer.metrics.spill_file_count.add(1);
75            }
76        }
77        if let Some(writer) = &mut self.writer {
78            let (spilled_rows, _) = writer.write(batch)?;
79            if let Some(in_progress_file) = &mut self.in_progress_file {
80                in_progress_file.update_disk_usage()?;
81            } else {
82                unreachable!() // Already checked inside current function
83            }
84
85            // Update metrics
86            self.spill_writer.metrics.spilled_rows.add(spilled_rows);
87        }
88        Ok(())
89    }
90
91    /// Finalizes the file, returning the completed file reference.
92    /// If there are no batches spilled before, it returns `None`.
93    pub fn finish(&mut self) -> Result<Option<RefCountedTempFile>> {
94        if let Some(writer) = &mut self.writer {
95            writer.finish()?;
96        } else {
97            return Ok(None);
98        }
99
100        // Since spill files are append-only, add the file size to spilled_bytes
101        if let Some(in_progress_file) = &mut self.in_progress_file {
102            // Since writer.finish() writes continuation marker and message length at the end
103            in_progress_file.update_disk_usage()?;
104            let size = in_progress_file.current_disk_usage();
105            self.spill_writer.metrics.spilled_bytes.add(size as usize);
106        }
107
108        Ok(self.in_progress_file.take())
109    }
110}