datafusion_execution/
disk_manager.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`DiskManager`]: Manages files generated during query execution
19
20use datafusion_common::{
21    config_err, resources_datafusion_err, resources_err, DataFusionError, Result,
22};
23use log::debug;
24use parking_lot::Mutex;
25use rand::{rng, Rng};
26use std::path::{Path, PathBuf};
27use std::sync::atomic::{AtomicU64, Ordering};
28use std::sync::Arc;
29use tempfile::{Builder, NamedTempFile, TempDir};
30
31use crate::memory_pool::human_readable_size;
32
/// Default cap, in bytes, on the data stored in the temporary directories:
/// `100 * 1024^3` = 107,374,182,400 bytes (100 GiB, written "100GB" elsewhere
/// in this file).
const DEFAULT_MAX_TEMP_DIRECTORY_SIZE: u64 = 100 * 1024 * 1024 * 1024; // 100GB
34
/// Builder pattern for the [`DiskManager`] structure.
///
/// Holds the settings a [`DiskManager`] is constructed from: the storage
/// mode and the limit on bytes kept in temporary directories.
#[derive(Clone, Debug)]
pub struct DiskManagerBuilder {
    /// The storage mode of the disk manager
    mode: DiskManagerMode,
    /// The maximum amount of data (in bytes) stored inside the temporary directories.
    /// Defaults to 100GB
    max_temp_directory_size: u64,
}
44
45impl Default for DiskManagerBuilder {
46    fn default() -> Self {
47        Self {
48            mode: DiskManagerMode::OsTmpDirectory,
49            max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
50        }
51    }
52}
53
54impl DiskManagerBuilder {
55    pub fn set_mode(&mut self, mode: DiskManagerMode) {
56        self.mode = mode;
57    }
58
59    pub fn with_mode(mut self, mode: DiskManagerMode) -> Self {
60        self.set_mode(mode);
61        self
62    }
63
64    pub fn set_max_temp_directory_size(&mut self, value: u64) {
65        self.max_temp_directory_size = value;
66    }
67
68    pub fn with_max_temp_directory_size(mut self, value: u64) -> Self {
69        self.set_max_temp_directory_size(value);
70        self
71    }
72
73    /// Create a DiskManager given the builder
74    pub fn build(self) -> Result<DiskManager> {
75        match self.mode {
76            DiskManagerMode::OsTmpDirectory => Ok(DiskManager {
77                local_dirs: Mutex::new(Some(vec![])),
78                max_temp_directory_size: self.max_temp_directory_size,
79                used_disk_space: Arc::new(AtomicU64::new(0)),
80            }),
81            DiskManagerMode::Directories(conf_dirs) => {
82                let local_dirs = create_local_dirs(conf_dirs)?;
83                debug!(
84                    "Created local dirs {local_dirs:?} as DataFusion working directory"
85                );
86                Ok(DiskManager {
87                    local_dirs: Mutex::new(Some(local_dirs)),
88                    max_temp_directory_size: self.max_temp_directory_size,
89                    used_disk_space: Arc::new(AtomicU64::new(0)),
90                })
91            }
92            DiskManagerMode::Disabled => Ok(DiskManager {
93                local_dirs: Mutex::new(None),
94                max_temp_directory_size: self.max_temp_directory_size,
95                used_disk_space: Arc::new(AtomicU64::new(0)),
96            }),
97        }
98    }
99}
100
/// Where (and whether) a [`DiskManager`] stores its temporary files.
///
/// The default is [`DiskManagerMode::OsTmpDirectory`].
#[derive(Clone, Debug, Default)]
pub enum DiskManagerMode {
    /// Create a new [`DiskManager`] that creates temporary files within
    /// a temporary directory chosen by the OS
    #[default]
    OsTmpDirectory,

    /// Create a new [`DiskManager`] that creates temporary files within
    /// the specified directories. One of the directories will be chosen
    /// at random for each temporary file created.
    Directories(Vec<PathBuf>),

    /// Disable disk manager, attempts to create temporary files will error
    Disabled,
}
116
/// Configuration for temporary disk access
///
/// Deprecated since 48.0.0 in favour of [`DiskManagerBuilder`]. The default
/// is [`DiskManagerConfig::NewOs`].
#[allow(deprecated)]
#[deprecated(since = "48.0.0", note = "Use DiskManagerBuilder instead")]
#[derive(Debug, Clone, Default)]
pub enum DiskManagerConfig {
    /// Use the provided [`DiskManager`] instance
    Existing(Arc<DiskManager>),

    /// Create a new [`DiskManager`] that creates temporary files within
    /// a temporary directory chosen by the OS
    #[default]
    NewOs,

    /// Create a new [`DiskManager`] that creates temporary files within
    /// the specified directories
    NewSpecified(Vec<PathBuf>),

    /// Disable disk manager, attempts to create temporary files will error
    Disabled,
}
137
138#[allow(deprecated)]
139impl DiskManagerConfig {
140    /// Create temporary files in a temporary directory chosen by the OS
141    pub fn new() -> Self {
142        Self::default()
143    }
144
145    /// Create temporary files using the provided disk manager
146    pub fn new_existing(existing: Arc<DiskManager>) -> Self {
147        Self::Existing(existing)
148    }
149
150    /// Create temporary files in the specified directories
151    pub fn new_specified(paths: Vec<PathBuf>) -> Self {
152        Self::NewSpecified(paths)
153    }
154}
155
/// Manages files generated during query execution, e.g. spill files generated
/// while processing dataset larger than available memory.
///
/// Instances are created through [`DiskManager::builder`] /
/// [`DiskManagerBuilder::build`].
#[derive(Debug)]
pub struct DiskManager {
    /// TempDirs to put temporary files in.
    ///
    /// If `Some(vec![])` a new OS specified temporary directory will be created
    /// If `None` an error will be returned (configured not to spill)
    local_dirs: Mutex<Option<Vec<Arc<TempDir>>>>,
    /// The maximum amount of data (in bytes) stored inside the temporary directories.
    /// Defaults to 100GB
    max_temp_directory_size: u64,
    /// Used disk space in the temporary directories. Now only spilled data for
    /// external executors are counted.
    ///
    /// Adjusted by [`RefCountedTempFile::update_disk_usage`] and when a
    /// [`RefCountedTempFile`] is dropped.
    used_disk_space: Arc<AtomicU64>,
}
172
173impl DiskManager {
174    /// Creates a builder for [DiskManager]
175    pub fn builder() -> DiskManagerBuilder {
176        DiskManagerBuilder::default()
177    }
178
179    /// Create a DiskManager given the configuration
180    #[allow(deprecated)]
181    #[deprecated(since = "48.0.0", note = "Use DiskManager::builder() instead")]
182    pub fn try_new(config: DiskManagerConfig) -> Result<Arc<Self>> {
183        match config {
184            DiskManagerConfig::Existing(manager) => Ok(manager),
185            DiskManagerConfig::NewOs => Ok(Arc::new(Self {
186                local_dirs: Mutex::new(Some(vec![])),
187                max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
188                used_disk_space: Arc::new(AtomicU64::new(0)),
189            })),
190            DiskManagerConfig::NewSpecified(conf_dirs) => {
191                let local_dirs = create_local_dirs(conf_dirs)?;
192                debug!(
193                    "Created local dirs {local_dirs:?} as DataFusion working directory"
194                );
195                Ok(Arc::new(Self {
196                    local_dirs: Mutex::new(Some(local_dirs)),
197                    max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
198                    used_disk_space: Arc::new(AtomicU64::new(0)),
199                }))
200            }
201            DiskManagerConfig::Disabled => Ok(Arc::new(Self {
202                local_dirs: Mutex::new(None),
203                max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
204                used_disk_space: Arc::new(AtomicU64::new(0)),
205            })),
206        }
207    }
208
209    pub fn set_max_temp_directory_size(
210        &mut self,
211        max_temp_directory_size: u64,
212    ) -> Result<()> {
213        // If the disk manager is disabled and `max_temp_directory_size` is not 0,
214        // this operation is not meaningful, fail early.
215        if self.local_dirs.lock().is_none() && max_temp_directory_size != 0 {
216            return config_err!(
217                "Cannot set max temp directory size for a disk manager that spilling is disabled"
218            );
219        }
220
221        self.max_temp_directory_size = max_temp_directory_size;
222        Ok(())
223    }
224
225    pub fn set_arc_max_temp_directory_size(
226        this: &mut Arc<Self>,
227        max_temp_directory_size: u64,
228    ) -> Result<()> {
229        if let Some(inner) = Arc::get_mut(this) {
230            inner.set_max_temp_directory_size(max_temp_directory_size)?;
231            Ok(())
232        } else {
233            config_err!("DiskManager should be a single instance")
234        }
235    }
236
237    pub fn with_max_temp_directory_size(
238        mut self,
239        max_temp_directory_size: u64,
240    ) -> Result<Self> {
241        self.set_max_temp_directory_size(max_temp_directory_size)?;
242        Ok(self)
243    }
244
245    pub fn used_disk_space(&self) -> u64 {
246        self.used_disk_space.load(Ordering::Relaxed)
247    }
248
249    /// Return true if this disk manager supports creating temporary
250    /// files. If this returns false, any call to `create_tmp_file`
251    /// will error.
252    pub fn tmp_files_enabled(&self) -> bool {
253        self.local_dirs.lock().is_some()
254    }
255
256    /// Return a temporary file from a randomized choice in the configured locations
257    ///
258    /// If the file can not be created for some reason, returns an
259    /// error message referencing the request description
260    pub fn create_tmp_file(
261        self: &Arc<Self>,
262        request_description: &str,
263    ) -> Result<RefCountedTempFile> {
264        let mut guard = self.local_dirs.lock();
265        let local_dirs = guard.as_mut().ok_or_else(|| {
266            resources_datafusion_err!(
267                "Memory Exhausted while {request_description} (DiskManager is disabled)"
268            )
269        })?;
270
271        // Create a temporary directory if needed
272        if local_dirs.is_empty() {
273            let tempdir = tempfile::tempdir().map_err(DataFusionError::IoError)?;
274
275            debug!(
276                "Created directory '{:?}' as DataFusion tempfile directory for {}",
277                tempdir.path().to_string_lossy(),
278                request_description,
279            );
280
281            local_dirs.push(Arc::new(tempdir));
282        }
283
284        let dir_index = rng().random_range(0..local_dirs.len());
285        Ok(RefCountedTempFile {
286            _parent_temp_dir: Arc::clone(&local_dirs[dir_index]),
287            tempfile: Builder::new()
288                .tempfile_in(local_dirs[dir_index].as_ref())
289                .map_err(DataFusionError::IoError)?,
290            current_file_disk_usage: 0,
291            disk_manager: Arc::clone(self),
292        })
293    }
294}
295
/// A wrapper around a [`NamedTempFile`] that also contains
/// a reference to its parent temporary directory.
///
/// # Note
/// After any modification to the underlying file (e.g., writing data to it), the caller
/// must invoke [`Self::update_disk_usage`] to update the global disk usage counter.
/// This ensures the disk manager can properly enforce usage limits configured by
/// [`DiskManager::with_max_temp_directory_size`].
#[derive(Debug)]
pub struct RefCountedTempFile {
    /// The reference to the directory in which temporary files are created to ensure
    /// it is not cleaned up prior to the NamedTempFile
    _parent_temp_dir: Arc<TempDir>,
    /// The underlying temporary file
    tempfile: NamedTempFile,
    /// Tracks the current disk usage of this temporary file. See
    /// [`Self::update_disk_usage`] for more details.
    current_file_disk_usage: u64,
    /// The disk manager that created and manages this temporary file
    disk_manager: Arc<DiskManager>,
}
316
317impl RefCountedTempFile {
318    pub fn path(&self) -> &Path {
319        self.tempfile.path()
320    }
321
322    pub fn inner(&self) -> &NamedTempFile {
323        &self.tempfile
324    }
325
326    /// Updates the global disk usage counter after modifications to the underlying file.
327    ///
328    /// # Errors
329    /// - Returns an error if the global disk usage exceeds the configured limit.
330    pub fn update_disk_usage(&mut self) -> Result<()> {
331        // Get new file size from OS
332        let metadata = self.tempfile.as_file().metadata()?;
333        let new_disk_usage = metadata.len();
334
335        // Update the global disk usage by:
336        // 1. Subtracting the old file size from the global counter
337        self.disk_manager
338            .used_disk_space
339            .fetch_sub(self.current_file_disk_usage, Ordering::Relaxed);
340        // 2. Adding the new file size to the global counter
341        self.disk_manager
342            .used_disk_space
343            .fetch_add(new_disk_usage, Ordering::Relaxed);
344
345        // 3. Check if the updated global disk usage exceeds the configured limit
346        let global_disk_usage = self.disk_manager.used_disk_space.load(Ordering::Relaxed);
347        if global_disk_usage > self.disk_manager.max_temp_directory_size {
348            return resources_err!(
349                "The used disk space during the spilling process has exceeded the allowable limit of {}. Try increasing the `max_temp_directory_size` in the disk manager configuration.",
350                human_readable_size(self.disk_manager.max_temp_directory_size as usize)
351            );
352        }
353
354        // 4. Update the local file size tracking
355        self.current_file_disk_usage = new_disk_usage;
356
357        Ok(())
358    }
359
360    pub fn current_disk_usage(&self) -> u64 {
361        self.current_file_disk_usage
362    }
363}
364
365/// When the temporary file is dropped, subtract its disk usage from the disk manager's total
366impl Drop for RefCountedTempFile {
367    fn drop(&mut self) {
368        // Subtract the current file's disk usage from the global counter
369        self.disk_manager
370            .used_disk_space
371            .fetch_sub(self.current_file_disk_usage, Ordering::Relaxed);
372    }
373}
374
375/// Setup local dirs by creating one new dir in each of the given dirs
376fn create_local_dirs(local_dirs: Vec<PathBuf>) -> Result<Vec<Arc<TempDir>>> {
377    local_dirs
378        .iter()
379        .map(|root| {
380            if !Path::new(root).exists() {
381                std::fs::create_dir(root)?;
382            }
383            Builder::new()
384                .prefix("datafusion-")
385                .tempdir_in(root)
386                .map_err(DataFusionError::IoError)
387        })
388        .map(|result| result.map(Arc::new))
389        .collect()
390}
391
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn lazy_temp_dir_creation() -> Result<()> {
        // A default configuration should not create temp files until requested
        let dm = Arc::new(DiskManagerBuilder::default().build()?);

        assert_eq!(0, local_dir_snapshot(&dm).len());

        // can still create a tempfile however:
        let actual = dm.create_tmp_file("Testing")?;

        // Now the tempdir has been created on demand
        assert_eq!(1, local_dir_snapshot(&dm).len());

        // the returned tempfile file should be in the temp directory
        let local_dirs = local_dir_snapshot(&dm);
        assert_path_in_dirs(actual.path(), local_dirs.iter().map(|p| p.as_path()));

        Ok(())
    }

    /// Returns the paths of the directories currently held by `dm`
    /// (empty when the manager is disabled)
    fn local_dir_snapshot(dm: &DiskManager) -> Vec<PathBuf> {
        dm.local_dirs
            .lock()
            .iter()
            .flatten()
            .map(|p| p.path().into())
            .collect()
    }

    #[test]
    fn file_in_right_dir() -> Result<()> {
        let local_dir1 = TempDir::new()?;
        let local_dir2 = TempDir::new()?;
        let local_dir3 = TempDir::new()?;
        let local_dirs = vec![local_dir1.path(), local_dir2.path(), local_dir3.path()];
        let dm = Arc::new(
            DiskManagerBuilder::default()
                .with_mode(DiskManagerMode::Directories(
                    local_dirs.iter().map(|p| p.into()).collect(),
                ))
                .build()?,
        );

        assert!(dm.tmp_files_enabled());
        let actual = dm.create_tmp_file("Testing")?;

        // the file should be in one of the specified local directories
        assert_path_in_dirs(actual.path(), local_dirs.into_iter());

        Ok(())
    }

    #[test]
    fn test_disabled_disk_manager() {
        let manager = Arc::new(
            DiskManagerBuilder::default()
                .with_mode(DiskManagerMode::Disabled)
                .build()
                .unwrap(),
        );
        assert!(!manager.tmp_files_enabled());
        assert_eq!(
            manager.create_tmp_file("Testing").unwrap_err().strip_backtrace(),
            "Resources exhausted: Memory Exhausted while Testing (DiskManager is disabled)",
        )
    }

    #[test]
    fn test_disk_manager_create_spill_folder() {
        let dir = TempDir::new().unwrap();
        // Point the manager at a child that does not exist yet, so that
        // building it has to create the spill folder. The child lives inside
        // `dir` and is cleaned up together with it.
        let spill_root = dir.path().join("spill");
        assert!(!spill_root.exists());

        let _dm = DiskManagerBuilder::default()
            .with_mode(DiskManagerMode::Directories(vec![spill_root.clone()]))
            .build()
            .unwrap();

        assert!(spill_root.is_dir());
    }

    /// Asserts that `file_path` is found anywhere in any of `dir` directories
    fn assert_path_in_dirs<'a>(
        file_path: &'a Path,
        dirs: impl Iterator<Item = &'a Path>,
    ) {
        let dirs: Vec<&Path> = dirs.collect();

        let found = dirs.iter().any(|dir_path| {
            file_path
                .ancestors()
                .any(|candidate_path| *dir_path == candidate_path)
        });

        assert!(found, "Can't find {file_path:?} in dirs: {dirs:?}");
    }

    #[test]
    fn test_temp_file_still_alive_after_disk_manager_dropped() -> Result<()> {
        // Test for the case using OS arranged temporary directory
        let dm = Arc::new(DiskManagerBuilder::default().build()?);
        let temp_file = dm.create_tmp_file("Testing")?;
        let temp_file_path = temp_file.path().to_owned();
        assert!(temp_file_path.exists());

        drop(dm);
        assert!(temp_file_path.exists());

        drop(temp_file);
        assert!(!temp_file_path.exists());

        // Test for the case using specified directories
        let local_dir1 = TempDir::new()?;
        let local_dir2 = TempDir::new()?;
        let local_dir3 = TempDir::new()?;
        let local_dirs = [local_dir1.path(), local_dir2.path(), local_dir3.path()];
        let dm = Arc::new(
            DiskManagerBuilder::default()
                .with_mode(DiskManagerMode::Directories(
                    local_dirs.iter().map(|p| p.into()).collect(),
                ))
                .build()?,
        );
        let temp_file = dm.create_tmp_file("Testing")?;
        let temp_file_path = temp_file.path().to_owned();
        assert!(temp_file_path.exists());

        drop(dm);
        assert!(temp_file_path.exists());

        drop(temp_file);
        assert!(!temp_file_path.exists());

        Ok(())
    }
}