andygrove opened a new pull request, #3195:
URL: https://github.com/apache/datafusion-comet/pull/3195

   This PR addresses several concurrency and resource management issues in the 
async shuffle external sorter that could cause CPU waste, memory leaks, and 
improper cleanup during error conditions.                   
   
   Builds on https://github.com/apache/datafusion-comet/pull/3192
                                                                                
                                                                                
                                                       
   ## Changes                                                                   
                                                                                
                                                          
                                                                                
                                                                                
                                                       
   1. Replace busy-wait loop with proper blocking                               
                                                                                
                                                       
                                                                                
                                                                                
                                                       
   Problem: When the maximum number of concurrent spill tasks was reached, the 
code used a CPU-burning spin loop to check if any task completed.               
                                                        
                                                                                
                                                                                
                                                       
   Before:                                                                      
                                                                                
                                                       
   while (asyncSpillTasks.size() == threadNum) {                                
                                                                                
                                                       
       for (Future<Void> spillingTask : asyncSpillTasks) {                      
                                                                                
                                                       
           if (spillingTask.isDone()) {                                         
                                                                                
                                                       
               asyncSpillTasks.remove(spillingTask);                            
                                                                                
                                                       
               break;                                                           
                                                                                
                                                       
           }                                                                    
                                                                                
                                                       
       }                                                                        
                                                                                
                                                       
   }                                                                            
                                                                                
                                                       
                                                                                
                                                                                
                                                       
   After: Block on the oldest task using Future.get(), which properly yields 
the CPU while waiting.                                                          
                                                          
                                                                                
                                                                                
                                                       
   2. Fix exception handling in closeAndGetSpills()                             
                                                                                
                                                       
                                                                                
                                                                                
                                                       
   Problem: If one async task threw an exception, subsequent tasks were not 
awaited, potentially leaving background threads running and resources 
unreleased.                                                          
                                                                                
                                                                                
                                                       
   After: Wait for ALL tasks to complete, collecting exceptions using 
addSuppressed() to preserve error information from multiple failures.           
                                                                 
                                                                                
                                                                                
                                                       
   3. Add runtime validation for thread pool initialization                     
                                                                                
                                                       
                                                                                
                                                                                
                                                       
   Problem: Used assert for validating threadNum > 0 (disabled in production) 
and no null check for thread pool.                                              
                                                         
                                                                                
                                                                                
                                                       
   After: Proper runtime checks with descriptive error messages:                
                                                                                
                                                       
   - IllegalArgumentException if threadNum <= 0                                 
                                                                                
                                                       
   - IllegalStateException if thread pool is null                               
                                                                                
                                                       
                                                                                
                                                                                
                                                       
   4. Fix memory leak on exception in async spill task                          
                                                                                
                                                       
                                                                                
                                                                                
                                                       
   Problem: If writeSortedFileNative() threw an exception, the SpillSorter 
remained in the spillingSorters queue with its memory unreleased.               
                                                            
                                                                                
                                                                                
                                                       
   After: Wrap in try-finally to ensure freeMemory(), freeArray(), and 
spillingSorters.remove() always execute.                                        
                                                                
                                                                                
                                                                                
                                                       
   5. Add cancellation support in cleanupResources()                            
                                                                                
                                                       
                                                                                
                                                                                
                                                       
   Problem: When a task was killed or aborted, background spill threads 
continued running with no way to stop them.                                     
                                                               
                                                                                
                                                                                
                                                       
   After: Cancel all pending async tasks and wait briefly for their cleanup to 
complete before freeing memory and deleting spill files.                        
                                                        
                                                                                
                                                                                
                                                       
   6. Make peakMemoryUsedBytes volatile                                         
                                                                                
                                                       
                                                                                
                                                                                
                                                       
   Problem: Field could have stale reads when accessed from different threads 
(main thread vs background spill threads).                                      
                                                         
                                                                                
                                                                                
                                                       
   After: Added volatile modifier to ensure visibility across threads.          
                                                                                
                                                       
                                                                                
                                                                                
                                                       
   ## Test Plan
                                                                                
                                                                                
                                                       
   - Existing shuffle tests pass                                                
                                                                                
                                                       
   - Async shuffle tests pass with 
spark.comet.columnar.shuffle.async.enabled=true                                 
                                                                                
                    
   - Verify no CPU spinning under high spill pressure                           
                                                                                
                                                       
   - Verify proper cleanup when tasks are cancelled/killed               


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to