andygrove opened a new pull request, #3195: URL: https://github.com/apache/datafusion-comet/pull/3195
This PR addresses several concurrency and resource management issues in the async shuffle external sorter that could cause CPU waste, memory leaks, and improper cleanup during error conditions.

Builds on https://github.com/apache/datafusion-comet/pull/3192

## Changes

### 1. Replace busy-wait loop with proper blocking

Problem: When the maximum number of concurrent spill tasks was reached, the code used a CPU-burning spin loop to check whether any task had completed.

Before:

    while (asyncSpillTasks.size() == threadNum) {
      for (Future<Void> spillingTask : asyncSpillTasks) {
        if (spillingTask.isDone()) {
          asyncSpillTasks.remove(spillingTask);
          break;
        }
      }
    }

After: Block on the oldest task using `Future.get()`, which properly yields the CPU while waiting.

### 2. Fix exception handling in closeAndGetSpills()

Problem: If one async task threw an exception, subsequent tasks were not awaited, potentially leaving background threads running and resources unreleased.

After: Wait for ALL tasks to complete, collecting exceptions using `addSuppressed()` to preserve error information from multiple failures.

### 3. Add runtime validation for thread pool initialization

Problem: The code used `assert` to validate `threadNum > 0` (assertions are disabled in production) and had no null check for the thread pool.

After: Proper runtime checks with descriptive error messages:

- `IllegalArgumentException` if `threadNum <= 0`
- `IllegalStateException` if the thread pool is null

### 4. Fix memory leak on exception in async spill task

Problem: If `writeSortedFileNative()` threw an exception, the `SpillSorter` remained in the `spillingSorters` queue with its memory unreleased.

After: Wrap in try-finally to ensure `freeMemory()`, `freeArray()`, and `spillingSorters.remove()` always execute.

### 5. Add cancellation support in cleanupResources()

Problem: When a task was killed or aborted, background spill threads continued running with no way to stop them.

After: Cancel all pending async tasks and wait briefly for their cleanup to complete before freeing memory and deleting spill files.

### 6. Make peakMemoryUsedBytes volatile

Problem: The field could have stale reads when accessed from different threads (main thread vs background spill threads).

After: Added the `volatile` modifier to ensure visibility across threads.

## Test Plan

- Existing shuffle tests pass
- Async shuffle tests pass with `spark.comet.columnar.shuffle.async.enabled=true`
- Verify no CPU spinning under high spill pressure
- Verify proper cleanup when tasks are cancelled/killed
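To illustrate changes 1 and 3, the following is a minimal sketch of capping the number of in-flight tasks by blocking on the oldest one with `Future.get()`, together with runtime argument checks in place of `assert`. The class `SpillThrottleSketch` and its methods are hypothetical names made up for this example; it is not the code from the PR, and it assumes a single submitting thread.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper for illustration only; not the Comet implementation.
// Not thread-safe: it assumes one thread calls submit() and awaitAll().
final class SpillThrottleSketch {
  private final int maxInFlight;
  private final ExecutorService pool;
  // The oldest submitted task sits at the head of the deque.
  private final Deque<Future<?>> inFlight = new ArrayDeque<>();

  SpillThrottleSketch(int maxInFlight, ExecutorService pool) {
    // Runtime checks stay active even when JVM assertions are disabled.
    if (maxInFlight <= 0) {
      throw new IllegalArgumentException("maxInFlight must be positive, got " + maxInFlight);
    }
    if (pool == null) {
      throw new IllegalStateException("thread pool is not initialized");
    }
    this.maxInFlight = maxInFlight;
    this.pool = pool;
  }

  void submit(Runnable task) throws InterruptedException, ExecutionException {
    // At capacity: park on the oldest task instead of polling isDone() in a loop.
    while (inFlight.size() >= maxInFlight) {
      inFlight.removeFirst().get();
    }
    inFlight.addLast(pool.submit(task));
  }

  int inFlightCount() {
    return inFlight.size();
  }

  void awaitAll() throws InterruptedException, ExecutionException {
    while (!inFlight.isEmpty()) {
      inFlight.removeFirst().get();
    }
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
      SpillThrottleSketch throttle = new SpillThrottleSketch(2, pool);
      AtomicInteger done = new AtomicInteger();
      for (int i = 0; i < 5; i++) {
        throttle.submit(done::incrementAndGet);
      }
      throttle.awaitAll();
      System.out.println("completed tasks: " + done.get());
    } finally {
      pool.shutdown();
    }
  }
}
```

Waiting on the oldest task is a simplification: a newer task may finish first, but the submitting thread sleeps rather than spins, which is the point of the change.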
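The "wait for all tasks, then report every failure" pattern from change 2 could look roughly like the sketch below. `AwaitAllSketch.awaitAll` is a hypothetical helper, not the PR's `closeAndGetSpills()`; the choice to keep waiting after an interrupt and restore the interrupt flag afterwards is an assumption of this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper for illustration only; not the Comet implementation.
final class AwaitAllSketch {
  // Waits for every task, then rethrows the first failure with the rest suppressed.
  static void awaitAll(List<? extends Future<?>> tasks) throws Exception {
    Exception first = null;
    boolean interrupted = false;
    for (Future<?> task : tasks) {
      boolean settled = false;
      while (!settled) {
        try {
          task.get();
          settled = true;
        } catch (InterruptedException e) {
          // Assumption: remember the interrupt and keep waiting for this task.
          interrupted = true;
        } catch (ExecutionException | CancellationException e) {
          if (first == null) {
            first = e;
          } else {
            first.addSuppressed(e); // keep later failures attached to the first
          }
          settled = true;
        }
      }
    }
    if (interrupted) {
      Thread.currentThread().interrupt(); // restore the flag for the caller
    }
    if (first != null) {
      throw first;
    }
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    Callable<Void> failA = () -> { throw new IllegalStateException("spill A failed"); };
    Callable<Void> ok = () -> null;
    Callable<Void> failC = () -> { throw new IllegalStateException("spill C failed"); };
    List<Future<Void>> tasks = new ArrayList<>();
    tasks.add(pool.submit(failA));
    tasks.add(pool.submit(ok));
    tasks.add(pool.submit(failC));
    try {
      awaitAll(tasks);
    } catch (Exception e) {
      System.out.println("primary cause: " + e.getCause().getMessage()
          + ", suppressed failures: " + e.getSuppressed().length);
    } finally {
      pool.shutdown();
    }
  }
}
```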
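Changes 4 to 6 fit together: the background task releases its resources in a `finally` block, so cleanup also runs when the task is cancelled, and state read from another thread is declared `volatile`. The sketch below is a rough illustration under stated assumptions. `CancellableSpillSketch` and `FakeSorter` are hypothetical stand-ins, `Thread.sleep` stands in for the long-running write, and the work is assumed to respond to thread interruption, which native code may not do.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch for illustration only; not the Comet implementation.
final class CancellableSpillSketch {
  // Stand-in for a sorter that owns memory which must always be released.
  static final class FakeSorter {
    // volatile so a write from the spill thread is visible to the main thread.
    private volatile boolean freed;
    void free() { freed = true; }
    boolean isFreed() { return freed; }
  }

  final Queue<FakeSorter> spillingSorters = new ConcurrentLinkedQueue<>();
  final CountDownLatch started = new CountDownLatch(1);
  final CountDownLatch cleanedUp = new CountDownLatch(1);

  Runnable spillTask(FakeSorter sorter) {
    spillingSorters.add(sorter);
    return () -> {
      try {
        started.countDown();
        Thread.sleep(60_000); // stands in for a long write; interruptible
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // cancelled: fall through to cleanup
      } finally {
        // Runs on success, failure, and cancellation alike.
        sorter.free();
        spillingSorters.remove(sorter);
        cleanedUp.countDown();
      }
    };
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      CancellableSpillSketch sketch = new CancellableSpillSketch();
      FakeSorter sorter = new FakeSorter();
      Future<?> task = pool.submit(sketch.spillTask(sorter));
      sketch.started.await();  // make sure the task is running
      task.cancel(true);       // interrupt the worker thread
      // Wait briefly for the task's own cleanup before doing anything else.
      boolean cleaned = sketch.cleanedUp.await(5, TimeUnit.SECONDS);
      System.out.println("cleaned up: " + cleaned + ", freed: " + sorter.isFreed());
    } finally {
      pool.shutdownNow();
    }
  }
}
```

The bounded wait after `cancel(true)` mirrors the "wait briefly" wording in change 5: the caller gives the background thread a chance to finish its own cleanup, but does not block forever if it never does.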
