This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 24c9a52c3 [ISSUE #5809] Replace synchronized with
ReentrantReadWriteLock in AutoSwitchHAService (#5810)
24c9a52c3 is described below
commit 24c9a52c3a058ba2abff0769fb2eb51efa322f68
Author: mxsm <[email protected]>
AuthorDate: Tue Jan 3 15:43:02 2023 +0800
[ISSUE #5809] Replace synchronized with ReentrantReadWriteLock in
AutoSwitchHAService (#5810)
---
.../store/ha/autoswitch/AutoSwitchHAService.java | 43 +++++++++++++++++-----
1 file changed, 33 insertions(+), 10 deletions(-)
diff --git
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index ed694799c..7382587dc 100644
---
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -28,6 +28,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -63,6 +66,8 @@ public class AutoSwitchHAService extends DefaultHAService {
private EpochFileCache epochCache;
private AutoSwitchHAClient haClient;
+ private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+
public AutoSwitchHAService() {
}
@@ -287,8 +292,14 @@ public class AutoSwitchHAService extends DefaultHAService {
}
@Override
- public synchronized int inSyncReplicasNums(final long masterPutWhere) {
- return syncStateSet.size();
+ public int inSyncReplicasNums(final long masterPutWhere) {
+ final Lock readLock = readWriteLock.readLock();
+ try {
+ readLock.lock();
+ return syncStateSet.size();
+ } finally {
+ readLock.unlock();
+ }
}
@Override
@@ -344,16 +355,28 @@ public class AutoSwitchHAService extends DefaultHAService
{
return confirmOffset;
}
- public synchronized void setSyncStateSet(final Set<String> syncStateSet) {
- this.syncStateSet.clear();
- this.syncStateSet.addAll(syncStateSet);
- this.confirmOffset = computeConfirmOffset();
+ public void setSyncStateSet(final Set<String> syncStateSet) {
+ final Lock writeLock = readWriteLock.writeLock();
+ try {
+ writeLock.lock();
+ this.syncStateSet.clear();
+ this.syncStateSet.addAll(syncStateSet);
+ this.confirmOffset = computeConfirmOffset();
+ } finally {
+ writeLock.unlock();
+ }
}
- public synchronized Set<String> getSyncStateSet() {
- HashSet<String> set = new HashSet<>(this.syncStateSet.size());
- set.addAll(this.syncStateSet);
- return set;
+ public Set<String> getSyncStateSet() {
+ final Lock readLock = readWriteLock.readLock();
+ try {
+ readLock.lock();
+ HashSet<String> set = new HashSet<>(this.syncStateSet.size());
+ set.addAll(this.syncStateSet);
+ return set;
+ } finally {
+ readLock.unlock();
+ }
}
public void truncateEpochFilePrefix(final long offset) {