alamb commented on code in PR #14975: URL: https://github.com/apache/datafusion/pull/14975#discussion_r2004211990
##########
datafusion/execution/src/disk_manager.rs:
##########
@@ -144,6 +219,94 @@ impl DiskManager {
                 .map_err(DataFusionError::IoError)?,
         })
     }
+
+    /// Write record batches to a temporary file, and return the spill file handle.

Review Comment:
   I am not sure about putting this method on the `DiskManager` itself -- I think managing the contents of the spill files is something that will likely get more sophisticated (e.g. if we want to explore using mmap and other techniques to speed up reading).
   
   I have been meaning to file tickets about this and I will do so shortly.
   
   What would you think about introducing a new struct that would be responsible for managing the spill files for a particular operation?
   
   Something like this, perhaps:
   
   ```rust
   struct SpillFiles {
     env: Arc<RuntimeEnv>,
     files: Vec<RefCountedTempFile>,
     ...
   }
   
   ...
   impl SpillFiles {
       pub fn try_spill_record_batches(
           &self,
           batches: &[RecordBatch],
           request_description: &str,
           caller_spill_metrics: &mut SpillMetrics,
       ) -> Result<()> {..}
   }
   ```

##########
datafusion/execution/src/disk_manager.rs:
##########
@@ -93,14 +115,67 @@ impl DiskManager {
                 );
                 Ok(Arc::new(Self {
                     local_dirs: Mutex::new(Some(local_dirs)),
+                    max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
+                    used_disk_space: Count::default(),
                 }))
             }
             DiskManagerConfig::Disabled => Ok(Arc::new(Self {
                 local_dirs: Mutex::new(None),
+                max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
+                used_disk_space: Count::default(),
             })),
         }
     }
 
+    pub fn try_new_without_arc(config: DiskManagerConfig) -> Result<Self> {

Review Comment:
   I don't understand the need for this function and find the name confusing. It seems like the only difference is that it will error if it doesn't have exclusive access to the disk manager.
   
   I have an alternate suggestion above

##########
datafusion/core/tests/memory_limit/mod.rs:
##########
@@ -468,6 +470,83 @@ async fn test_stringview_external_sort() {
     let _ = df.collect().await.expect("Query execution failed");
 }
 
+// Tests for disk limit (`max_temp_directory_size` in `DiskManager`)
+// ------------------------------------------------------------------
+
+// Create a new `SessionContext` with speicified disk limit and memory pool limit
+async fn setup_context(
+    disk_limit: u64,
+    memory_pool_limit: usize,
+) -> Result<SessionContext> {
+    let disk_manager = DiskManager::try_new_without_arc(DiskManagerConfig::NewOs)?
+        .with_max_temp_directory_size(disk_limit)?;
+
+    let runtime = RuntimeEnvBuilder::new()

Review Comment:
   I think you could do this instead of adding a new function:
   
   ```rust
       let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;
   
       let disk_manager = Arc::try_unwrap(disk_manager)
           .expect("DiskManager should be unique")
           .with_max_temp_directory_size(disk_limit)?;
   ```

##########
datafusion/execution/src/disk_manager.rs:
##########
@@ -93,14 +115,67 @@ impl DiskManager {
                 );
                 Ok(Arc::new(Self {
                     local_dirs: Mutex::new(Some(local_dirs)),
+                    max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
+                    used_disk_space: Count::default(),
                 }))
             }
             DiskManagerConfig::Disabled => Ok(Arc::new(Self {
                 local_dirs: Mutex::new(None),
+                max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
+                used_disk_space: Count::default(),
             })),
         }
     }
 
+    pub fn try_new_without_arc(config: DiskManagerConfig) -> Result<Self> {
+        match config {
+            DiskManagerConfig::Existing(manager) => {
+                Arc::try_unwrap(manager).map_err(|_| {
+                    DataFusionError::Internal("Failed to unwrap Arc".to_string())
+                })
+            }
+            DiskManagerConfig::NewOs => Ok(Self {
+                local_dirs: Mutex::new(Some(vec![])),
+                max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
+                used_disk_space: Count::default(),
+            }),
+            DiskManagerConfig::NewSpecified(conf_dirs) => {
+                let local_dirs = create_local_dirs(conf_dirs)?;
+                debug!(
+                    "Created local dirs {:?} as DataFusion working directory",
+                    local_dirs
+                );
+                Ok(Self {
+                    local_dirs: Mutex::new(Some(local_dirs)),
+                    max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
+                    used_disk_space: Count::default(),
+                })
+            }
+            DiskManagerConfig::Disabled => Ok(Self {
+                local_dirs: Mutex::new(None),
+                max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
+                used_disk_space: Count::default(),
+            }),
+        }
+    }
+
+    /// Set the maximum amount of data (in bytes) stored inside the temporary directories.
+    pub fn with_max_temp_directory_size(

Review Comment:
   I would expect that this function should be on the `DiskManagerConfig` rather than `DiskManager` 🤔
   
   Then you won't have to add the new `try_unwrap` either.
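   
   To make the last suggestion concrete, here is a rough, standalone sketch of what moving the limit onto the config could look like. Everything in it is a simplified stand-in rather than the real DataFusion code: the config is modelled as a plain struct instead of the actual enum, `new_os()` and the `max_temp_directory_size()` getter are hypothetical helpers, the default value is a placeholder, and errors are plain `String`s. The point is only that the limit is fixed before the `Arc` is created, so nothing has to be unwrapped afterwards.
   
   ```rust
   use std::sync::Arc;
   
   /// Placeholder default (100 GiB); the constant in the PR may use another value.
   pub const DEFAULT_MAX_TEMP_DIRECTORY_SIZE: u64 = 100 * 1024 * 1024 * 1024;
   
   /// Hypothetical, simplified stand-in for `DiskManagerConfig`.
   /// The real type is an enum; a struct keeps this sketch short.
   #[derive(Debug, Clone)]
   pub struct DiskManagerConfig {
       /// `None` models the disabled case, `Some(..)` the OS temp-dir case.
       local_dirs: Option<Vec<String>>,
       max_temp_directory_size: u64,
   }
   
   impl DiskManagerConfig {
       /// Hypothetical constructor modelling `DiskManagerConfig::NewOs`.
       pub fn new_os() -> Self {
           Self {
               local_dirs: Some(vec![]),
               max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
           }
       }
   
       /// Builder-style setter living on the config, as suggested above.
       pub fn with_max_temp_directory_size(mut self, limit: u64) -> Self {
           self.max_temp_directory_size = limit;
           self
       }
   }
   
   /// Simplified stand-in for `DiskManager` (no mutex, no usage counter).
   #[derive(Debug)]
   pub struct DiskManager {
       local_dirs: Option<Vec<String>>,
       max_temp_directory_size: u64,
   }
   
   impl DiskManager {
       /// The limit is copied from the config before the manager is wrapped
       /// in an `Arc`, so callers never need exclusive access to change it.
       pub fn try_new(config: DiskManagerConfig) -> Result<Arc<Self>, String> {
           Ok(Arc::new(Self {
               local_dirs: config.local_dirs,
               max_temp_directory_size: config.max_temp_directory_size,
           }))
       }
   
       /// Hypothetical getter, used here only to observe the configured limit.
       pub fn max_temp_directory_size(&self) -> u64 {
           self.max_temp_directory_size
       }
   
       /// Whether temporary files may be created at all.
       pub fn tmp_files_enabled(&self) -> bool {
           self.local_dirs.is_some()
       }
   }
   ```
   
   With something along these lines, the test helper would reduce to `DiskManager::try_new(DiskManagerConfig::new_os().with_max_temp_directory_size(disk_limit))?`, with no `try_new_without_arc` and no `Arc::try_unwrap`.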