alamb commented on code in PR #14975:
URL: https://github.com/apache/datafusion/pull/14975#discussion_r2004211990


##########
datafusion/execution/src/disk_manager.rs:
##########
@@ -144,6 +219,94 @@ impl DiskManager {
                 .map_err(DataFusionError::IoError)?,
         })
     }
+
+    /// Write record batches to a temporary file, and return the spill file handle.

Review Comment:
   I am not sure about putting this method on the `DiskManager` itself -- I 
think managing the contents of the spill files is something that will likely 
get more sophisticated (e.g. if we want to explore using mmap and other 
techniques to speed up reading)
   
   I have been meaning to file tickets about this and I will do so shortly. 
   
   What would you think about introducing a new struct that would be 
responsible for managing the spill files for a particular operation?
   
   Something like this perhaps:
   
   ```rust
   struct SpillFiles {
       env: Arc<RuntimeEnv>,
       files: Vec<RefCountedTempFile>,
       // ...
   }
   // ...
   
   impl SpillFiles {
       pub fn try_spill_record_batches(
           &self,
           batches: &[RecordBatch],
           request_description: &str,
           caller_spill_metrics: &mut SpillMetrics,
       ) -> Result<()> {..}
   }
   ```
   
   



##########
datafusion/execution/src/disk_manager.rs:
##########
@@ -93,14 +115,67 @@ impl DiskManager {
                 );
                 Ok(Arc::new(Self {
                     local_dirs: Mutex::new(Some(local_dirs)),
+                    max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
+                    used_disk_space: Count::default(),
                 }))
             }
             DiskManagerConfig::Disabled => Ok(Arc::new(Self {
                 local_dirs: Mutex::new(None),
+                max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
+                used_disk_space: Count::default(),
             })),
         }
     }
 
+    pub fn try_new_without_arc(config: DiskManagerConfig) -> Result<Self> {

Review Comment:
   I don't understand the need for this function and find the name confusing. 
It seems like the only difference is that it will error if it doesn't have 
exclusive access to the disk manager. I have an alternate suggestion above



##########
datafusion/core/tests/memory_limit/mod.rs:
##########
@@ -468,6 +470,83 @@ async fn test_stringview_external_sort() {
     let _ = df.collect().await.expect("Query execution failed");
 }
 
+// Tests for disk limit (`max_temp_directory_size` in `DiskManager`)
+// ------------------------------------------------------------------
+
+// Create a new `SessionContext` with speicified disk limit and memory pool limit
+async fn setup_context(
+    disk_limit: u64,
+    memory_pool_limit: usize,
+) -> Result<SessionContext> {
+    let disk_manager = DiskManager::try_new_without_arc(DiskManagerConfig::NewOs)?
+        .with_max_temp_directory_size(disk_limit)?;
+
+    let runtime = RuntimeEnvBuilder::new()

Review Comment:
   I think you could do this instead of adding a new function:
   
   ```rust
       let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;
       let disk_manager = Arc::try_unwrap(disk_manager)
           .expect("DiskManager should be unique")
           .with_max_temp_directory_size(disk_limit)?;
   ```
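   
   To illustrate why this only works while the `Arc` is unique, here is a 
minimal std-only sketch. `Manager` and `configure` are hypothetical stand-ins 
invented for this example, not the real `DiskManager` API; only 
`Arc::try_unwrap` is the actual standard library call.
   
   ```rust
   use std::sync::Arc;

   // Hypothetical stand-in for `DiskManager`; not the real DataFusion type.
   #[derive(Debug)]
   struct Manager {
       max_temp_directory_size: u64,
   }

   impl Manager {
       // Same shape as `try_new`: the value comes back already wrapped in an `Arc`.
       fn try_new() -> Result<Arc<Self>, String> {
           Ok(Arc::new(Self { max_temp_directory_size: 0 }))
       }

       // Consuming builder: it needs an owned `Manager`, not an `Arc<Manager>`.
       fn with_max_temp_directory_size(mut self, limit: u64) -> Result<Self, String> {
           self.max_temp_directory_size = limit;
           Ok(self)
       }
   }

   // Succeeds only while the `Arc` has exactly one strong reference;
   // otherwise `Arc::try_unwrap` hands the `Arc` back as the error value.
   fn configure(shared: Arc<Manager>, limit: u64) -> Result<Manager, String> {
       Arc::try_unwrap(shared)
           .map_err(|_| "Manager is still shared".to_string())?
           .with_max_temp_directory_size(limit)
   }
   ```
   
   Calling `configure` right after `try_new` succeeds, while calling it after 
another `Arc::clone` of the same manager returns the error.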



##########
datafusion/execution/src/disk_manager.rs:
##########
@@ -93,14 +115,67 @@ impl DiskManager {
                 );
                 Ok(Arc::new(Self {
                     local_dirs: Mutex::new(Some(local_dirs)),
+                    max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
+                    used_disk_space: Count::default(),
                 }))
             }
             DiskManagerConfig::Disabled => Ok(Arc::new(Self {
                 local_dirs: Mutex::new(None),
+                max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
+                used_disk_space: Count::default(),
             })),
         }
     }
 
+    pub fn try_new_without_arc(config: DiskManagerConfig) -> Result<Self> {
+        match config {
+            DiskManagerConfig::Existing(manager) => {
+                Arc::try_unwrap(manager).map_err(|_| {
+                    DataFusionError::Internal("Failed to unwrap Arc".to_string())
+                })
+            }
+            DiskManagerConfig::NewOs => Ok(Self {
+                local_dirs: Mutex::new(Some(vec![])),
+                max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
+                used_disk_space: Count::default(),
+            }),
+            DiskManagerConfig::NewSpecified(conf_dirs) => {
+                let local_dirs = create_local_dirs(conf_dirs)?;
+                debug!(
+                    "Created local dirs {:?} as DataFusion working directory",
+                    local_dirs
+                );
+                Ok(Self {
+                    local_dirs: Mutex::new(Some(local_dirs)),
+                    max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
+                    used_disk_space: Count::default(),
+                })
+            }
+            DiskManagerConfig::Disabled => Ok(Self {
+                local_dirs: Mutex::new(None),
+                max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
+                used_disk_space: Count::default(),
+            }),
+        }
+    }
+
+    /// Set the maximum amount of data (in bytes) stored inside the temporary directories.
+    pub fn with_max_temp_directory_size(

Review Comment:
   I would expect that this function should be on the `DiskManagerConfig` 
rather than `DiskManager` 🤔  
   
   Then you won't have to add the new `try_unwrap` either
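   
   A rough sketch of what I mean, using simplified hypothetical types (`Mode`, 
`Config`, `Manager`, and `DEFAULT_LIMIT` are made up for illustration; the real 
`DiskManagerConfig` is an enum with more variants, so the actual shape would 
differ):
   
   ```rust
   use std::sync::Arc;

   // Placeholder value chosen for this sketch, not the real default.
   const DEFAULT_LIMIT: u64 = 1 << 30;

   // Hypothetical, simplified stand-in for the config variants.
   #[derive(Debug, Clone, PartialEq)]
   enum Mode {
       NewOs,
       Disabled,
   }

   // Hypothetical config that carries the limit alongside the mode.
   #[derive(Debug, Clone)]
   struct Config {
       mode: Mode,
       max_temp_directory_size: u64,
   }

   impl Config {
       fn new(mode: Mode) -> Self {
           Self { mode, max_temp_directory_size: DEFAULT_LIMIT }
       }

       // The limit is set before the manager exists, so no `Arc` has to be unwrapped.
       fn with_max_temp_directory_size(mut self, limit: u64) -> Self {
           self.max_temp_directory_size = limit;
           self
       }
   }

   // Hypothetical stand-in for `DiskManager`.
   #[derive(Debug)]
   struct Manager {
       spill_enabled: bool,
       max_temp_directory_size: u64,
   }

   impl Manager {
       // A single constructor reads everything it needs from the config.
       fn try_new(config: Config) -> Result<Arc<Self>, String> {
           Ok(Arc::new(Self {
               spill_enabled: config.mode != Mode::Disabled,
               max_temp_directory_size: config.max_temp_directory_size,
           }))
       }
   }
   ```
   
   With this shape the test setup would reduce to building the config with 
the limit and passing it to the existing constructor.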



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

