This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new e0d8192f2 [CELEBORN-2277] Replace synchronized in Flusher.getWorkerIndex with AtomicInteger
e0d8192f2 is described below
commit e0d8192f2e33d5e9a6c27cf4fd84cdbd3374460e
Author: sychen <[email protected]>
AuthorDate: Mon Mar 30 10:37:43 2026 +0800
[CELEBORN-2277] Replace synchronized in Flusher.getWorkerIndex with AtomicInteger
### What changes were proposed in this pull request?
Replace the `synchronized` block in `Flusher.getWorkerIndex` with a lock-free
`AtomicInteger.updateAndGet` call, which advances the round-robin worker index
`(i + 1) % threadCount` via compare-and-set (CAS).
### Why are the changes needed?
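`synchronized` acquires the monitor of the whole `Flusher` instance, so concurrent
callers of `getWorkerIndex` serialize on that lock and contend with each other under
high concurrency. `AtomicInteger` confines the synchronization to a single variable and
replaces blocking lock acquisition with a CAS retry loop. That is cheaper for this
lightweight index increment. The update function `(i + 1) % threadCount` is
side-effect-free, as `updateAndGet` requires, because it may be re-applied when a CAS
attempt fails due to contention.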
### Does this PR resolve a correctness bug?
No.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
GHA.
Closes #3621 from cxzl25/CELEBORN-2277.
Lead-authored-by: sychen <[email protected]>
Co-authored-by: cxzl25 <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
(cherry picked from commit 235f07de49339d4cc2d5ac94deb55106a79ca7b4)
Signed-off-by: SteNicholas <[email protected]>
---
.../celeborn/service/deploy/worker/storage/Flusher.scala | 13 ++++++++-----
1 file changed, 8 insertions(+), 5 deletions(-)
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
index 2a4134e79..645553d4c 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
@@ -19,7 +19,8 @@ package org.apache.celeborn.service.deploy.worker.storage
import java.io.IOException
import java.util.concurrent.{ExecutorService, LinkedBlockingQueue, TimeUnit}
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicLongArray}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger,
AtomicLongArray}
+import java.util.function.IntUnaryOperator
import scala.util.Random
@@ -48,7 +49,10 @@ abstract private[worker] class Flusher(
protected val workingQueues = new
Array[LinkedBlockingQueue[FlushTask]](threadCount)
protected val bufferQueue = new LinkedBlockingQueue[CompositeByteBuf]()
protected val workers = new Array[ExecutorService](threadCount)
- protected var nextWorkerIndex: Int = 0
+ protected val nextWorkerIndex: AtomicInteger = new AtomicInteger(0)
+ private val workerIndexUpdater: IntUnaryOperator = new IntUnaryOperator {
+ override def applyAsInt(i: Int): Int = (i + 1) % threadCount
+ }
val lastBeginFlushTime: AtomicLongArray = new AtomicLongArray(threadCount)
val stopFlag = new AtomicBoolean(false)
@@ -104,9 +108,8 @@ abstract private[worker] class Flusher(
ThreadPoolSource.registerSource(s"$this", workers)
}
- def getWorkerIndex: Int = synchronized {
- nextWorkerIndex = (nextWorkerIndex + 1) % threadCount
- nextWorkerIndex
+ def getWorkerIndex: Int = {
+ nextWorkerIndex.updateAndGet(workerIndexUpdater)
}
def takeBuffer(): CompositeByteBuf = {