This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new af92a5f914d [fix][broker] Fix concurrency bug in BucketDelayedDeliveryTracker (#25346)
af92a5f914d is described below
commit af92a5f914db95b1a89e31198d4cd2b2cc49eab8
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Mar 18 06:55:26 2026 -0700
[fix][broker] Fix concurrency bug in BucketDelayedDeliveryTracker (#25346)
---
.../bucket/BucketDelayedDeliveryTracker.java | 42 ++--------------------
1 file changed, 2 insertions(+), 40 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index 3f0fcc51657..ff077e57340 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -43,7 +43,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.StampedLock;
import java.util.stream.Collectors;
import javax.annotation.concurrent.ThreadSafe;
import lombok.Getter;
@@ -94,8 +93,6 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker
private final AtomicLong numberDelayedMessages = new AtomicLong(0);
- // Thread safety locks
- private final StampedLock stampedLock = new StampedLock();
@Getter
@VisibleForTesting
@@ -577,24 +574,7 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker
}
@Override
- protected long nextDeliveryTime() {
- // Use optimistic read for frequently called method
- long stamp = stampedLock.tryOptimisticRead();
- long result = nextDeliveryTimeUnsafe();
-
-
- if (!stampedLock.validate(stamp)) {
- stamp = stampedLock.readLock();
- try {
- result = nextDeliveryTimeUnsafe();
- } finally {
- stampedLock.unlockRead(stamp);
- }
- }
- return result;
- }
-
- private long nextDeliveryTimeUnsafe() {
+ protected synchronized long nextDeliveryTime() {
if (lastMutableBucket.isEmpty() &&
!sharedBucketPriorityQueue.isEmpty()) {
return sharedBucketPriorityQueue.peekN1();
} else if (sharedBucketPriorityQueue.isEmpty() &&
!lastMutableBucket.isEmpty()) {
@@ -788,25 +768,7 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker
.orElse(false);
}
- public boolean containsMessage(long ledgerId, long entryId) {
- // Try optimistic read first for best performance
- long stamp = stampedLock.tryOptimisticRead();
- boolean result = containsMessageUnsafe(ledgerId, entryId);
-
-
- if (!stampedLock.validate(stamp)) {
- // Fall back to read lock if validation fails
- stamp = stampedLock.readLock();
- try {
- result = containsMessageUnsafe(ledgerId, entryId);
- } finally {
- stampedLock.unlockRead(stamp);
- }
- }
- return result;
- }
-
- private boolean containsMessageUnsafe(long ledgerId, long entryId) {
+ public synchronized boolean containsMessage(long ledgerId, long entryId) {
if (lastMutableBucket.containsMessage(ledgerId, entryId)) {
return true;
}