This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 2cc899fc3 [ISSUE #5663] Fix Messages may be lost when SyncStateSet
expand in extreme scenarios (#5798)
2cc899fc3 is described below
commit 2cc899fc35ae867477c5e992bfaa435433512745
Author: hzh0425 <[email protected]>
AuthorDate: Mon Mar 13 14:22:29 2023 +0800
[ISSUE #5663] Fix messages possibly being lost when the SyncStateSet expands in
extreme scenarios (#5798)
* Add a new state 'isSynchronizingSyncStateSet' to solve the problem of
missing messages
* Pass checkstyle
* Use a ReadWriteLock instead of synchronized in AutoSwitchHAService
* Fix lock issue
* Remove unnecessary 'remoteSyncStateSet.clear'
* Optimize imports
---
.../store/ha/autoswitch/AutoSwitchHAService.java | 141 +++++++++++++++------
.../store/ha/autoswitch/AutoSwitchHATest.java | 58 ++++++---
2 files changed, 144 insertions(+), 55 deletions(-)
diff --git
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index 325341c66..04263fc4a 100644
---
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -17,21 +17,6 @@
package org.apache.rocketmq.store.ha.autoswitch;
-import java.io.IOException;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Consumer;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
@@ -50,15 +35,37 @@ import org.apache.rocketmq.store.ha.HAClient;
import org.apache.rocketmq.store.ha.HAConnection;
import org.apache.rocketmq.store.ha.HAConnectionStateNotificationService;
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
+
/**
* SwitchAble ha service, support switch role to master or slave.
*/
public class AutoSwitchHAService extends DefaultHAService {
private static final Logger LOGGER =
LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private final ExecutorService executorService =
Executors.newSingleThreadExecutor(new
ThreadFactoryImpl("AutoSwitchHAService_Executor_"));
- private final List<Consumer<Set<String>>> syncStateSetChangedListeners =
new ArrayList<>();
- private final CopyOnWriteArraySet<String> syncStateSet = new
CopyOnWriteArraySet<>();
private final ConcurrentHashMap<String, Long> connectionCaughtUpTimeTable
= new ConcurrentHashMap<>();
+ private final List<Consumer<Set<String>>> syncStateSetChangedListeners =
new ArrayList<>();
+ private final Set<String> syncStateSet = new HashSet<>();
+ private final Set<String> remoteSyncStateSet = new HashSet<>();
+ private final ReadWriteLock syncStateSetReadWriteLock = new
ReentrantReadWriteLock();
+ private final Lock readLock = syncStateSetReadWriteLock.readLock();
+ private final Lock writeLock = syncStateSetReadWriteLock.writeLock();
+
+ // Indicate whether the syncStateSet is currently in the process of being
synchronized to controller.
+ private volatile boolean isSynchronizingSyncStateSet = false;
private volatile long confirmOffset = -1;
private String localAddress;
@@ -66,8 +73,6 @@ public class AutoSwitchHAService extends DefaultHAService {
private EpochFileCache epochCache;
private AutoSwitchHAClient haClient;
- private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
-
public AutoSwitchHAService() {
}
@@ -93,10 +98,11 @@ public class AutoSwitchHAService extends DefaultHAService {
@Override
public void removeConnection(HAConnection conn) {
if (!defaultMessageStore.isShutdown()) {
- final Set<String> syncStateSet = getSyncStateSet();
+ final Set<String> syncStateSet = getLocalSyncStateSet();
String slave = ((AutoSwitchHAConnection) conn).getSlaveAddress();
if (syncStateSet.contains(slave)) {
syncStateSet.remove(slave);
+ markSynchronizingSyncStateSet(syncStateSet);
notifySyncStateSetChanged(syncStateSet);
}
}
@@ -232,7 +238,8 @@ public class AutoSwitchHAService extends DefaultHAService {
* A slave will be removed from inSyncStateSet if (curTime -
HaConnection.lastCaughtUpTime) > option(haMaxTimeSlaveNotCatchup)
*/
public Set<String> maybeShrinkInSyncStateSet() {
- final Set<String> newSyncStateSet = getSyncStateSet();
+ final Set<String> newSyncStateSet = getLocalSyncStateSet();
+ boolean isSyncStateSetChanged = false;
final long haMaxTimeSlaveNotCatchup =
this.defaultMessageStore.getMessageStoreConfig().getHaMaxTimeSlaveNotCatchup();
for (Map.Entry<String, Long> next :
this.connectionCaughtUpTimeTable.entrySet()) {
final String slaveAddress = next.getKey();
@@ -240,9 +247,13 @@ public class AutoSwitchHAService extends DefaultHAService {
final Long lastCaughtUpTimeMs =
this.connectionCaughtUpTimeTable.get(slaveAddress);
if ((System.currentTimeMillis() - lastCaughtUpTimeMs) >
haMaxTimeSlaveNotCatchup) {
newSyncStateSet.remove(slaveAddress);
+ isSyncStateSetChanged = true;
}
}
}
+ if (isSyncStateSetChanged) {
+ markSynchronizingSyncStateSet(newSyncStateSet);
+ }
return newSyncStateSet;
}
@@ -251,7 +262,7 @@ public class AutoSwitchHAService extends DefaultHAService {
* current confirmOffset, and it is caught up to an offset within the
current leader epoch.
*/
public void maybeExpandInSyncStateSet(final String slaveAddress, final
long slaveMaxOffset) {
- final Set<String> currentSyncStateSet = getSyncStateSet();
+ final Set<String> currentSyncStateSet = getLocalSyncStateSet();
if (currentSyncStateSet.contains(slaveAddress)) {
return;
}
@@ -260,12 +271,33 @@ public class AutoSwitchHAService extends DefaultHAService
{
final EpochEntry currentLeaderEpoch = this.epochCache.lastEntry();
if (slaveMaxOffset >= currentLeaderEpoch.getStartOffset()) {
currentSyncStateSet.add(slaveAddress);
+ markSynchronizingSyncStateSet(currentSyncStateSet);
// Notify the upper layer that syncStateSet changed.
notifySyncStateSetChanged(currentSyncStateSet);
}
}
}
+ private void markSynchronizingSyncStateSet(final Set<String>
newSyncStateSet) {
+ this.writeLock.lock();
+ try {
+ this.isSynchronizingSyncStateSet = true;
+ this.remoteSyncStateSet.clear();
+ this.remoteSyncStateSet.addAll(newSyncStateSet);
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ private void markSynchronizingSyncStateSetDone() {
+ // No need to lock, because the upper-level calling method has already
locked write lock
+ this.isSynchronizingSyncStateSet = false;
+ }
+
+ public boolean isSynchronizingSyncStateSet() {
+ return isSynchronizingSyncStateSet;
+ }
+
public void updateConnectionLastCaughtUpTime(final String slaveAddress,
final long lastCaughtUpTimeMs) {
Long prevTime =
ConcurrentHashMapUtils.computeIfAbsent(this.connectionCaughtUpTimeTable,
slaveAddress, k -> 0L);
this.connectionCaughtUpTimeTable.put(slaveAddress, Math.max(prevTime,
lastCaughtUpTimeMs));
@@ -276,7 +308,7 @@ public class AutoSwitchHAService extends DefaultHAService {
*/
public long getConfirmOffset() {
if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()
!= BrokerRole.SLAVE) {
- if (this.syncStateSet.size() == 1) {
+ if (getLocalSyncStateSet().size() == 1) {
return this.defaultMessageStore.getMaxPhyOffset();
}
// First time compute confirmOffset.
@@ -288,19 +320,27 @@ public class AutoSwitchHAService extends DefaultHAService
{
}
public void updateConfirmOffsetWhenSlaveAck(final String slaveAddress) {
- if (this.syncStateSet.contains(slaveAddress)) {
- this.confirmOffset = computeConfirmOffset();
+ this.readLock.lock();
+ try {
+ if (this.syncStateSet.contains(slaveAddress)) {
+ this.confirmOffset = computeConfirmOffset();
+ }
+ } finally {
+ this.readLock.unlock();
}
}
@Override
public int inSyncReplicasNums(final long masterPutWhere) {
- final Lock readLock = readWriteLock.readLock();
+ this.readLock.lock();
try {
- readLock.lock();
- return syncStateSet.size();
+ if (this.isSynchronizingSyncStateSet) {
+ return Math.max(this.syncStateSet.size(),
this.remoteSyncStateSet.size());
+ } else {
+ return this.syncStateSet.size();
+ }
} finally {
- readLock.unlock();
+ this.readLock.unlock();
}
}
@@ -322,6 +362,7 @@ public class AutoSwitchHAService extends DefaultHAService {
info.setMasterCommitLogMaxOffset(masterPutWhere);
+ Set<String> localSyncStateSet = getLocalSyncStateSet();
for (HAConnection conn : this.connectionList) {
HARuntimeInfo.HAConnectionRuntimeInfo cInfo = new
HARuntimeInfo.HAConnectionRuntimeInfo();
@@ -332,11 +373,11 @@ public class AutoSwitchHAService extends DefaultHAService
{
cInfo.setTransferredByteInSecond(conn.getTransferredByteInSecond());
cInfo.setTransferFromWhere(conn.getTransferFromWhere());
-
cInfo.setInSync(syncStateSet.contains(((AutoSwitchHAConnection)
conn).getSlaveAddress()));
+
cInfo.setInSync(localSyncStateSet.contains(((AutoSwitchHAConnection)
conn).getSlaveAddress()));
info.getHaConnectionInfo().add(cInfo);
}
- info.setInSyncSlaveNums(syncStateSet.size() - 1);
+ info.setInSyncSlaveNums(localSyncStateSet.size() - 1);
}
return info;
}
@@ -358,26 +399,46 @@ public class AutoSwitchHAService extends DefaultHAService
{
}
public void setSyncStateSet(final Set<String> syncStateSet) {
- final Lock writeLock = readWriteLock.writeLock();
+ this.writeLock.lock();
try {
- writeLock.lock();
+ markSynchronizingSyncStateSetDone();
this.syncStateSet.clear();
this.syncStateSet.addAll(syncStateSet);
this.confirmOffset = computeConfirmOffset();
} finally {
- writeLock.unlock();
+ this.writeLock.unlock();
}
}
+ /**
+ * Return the union of the local and remote syncStateSets
+ */
public Set<String> getSyncStateSet() {
- final Lock readLock = readWriteLock.readLock();
+ this.readLock.lock();
+ try {
+ if (this.isSynchronizingSyncStateSet) {
+ Set<String> unionSyncStateSet = new
HashSet<>(this.syncStateSet.size() + this.remoteSyncStateSet.size());
+ unionSyncStateSet.addAll(this.syncStateSet);
+ unionSyncStateSet.addAll(this.remoteSyncStateSet);
+ return unionSyncStateSet;
+ } else {
+ HashSet<String> syncStateSet = new
HashSet<>(this.syncStateSet.size());
+ syncStateSet.addAll(this.syncStateSet);
+ return syncStateSet;
+ }
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ public Set<String> getLocalSyncStateSet() {
+ this.readLock.lock();
try {
- readLock.lock();
- HashSet<String> set = new HashSet<>(this.syncStateSet.size());
- set.addAll(this.syncStateSet);
- return set;
+ HashSet<String> localSyncStateSet = new
HashSet<>(this.syncStateSet.size());
+ localSyncStateSet.addAll(this.syncStateSet);
+ return localSyncStateSet;
} finally {
- readLock.unlock();
+ this.readLock.unlock();
}
}
diff --git
a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
index bc392022d..5fad693f9 100644
---
a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
+++
b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
@@ -17,21 +17,9 @@
package org.apache.rocketmq.store.ha.autoswitch;
-import java.io.File;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Random;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
@@ -46,11 +34,25 @@ import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.logfile.MappedFile;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
-import org.apache.rocketmq.common.MixAll;
import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
import org.junit.Ignore;
import org.junit.Test;
-import org.junit.Assume;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
@@ -446,6 +448,32 @@ public class AutoSwitchHATest {
checkMessage(messageStore3, 10, 10);
}
+ @Test
+ public void testCheckSynchronizingSyncStateSetFlag() throws Exception {
+ // Step1: broker1 as leader, broker2 as follower
+ init(defaultMappedFileSize);
+ ((AutoSwitchHAService)
this.messageStore1.getHaService()).setSyncStateSet(new
HashSet<>(Collections.singletonList("127.0.0.1:8000")));
+
+ changeMasterAndPutMessage(this.messageStore1, this.storeConfig1,
this.messageStore2, 2, this.storeConfig2, 1, store1HaAddress, 10);
+ checkMessage(this.messageStore2, 10, 0);
+ AutoSwitchHAService masterHAService = (AutoSwitchHAService)
this.messageStore1.getHaService();
+
+ // Step2: check flag SynchronizingSyncStateSet
+ Assert.assertTrue(masterHAService.isSynchronizingSyncStateSet());
+ Assert.assertEquals(masterHAService.getConfirmOffset(), 1570);
+ Set<String> syncStateSet = masterHAService.getSyncStateSet();
+ Assert.assertEquals(syncStateSet.size(), 2);
+ Assert.assertTrue(syncStateSet.contains("127.0.0.1:8001"));
+
+ // Step3: set new syncStateSet
+ HashSet<String> newSyncStateSet = new HashSet<String>() {{
+ add("127.0.0.1:8000");
+ add("127.0.0.1:8001");
+ }};
+ masterHAService.setSyncStateSet(newSyncStateSet);
+ Assert.assertFalse(masterHAService.isSynchronizingSyncStateSet());
+ }
+
@After
public void destroy() throws Exception {
if (this.messageStore2 != null) {