1use datafusion_common::{
21 config_err, resources_datafusion_err, resources_err, DataFusionError, Result,
22};
23use log::debug;
24use parking_lot::Mutex;
25use rand::{rng, Rng};
26use std::path::{Path, PathBuf};
27use std::sync::atomic::{AtomicU64, Ordering};
28use std::sync::Arc;
29use tempfile::{Builder, NamedTempFile, TempDir};
30
31use crate::memory_pool::human_readable_size;
32
/// Default upper bound on the combined size of tracked temp files:
/// 100 GiB (`100 * 1024 * 1024 * 1024` bytes).
const DEFAULT_MAX_TEMP_DIRECTORY_SIZE: u64 = 100 * 1024 * 1024 * 1024;

/// Builder for a [`DiskManager`].
#[derive(Clone, Debug)]
pub struct DiskManagerBuilder {
    /// Where (or whether) temporary files are created.
    mode: DiskManagerMode,
    /// Maximum combined size, in bytes, of the temp files the built manager
    /// tracks; enforced by `RefCountedTempFile::update_disk_usage`.
    max_temp_directory_size: u64,
}
44
45impl Default for DiskManagerBuilder {
46 fn default() -> Self {
47 Self {
48 mode: DiskManagerMode::OsTmpDirectory,
49 max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
50 }
51 }
52}
53
54impl DiskManagerBuilder {
55 pub fn set_mode(&mut self, mode: DiskManagerMode) {
56 self.mode = mode;
57 }
58
59 pub fn with_mode(mut self, mode: DiskManagerMode) -> Self {
60 self.set_mode(mode);
61 self
62 }
63
64 pub fn set_max_temp_directory_size(&mut self, value: u64) {
65 self.max_temp_directory_size = value;
66 }
67
68 pub fn with_max_temp_directory_size(mut self, value: u64) -> Self {
69 self.set_max_temp_directory_size(value);
70 self
71 }
72
73 pub fn build(self) -> Result<DiskManager> {
75 match self.mode {
76 DiskManagerMode::OsTmpDirectory => Ok(DiskManager {
77 local_dirs: Mutex::new(Some(vec![])),
78 max_temp_directory_size: self.max_temp_directory_size,
79 used_disk_space: Arc::new(AtomicU64::new(0)),
80 }),
81 DiskManagerMode::Directories(conf_dirs) => {
82 let local_dirs = create_local_dirs(conf_dirs)?;
83 debug!(
84 "Created local dirs {local_dirs:?} as DataFusion working directory"
85 );
86 Ok(DiskManager {
87 local_dirs: Mutex::new(Some(local_dirs)),
88 max_temp_directory_size: self.max_temp_directory_size,
89 used_disk_space: Arc::new(AtomicU64::new(0)),
90 })
91 }
92 DiskManagerMode::Disabled => Ok(DiskManager {
93 local_dirs: Mutex::new(None),
94 max_temp_directory_size: self.max_temp_directory_size,
95 used_disk_space: Arc::new(AtomicU64::new(0)),
96 }),
97 }
98 }
99}
100
/// Where a [`DiskManager`] places temporary files.
#[derive(Clone, Debug, Default)]
pub enum DiskManagerMode {
    /// Create a temporary directory via `tempfile::tempdir()`, lazily, on the
    /// first temp-file request. This is the default.
    #[default]
    OsTmpDirectory,

    /// Create a `datafusion-`-prefixed temporary directory inside each given
    /// path (creating a missing path first); each temp file goes into one of
    /// these directories, chosen at random.
    Directories(Vec<PathBuf>),

    /// Disable temp files: `DiskManager::create_tmp_file` returns an error.
    Disabled,
}
116
/// Legacy configuration consumed by `DiskManager::try_new`; superseded by
/// [`DiskManagerBuilder`].
#[allow(deprecated)]
#[deprecated(since = "48.0.0", note = "Use DiskManagerBuilder instead")]
#[derive(Debug, Clone, Default)]
pub enum DiskManagerConfig {
    /// Reuse an existing manager; `try_new` returns it unchanged.
    Existing(Arc<DiskManager>),

    /// Create a manager whose temp directory is created lazily via
    /// `tempfile::tempdir()`. This is the default.
    #[default]
    NewOs,

    /// Create a manager that uses temporary directories inside the given paths.
    NewSpecified(Vec<PathBuf>),

    /// Create a manager with temp files disabled.
    Disabled,
}
137
138#[allow(deprecated)]
139impl DiskManagerConfig {
140 pub fn new() -> Self {
142 Self::default()
143 }
144
145 pub fn new_existing(existing: Arc<DiskManager>) -> Self {
147 Self::Existing(existing)
148 }
149
150 pub fn new_specified(paths: Vec<PathBuf>) -> Self {
152 Self::NewSpecified(paths)
153 }
154}
155
/// Manages the directories used for temporary files and tracks how much disk
/// space those files report using.
#[derive(Debug)]
pub struct DiskManager {
    /// Local directories for temp files. `None` means temp files are disabled;
    /// an empty `Vec` means a directory is created lazily on the first
    /// `create_tmp_file` call.
    local_dirs: Mutex<Option<Vec<Arc<TempDir>>>>,
    /// Upper bound, in bytes, on `used_disk_space`, checked by
    /// `RefCountedTempFile::update_disk_usage`.
    max_temp_directory_size: u64,
    /// Combined size, in bytes, last recorded for this manager's live temp files.
    used_disk_space: Arc<AtomicU64>,
}
172
173impl DiskManager {
174 pub fn builder() -> DiskManagerBuilder {
176 DiskManagerBuilder::default()
177 }
178
179 #[allow(deprecated)]
181 #[deprecated(since = "48.0.0", note = "Use DiskManager::builder() instead")]
182 pub fn try_new(config: DiskManagerConfig) -> Result<Arc<Self>> {
183 match config {
184 DiskManagerConfig::Existing(manager) => Ok(manager),
185 DiskManagerConfig::NewOs => Ok(Arc::new(Self {
186 local_dirs: Mutex::new(Some(vec![])),
187 max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
188 used_disk_space: Arc::new(AtomicU64::new(0)),
189 })),
190 DiskManagerConfig::NewSpecified(conf_dirs) => {
191 let local_dirs = create_local_dirs(conf_dirs)?;
192 debug!(
193 "Created local dirs {local_dirs:?} as DataFusion working directory"
194 );
195 Ok(Arc::new(Self {
196 local_dirs: Mutex::new(Some(local_dirs)),
197 max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
198 used_disk_space: Arc::new(AtomicU64::new(0)),
199 }))
200 }
201 DiskManagerConfig::Disabled => Ok(Arc::new(Self {
202 local_dirs: Mutex::new(None),
203 max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
204 used_disk_space: Arc::new(AtomicU64::new(0)),
205 })),
206 }
207 }
208
209 pub fn set_max_temp_directory_size(
210 &mut self,
211 max_temp_directory_size: u64,
212 ) -> Result<()> {
213 if self.local_dirs.lock().is_none() && max_temp_directory_size != 0 {
216 return config_err!(
217 "Cannot set max temp directory size for a disk manager that spilling is disabled"
218 );
219 }
220
221 self.max_temp_directory_size = max_temp_directory_size;
222 Ok(())
223 }
224
225 pub fn set_arc_max_temp_directory_size(
226 this: &mut Arc<Self>,
227 max_temp_directory_size: u64,
228 ) -> Result<()> {
229 if let Some(inner) = Arc::get_mut(this) {
230 inner.set_max_temp_directory_size(max_temp_directory_size)?;
231 Ok(())
232 } else {
233 config_err!("DiskManager should be a single instance")
234 }
235 }
236
237 pub fn with_max_temp_directory_size(
238 mut self,
239 max_temp_directory_size: u64,
240 ) -> Result<Self> {
241 self.set_max_temp_directory_size(max_temp_directory_size)?;
242 Ok(self)
243 }
244
245 pub fn used_disk_space(&self) -> u64 {
246 self.used_disk_space.load(Ordering::Relaxed)
247 }
248
249 pub fn tmp_files_enabled(&self) -> bool {
253 self.local_dirs.lock().is_some()
254 }
255
256 pub fn create_tmp_file(
261 self: &Arc<Self>,
262 request_description: &str,
263 ) -> Result<RefCountedTempFile> {
264 let mut guard = self.local_dirs.lock();
265 let local_dirs = guard.as_mut().ok_or_else(|| {
266 resources_datafusion_err!(
267 "Memory Exhausted while {request_description} (DiskManager is disabled)"
268 )
269 })?;
270
271 if local_dirs.is_empty() {
273 let tempdir = tempfile::tempdir().map_err(DataFusionError::IoError)?;
274
275 debug!(
276 "Created directory '{:?}' as DataFusion tempfile directory for {}",
277 tempdir.path().to_string_lossy(),
278 request_description,
279 );
280
281 local_dirs.push(Arc::new(tempdir));
282 }
283
284 let dir_index = rng().random_range(0..local_dirs.len());
285 Ok(RefCountedTempFile {
286 _parent_temp_dir: Arc::clone(&local_dirs[dir_index]),
287 tempfile: Builder::new()
288 .tempfile_in(local_dirs[dir_index].as_ref())
289 .map_err(DataFusionError::IoError)?,
290 current_file_disk_usage: 0,
291 disk_manager: Arc::clone(self),
292 })
293 }
294}
295
/// A temporary file created by a [`DiskManager`].
///
/// It keeps both its parent directory and its manager alive while it exists.
/// Its recorded size is released from the manager's global usage counter when
/// it is dropped.
#[derive(Debug)]
pub struct RefCountedTempFile {
    /// Keeps the parent temp directory alive for as long as this file exists,
    /// even after the `DiskManager` itself has been dropped.
    _parent_temp_dir: Arc<TempDir>,
    /// The underlying named temporary file.
    tempfile: NamedTempFile,
    /// Size, in bytes, this file currently contributes to the manager's
    /// `used_disk_space`.
    current_file_disk_usage: u64,
    /// Manager whose global usage counter and size limit this file reports to.
    disk_manager: Arc<DiskManager>,
}
316
317impl RefCountedTempFile {
318 pub fn path(&self) -> &Path {
319 self.tempfile.path()
320 }
321
322 pub fn inner(&self) -> &NamedTempFile {
323 &self.tempfile
324 }
325
326 pub fn update_disk_usage(&mut self) -> Result<()> {
331 let metadata = self.tempfile.as_file().metadata()?;
333 let new_disk_usage = metadata.len();
334
335 self.disk_manager
338 .used_disk_space
339 .fetch_sub(self.current_file_disk_usage, Ordering::Relaxed);
340 self.disk_manager
342 .used_disk_space
343 .fetch_add(new_disk_usage, Ordering::Relaxed);
344
345 let global_disk_usage = self.disk_manager.used_disk_space.load(Ordering::Relaxed);
347 if global_disk_usage > self.disk_manager.max_temp_directory_size {
348 return resources_err!(
349 "The used disk space during the spilling process has exceeded the allowable limit of {}. Try increasing the `max_temp_directory_size` in the disk manager configuration.",
350 human_readable_size(self.disk_manager.max_temp_directory_size as usize)
351 );
352 }
353
354 self.current_file_disk_usage = new_disk_usage;
356
357 Ok(())
358 }
359
360 pub fn current_disk_usage(&self) -> u64 {
361 self.current_file_disk_usage
362 }
363}
364
365impl Drop for RefCountedTempFile {
367 fn drop(&mut self) {
368 self.disk_manager
370 .used_disk_space
371 .fetch_sub(self.current_file_disk_usage, Ordering::Relaxed);
372 }
373}
374
375fn create_local_dirs(local_dirs: Vec<PathBuf>) -> Result<Vec<Arc<TempDir>>> {
377 local_dirs
378 .iter()
379 .map(|root| {
380 if !Path::new(root).exists() {
381 std::fs::create_dir(root)?;
382 }
383 Builder::new()
384 .prefix("datafusion-")
385 .tempdir_in(root)
386 .map_err(DataFusionError::IoError)
387 })
388 .map(|result| result.map(Arc::new))
389 .collect()
390}
391
#[cfg(test)]
mod tests {
    use super::*;

    /// In the default mode no directory exists until the first temp file is
    /// requested; then exactly one is created and holds the file.
    #[test]
    fn lazy_temp_dir_creation() -> Result<()> {
        let dm = Arc::new(DiskManagerBuilder::default().build()?);

        assert_eq!(0, local_dir_snapshot(&dm).len());

        let actual = dm.create_tmp_file("Testing")?;

        // The first request created exactly one directory.
        assert_eq!(1, local_dir_snapshot(&dm).len());

        let local_dirs = local_dir_snapshot(&dm);
        assert_path_in_dirs(actual.path(), local_dirs.iter().map(|p| p.as_path()));

        Ok(())
    }

    /// Returns the paths of the manager's current local directories
    /// (empty when the manager is disabled).
    fn local_dir_snapshot(dm: &DiskManager) -> Vec<PathBuf> {
        dm.local_dirs
            .lock()
            .iter()
            .flatten()
            .map(|p| p.path().into())
            .collect()
    }

    /// A temp file created with configured directories lands under one of them.
    #[test]
    fn file_in_right_dir() -> Result<()> {
        let local_dir1 = TempDir::new()?;
        let local_dir2 = TempDir::new()?;
        let local_dir3 = TempDir::new()?;
        let local_dirs = vec![local_dir1.path(), local_dir2.path(), local_dir3.path()];
        let dm = Arc::new(
            DiskManagerBuilder::default()
                .with_mode(DiskManagerMode::Directories(
                    local_dirs.iter().map(|p| p.into()).collect(),
                ))
                .build()?,
        );

        assert!(dm.tmp_files_enabled());
        let actual = dm.create_tmp_file("Testing")?;

        // The file sits inside a `datafusion-*` subdirectory, so an ancestor
        // check is used rather than a direct parent comparison.
        assert_path_in_dirs(actual.path(), local_dirs.into_iter());

        Ok(())
    }

    /// A disabled manager reports temp files as disabled and refuses to
    /// create one, with a specific error message.
    #[test]
    fn test_disabled_disk_manager() {
        let manager = Arc::new(
            DiskManagerBuilder::default()
                .with_mode(DiskManagerMode::Disabled)
                .build()
                .unwrap(),
        );
        assert!(!manager.tmp_files_enabled());
        assert_eq!(
            manager.create_tmp_file("Testing").unwrap_err().strip_backtrace(),
            "Resources exhausted: Memory Exhausted while Testing (DiskManager is disabled)",
        )
    }

    /// Building with a configured root directory that already exists succeeds.
    #[test]
    fn test_disk_manager_create_spill_folder() {
        let dir = TempDir::new().unwrap();
        DiskManagerBuilder::default()
            .with_mode(DiskManagerMode::Directories(vec![dir.path().to_path_buf()]))
            .build()
            .unwrap();
    }

    /// Asserts that `file_path` is located somewhere beneath one of `dirs`.
    fn assert_path_in_dirs<'a>(
        file_path: &'a Path,
        dirs: impl Iterator<Item = &'a Path>,
    ) {
        let dirs: Vec<&Path> = dirs.collect();

        let found = dirs.iter().any(|dir_path| {
            file_path
                .ancestors()
                .any(|candidate_path| *dir_path == candidate_path)
        });

        assert!(found, "Can't find {file_path:?} in dirs: {dirs:?}");
    }

    /// A temp file outlives its `DiskManager` handle and is removed only when
    /// the file itself is dropped, in both the lazy OS mode and the
    /// configured-directories mode.
    #[test]
    fn test_temp_file_still_alive_after_disk_manager_dropped() -> Result<()> {
        // Lazy OS temp directory mode.
        let dm = Arc::new(DiskManagerBuilder::default().build()?);
        let temp_file = dm.create_tmp_file("Testing")?;
        let temp_file_path = temp_file.path().to_owned();
        assert!(temp_file_path.exists());

        // The file holds its own `Arc`s to the manager and parent directory.
        drop(dm);
        assert!(temp_file_path.exists());

        drop(temp_file);
        assert!(!temp_file_path.exists());

        // Configured directories mode.
        let local_dir1 = TempDir::new()?;
        let local_dir2 = TempDir::new()?;
        let local_dir3 = TempDir::new()?;
        let local_dirs = [local_dir1.path(), local_dir2.path(), local_dir3.path()];
        let dm = Arc::new(
            DiskManagerBuilder::default()
                .with_mode(DiskManagerMode::Directories(
                    local_dirs.iter().map(|p| p.into()).collect(),
                ))
                .build()?,
        );
        let temp_file = dm.create_tmp_file("Testing")?;
        let temp_file_path = temp_file.path().to_owned();
        assert!(temp_file_path.exists());

        drop(dm);
        assert!(temp_file_path.exists());

        drop(temp_file);
        assert!(!temp_file_path.exists());

        Ok(())
    }
}