This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new e0d8192f2 [CELEBORN-2277] Replace synchronized in Flusher.getWorkerIndex with AtomicInteger
e0d8192f2 is described below

commit e0d8192f2e33d5e9a6c27cf4fd84cdbd3374460e
Author: sychen <[email protected]>
AuthorDate: Mon Mar 30 10:37:43 2026 +0800

    [CELEBORN-2277] Replace synchronized in Flusher.getWorkerIndex with AtomicInteger
    
    ### What changes were proposed in this pull request?
    
    Replace the synchronized block in getWorkerIndex with a lock-free
    AtomicInteger.updateAndGet call, which performs the wrap-around increment
    `(i + 1) % threadCount` as a CAS-based atomic update.
    
    ### Why are the changes needed?
    
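    The synchronized block acquires the Flusher instance's monitor, so under
    high concurrency every caller of getWorkerIndex serializes on that lock (and
    contends with any other code synchronizing on the same Flusher). 
    AtomicInteger.updateAndGet takes no lock at all: it confines the atomic
    update to a single variable via compare-and-set, avoiding blocking overhead
    for this lightweight index increment.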
    
    ### Does this PR resolve a correctness bug?
    No.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    
    GHA.
    
    Closes #3621 from cxzl25/CELEBORN-2277.
    
    Lead-authored-by: sychen <[email protected]>
    Co-authored-by: cxzl25 <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
    (cherry picked from commit 235f07de49339d4cc2d5ac94deb55106a79ca7b4)
    Signed-off-by: SteNicholas <[email protected]>
---
 .../celeborn/service/deploy/worker/storage/Flusher.scala    | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
index 2a4134e79..645553d4c 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
@@ -19,7 +19,8 @@ package org.apache.celeborn.service.deploy.worker.storage
 
 import java.io.IOException
 import java.util.concurrent.{ExecutorService, LinkedBlockingQueue, TimeUnit}
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicLongArray}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicLongArray}
+import java.util.function.IntUnaryOperator
 
 import scala.util.Random
 
@@ -48,7 +49,10 @@ abstract private[worker] class Flusher(
   protected val workingQueues = new Array[LinkedBlockingQueue[FlushTask]](threadCount)
   protected val bufferQueue = new LinkedBlockingQueue[CompositeByteBuf]()
   protected val workers = new Array[ExecutorService](threadCount)
-  protected var nextWorkerIndex: Int = 0
+  protected val nextWorkerIndex: AtomicInteger = new AtomicInteger(0)
+  private val workerIndexUpdater: IntUnaryOperator = new IntUnaryOperator {
+    override def applyAsInt(i: Int): Int = (i + 1) % threadCount
+  }
 
   val lastBeginFlushTime: AtomicLongArray = new AtomicLongArray(threadCount)
   val stopFlag = new AtomicBoolean(false)
@@ -104,9 +108,8 @@ abstract private[worker] class Flusher(
     ThreadPoolSource.registerSource(s"$this", workers)
   }
 
-  def getWorkerIndex: Int = synchronized {
-    nextWorkerIndex = (nextWorkerIndex + 1) % threadCount
-    nextWorkerIndex
+  def getWorkerIndex: Int = {
+    nextWorkerIndex.updateAndGet(workerIndexUpdater)
   }
 
   def takeBuffer(): CompositeByteBuf = {

Reply via email to