datafusion_physical_plan/spill/in_progress_spill_file.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Define the `InProgressSpillFile` struct, which represents an in-progress spill file used for writing `RecordBatch`es to disk, created by `SpillManager`.
19
20use datafusion_common::Result;
21use std::sync::Arc;
22
23use arrow::array::RecordBatch;
24use datafusion_common::exec_datafusion_err;
25use datafusion_execution::disk_manager::RefCountedTempFile;
26
27use super::{spill_manager::SpillManager, IPCStreamWriter};
28
29/// Represents an in-progress spill file used for writing `RecordBatch`es to disk, created by `SpillManager`.
30/// Caller is able to use this struct to incrementally append in-memory batches to
31/// the file, and then finalize the file by calling the `finish` method.
32pub struct InProgressSpillFile {
33 pub(crate) spill_writer: Arc<SpillManager>,
34 /// Lazily initialized writer
35 writer: Option<IPCStreamWriter>,
36 /// Lazily initialized in-progress file, it will be moved out when the `finish` method is invoked
37 in_progress_file: Option<RefCountedTempFile>,
38}
39
40impl InProgressSpillFile {
41 pub fn new(
42 spill_writer: Arc<SpillManager>,
43 in_progress_file: RefCountedTempFile,
44 ) -> Self {
45 Self {
46 spill_writer,
47 in_progress_file: Some(in_progress_file),
48 writer: None,
49 }
50 }
51
52 /// Appends a `RecordBatch` to the spill file, initializing the writer if necessary.
53 ///
54 /// # Errors
55 /// - Returns an error if the file is not active (has been finalized)
56 /// - Returns an error if appending would exceed the disk usage limit configured
57 /// by `max_temp_directory_size` in `DiskManager`
58 pub fn append_batch(&mut self, batch: &RecordBatch) -> Result<()> {
59 if self.in_progress_file.is_none() {
60 return Err(exec_datafusion_err!(
61 "Append operation failed: No active in-progress file. The file may have already been finalized."
62 ));
63 }
64 if self.writer.is_none() {
65 let schema = batch.schema();
66 if let Some(ref in_progress_file) = self.in_progress_file {
67 self.writer = Some(IPCStreamWriter::new(
68 in_progress_file.path(),
69 schema.as_ref(),
70 self.spill_writer.compression,
71 )?);
72
73 // Update metrics
74 self.spill_writer.metrics.spill_file_count.add(1);
75 }
76 }
77 if let Some(writer) = &mut self.writer {
78 let (spilled_rows, _) = writer.write(batch)?;
79 if let Some(in_progress_file) = &mut self.in_progress_file {
80 in_progress_file.update_disk_usage()?;
81 } else {
82 unreachable!() // Already checked inside current function
83 }
84
85 // Update metrics
86 self.spill_writer.metrics.spilled_rows.add(spilled_rows);
87 }
88 Ok(())
89 }
90
91 /// Finalizes the file, returning the completed file reference.
92 /// If there are no batches spilled before, it returns `None`.
93 pub fn finish(&mut self) -> Result<Option<RefCountedTempFile>> {
94 if let Some(writer) = &mut self.writer {
95 writer.finish()?;
96 } else {
97 return Ok(None);
98 }
99
100 // Since spill files are append-only, add the file size to spilled_bytes
101 if let Some(in_progress_file) = &mut self.in_progress_file {
102 // Since writer.finish() writes continuation marker and message length at the end
103 in_progress_file.update_disk_usage()?;
104 let size = in_progress_file.current_disk_usage();
105 self.spill_writer.metrics.spilled_bytes.add(size as usize);
106 }
107
108 Ok(self.in_progress_file.take())
109 }
110}