This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 2cc899fc3 [ISSUE #5663] Fix messages possibly being lost when the SyncStateSet expands in extreme scenarios (#5798)
2cc899fc3 is described below

commit 2cc899fc35ae867477c5e992bfaa435433512745
Author: hzh0425 <[email protected]>
AuthorDate: Mon Mar 13 14:22:29 2023 +0800

    [ISSUE #5663] Fix messages possibly being lost when the SyncStateSet expands in extreme scenarios (#5798)
    
    * Add a new state 'isSynchronizingSyncStateSet' to solve the problem of missing messages
    
    * Pass checkstyle
    
    * Use a ReadWriteLock instead of synchronized in AutoSwitchHAService
    
    * Fix lock issue
    
    * Remove unnecessary 'remoteSyncStateSet.clear'
    
    * Optimize imports
---
 .../store/ha/autoswitch/AutoSwitchHAService.java   | 141 +++++++++++++++------
 .../store/ha/autoswitch/AutoSwitchHATest.java      |  58 ++++++---
 2 files changed, 144 insertions(+), 55 deletions(-)

diff --git 
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
 
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index 325341c66..04263fc4a 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -17,21 +17,6 @@
 
 package org.apache.rocketmq.store.ha.autoswitch;
 
-import java.io.IOException;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Consumer;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
@@ -50,15 +35,37 @@ import org.apache.rocketmq.store.ha.HAClient;
 import org.apache.rocketmq.store.ha.HAConnection;
 import org.apache.rocketmq.store.ha.HAConnectionStateNotificationService;
 
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
+
 /**
  * SwitchAble ha service, support switch role to master or slave.
  */
 public class AutoSwitchHAService extends DefaultHAService {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
     private final ExecutorService executorService = 
Executors.newSingleThreadExecutor(new 
ThreadFactoryImpl("AutoSwitchHAService_Executor_"));
-    private final List<Consumer<Set<String>>> syncStateSetChangedListeners = 
new ArrayList<>();
-    private final CopyOnWriteArraySet<String> syncStateSet = new 
CopyOnWriteArraySet<>();
     private final ConcurrentHashMap<String, Long> connectionCaughtUpTimeTable 
= new ConcurrentHashMap<>();
+    private final List<Consumer<Set<String>>> syncStateSetChangedListeners = 
new ArrayList<>();
+    private final Set<String> syncStateSet = new HashSet<>();
+    private final Set<String> remoteSyncStateSet = new HashSet<>();
+    private final ReadWriteLock syncStateSetReadWriteLock = new 
ReentrantReadWriteLock();
+    private final Lock readLock = syncStateSetReadWriteLock.readLock();
+    private final Lock writeLock = syncStateSetReadWriteLock.writeLock();
+
+    //  Indicate whether the syncStateSet is currently in the process of being 
synchronized to controller.
+    private volatile boolean isSynchronizingSyncStateSet = false;
     private volatile long confirmOffset = -1;
 
     private String localAddress;
@@ -66,8 +73,6 @@ public class AutoSwitchHAService extends DefaultHAService {
     private EpochFileCache epochCache;
     private AutoSwitchHAClient haClient;
 
-    private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
-
     public AutoSwitchHAService() {
     }
 
@@ -93,10 +98,11 @@ public class AutoSwitchHAService extends DefaultHAService {
     @Override
     public void removeConnection(HAConnection conn) {
         if (!defaultMessageStore.isShutdown()) {
-            final Set<String> syncStateSet = getSyncStateSet();
+            final Set<String> syncStateSet = getLocalSyncStateSet();
             String slave = ((AutoSwitchHAConnection) conn).getSlaveAddress();
             if (syncStateSet.contains(slave)) {
                 syncStateSet.remove(slave);
+                markSynchronizingSyncStateSet(syncStateSet);
                 notifySyncStateSetChanged(syncStateSet);
             }
         }
@@ -232,7 +238,8 @@ public class AutoSwitchHAService extends DefaultHAService {
      * A slave will be removed from inSyncStateSet if (curTime - 
HaConnection.lastCaughtUpTime) > option(haMaxTimeSlaveNotCatchup)
      */
     public Set<String> maybeShrinkInSyncStateSet() {
-        final Set<String> newSyncStateSet = getSyncStateSet();
+        final Set<String> newSyncStateSet = getLocalSyncStateSet();
+        boolean isSyncStateSetChanged = false;
         final long haMaxTimeSlaveNotCatchup = 
this.defaultMessageStore.getMessageStoreConfig().getHaMaxTimeSlaveNotCatchup();
         for (Map.Entry<String, Long> next : 
this.connectionCaughtUpTimeTable.entrySet()) {
             final String slaveAddress = next.getKey();
@@ -240,9 +247,13 @@ public class AutoSwitchHAService extends DefaultHAService {
                 final Long lastCaughtUpTimeMs = 
this.connectionCaughtUpTimeTable.get(slaveAddress);
                 if ((System.currentTimeMillis() - lastCaughtUpTimeMs) > 
haMaxTimeSlaveNotCatchup) {
                     newSyncStateSet.remove(slaveAddress);
+                    isSyncStateSetChanged = true;
                 }
             }
         }
+        if (isSyncStateSetChanged) {
+            markSynchronizingSyncStateSet(newSyncStateSet);
+        }
         return newSyncStateSet;
     }
 
@@ -251,7 +262,7 @@ public class AutoSwitchHAService extends DefaultHAService {
      * current confirmOffset, and it is caught up to an offset within the 
current leader epoch.
      */
     public void maybeExpandInSyncStateSet(final String slaveAddress, final 
long slaveMaxOffset) {
-        final Set<String> currentSyncStateSet = getSyncStateSet();
+        final Set<String> currentSyncStateSet = getLocalSyncStateSet();
         if (currentSyncStateSet.contains(slaveAddress)) {
             return;
         }
@@ -260,12 +271,33 @@ public class AutoSwitchHAService extends DefaultHAService 
{
             final EpochEntry currentLeaderEpoch = this.epochCache.lastEntry();
             if (slaveMaxOffset >= currentLeaderEpoch.getStartOffset()) {
                 currentSyncStateSet.add(slaveAddress);
+                markSynchronizingSyncStateSet(currentSyncStateSet);
                 // Notify the upper layer that syncStateSet changed.
                 notifySyncStateSetChanged(currentSyncStateSet);
             }
         }
     }
 
+    private void markSynchronizingSyncStateSet(final Set<String> 
newSyncStateSet) {
+        this.writeLock.lock();
+        try {
+            this.isSynchronizingSyncStateSet = true;
+            this.remoteSyncStateSet.clear();
+            this.remoteSyncStateSet.addAll(newSyncStateSet);
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    private void markSynchronizingSyncStateSetDone() {
+        // No need to lock, because the upper-level calling method has already 
locked write lock
+        this.isSynchronizingSyncStateSet = false;
+    }
+
+    public boolean isSynchronizingSyncStateSet() {
+        return isSynchronizingSyncStateSet;
+    }
+
     public void updateConnectionLastCaughtUpTime(final String slaveAddress, 
final long lastCaughtUpTimeMs) {
         Long prevTime = 
ConcurrentHashMapUtils.computeIfAbsent(this.connectionCaughtUpTimeTable, 
slaveAddress, k -> 0L);
         this.connectionCaughtUpTimeTable.put(slaveAddress, Math.max(prevTime, 
lastCaughtUpTimeMs));
@@ -276,7 +308,7 @@ public class AutoSwitchHAService extends DefaultHAService {
      */
     public long getConfirmOffset() {
         if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() 
!= BrokerRole.SLAVE) {
-            if (this.syncStateSet.size() == 1) {
+            if (getLocalSyncStateSet().size() == 1) {
                 return this.defaultMessageStore.getMaxPhyOffset();
             }
             // First time compute confirmOffset.
@@ -288,19 +320,27 @@ public class AutoSwitchHAService extends DefaultHAService 
{
     }
 
     public void updateConfirmOffsetWhenSlaveAck(final String slaveAddress) {
-        if (this.syncStateSet.contains(slaveAddress)) {
-            this.confirmOffset = computeConfirmOffset();
+        this.readLock.lock();
+        try {
+            if (this.syncStateSet.contains(slaveAddress)) {
+                this.confirmOffset = computeConfirmOffset();
+            }
+        } finally {
+            this.readLock.unlock();
         }
     }
 
     @Override
     public int inSyncReplicasNums(final long masterPutWhere) {
-        final Lock readLock = readWriteLock.readLock();
+        this.readLock.lock();
         try {
-            readLock.lock();
-            return syncStateSet.size();
+            if (this.isSynchronizingSyncStateSet) {
+                return Math.max(this.syncStateSet.size(), 
this.remoteSyncStateSet.size());
+            } else {
+                return this.syncStateSet.size();
+            }
         } finally {
-            readLock.unlock();
+            this.readLock.unlock();
         }
     }
 
@@ -322,6 +362,7 @@ public class AutoSwitchHAService extends DefaultHAService {
 
             info.setMasterCommitLogMaxOffset(masterPutWhere);
 
+            Set<String> localSyncStateSet = getLocalSyncStateSet();
             for (HAConnection conn : this.connectionList) {
                 HARuntimeInfo.HAConnectionRuntimeInfo cInfo = new 
HARuntimeInfo.HAConnectionRuntimeInfo();
 
@@ -332,11 +373,11 @@ public class AutoSwitchHAService extends DefaultHAService 
{
                 
cInfo.setTransferredByteInSecond(conn.getTransferredByteInSecond());
                 cInfo.setTransferFromWhere(conn.getTransferFromWhere());
 
-                
cInfo.setInSync(syncStateSet.contains(((AutoSwitchHAConnection) 
conn).getSlaveAddress()));
+                
cInfo.setInSync(localSyncStateSet.contains(((AutoSwitchHAConnection) 
conn).getSlaveAddress()));
 
                 info.getHaConnectionInfo().add(cInfo);
             }
-            info.setInSyncSlaveNums(syncStateSet.size() - 1);
+            info.setInSyncSlaveNums(localSyncStateSet.size() - 1);
         }
         return info;
     }
@@ -358,26 +399,46 @@ public class AutoSwitchHAService extends DefaultHAService 
{
     }
 
     public void setSyncStateSet(final Set<String> syncStateSet) {
-        final Lock writeLock = readWriteLock.writeLock();
+        this.writeLock.lock();
         try {
-            writeLock.lock();
+            markSynchronizingSyncStateSetDone();
             this.syncStateSet.clear();
             this.syncStateSet.addAll(syncStateSet);
             this.confirmOffset = computeConfirmOffset();
         } finally {
-            writeLock.unlock();
+            this.writeLock.unlock();
         }
     }
 
+    /**
+     * Return the union of the local and remote syncStateSets
+     */
     public Set<String> getSyncStateSet() {
-        final Lock readLock = readWriteLock.readLock();
+        this.readLock.lock();
+        try {
+            if (this.isSynchronizingSyncStateSet) {
+                Set<String> unionSyncStateSet = new 
HashSet<>(this.syncStateSet.size() + this.remoteSyncStateSet.size());
+                unionSyncStateSet.addAll(this.syncStateSet);
+                unionSyncStateSet.addAll(this.remoteSyncStateSet);
+                return unionSyncStateSet;
+            } else {
+                HashSet<String> syncStateSet = new 
HashSet<>(this.syncStateSet.size());
+                syncStateSet.addAll(this.syncStateSet);
+                return syncStateSet;
+            }
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    public Set<String> getLocalSyncStateSet() {
+        this.readLock.lock();
         try {
-            readLock.lock();
-            HashSet<String> set = new HashSet<>(this.syncStateSet.size());
-            set.addAll(this.syncStateSet);
-            return set;
+            HashSet<String> localSyncStateSet = new 
HashSet<>(this.syncStateSet.size());
+            localSyncStateSet.addAll(this.syncStateSet);
+            return localSyncStateSet;
         } finally {
-            readLock.unlock();
+            this.readLock.unlock();
         }
     }
 
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
 
b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
index bc392022d..5fad693f9 100644
--- 
a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
+++ 
b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
@@ -17,21 +17,9 @@
 
 package org.apache.rocketmq.store.ha.autoswitch;
 
-import java.io.File;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Random;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExtBrokerInner;
@@ -46,11 +34,25 @@ import org.apache.rocketmq.store.config.FlushDiskType;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.logfile.MappedFile;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
-import org.apache.rocketmq.common.MixAll;
 import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Ignore;
 import org.junit.Test;
-import org.junit.Assume;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
@@ -446,6 +448,32 @@ public class AutoSwitchHATest {
         checkMessage(messageStore3, 10, 10);
     }
 
+    @Test
+    public void testCheckSynchronizingSyncStateSetFlag() throws Exception {
+        // Step1: broker1 as leader, broker2 as follower
+        init(defaultMappedFileSize);
+        ((AutoSwitchHAService) 
this.messageStore1.getHaService()).setSyncStateSet(new 
HashSet<>(Collections.singletonList("127.0.0.1:8000")));
+
+        changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, 
this.messageStore2, 2, this.storeConfig2, 1, store1HaAddress, 10);
+        checkMessage(this.messageStore2, 10, 0);
+        AutoSwitchHAService masterHAService = (AutoSwitchHAService) 
this.messageStore1.getHaService();
+
+        // Step2: check flag SynchronizingSyncStateSet
+        Assert.assertTrue(masterHAService.isSynchronizingSyncStateSet());
+        Assert.assertEquals(masterHAService.getConfirmOffset(), 1570);
+        Set<String> syncStateSet = masterHAService.getSyncStateSet();
+        Assert.assertEquals(syncStateSet.size(), 2);
+        Assert.assertTrue(syncStateSet.contains("127.0.0.1:8001"));
+
+        // Step3: set new syncStateSet
+        HashSet<String> newSyncStateSet = new HashSet<String>() {{
+                add("127.0.0.1:8000");
+                add("127.0.0.1:8001");
+                }};
+        masterHAService.setSyncStateSet(newSyncStateSet);
+        Assert.assertFalse(masterHAService.isSynchronizingSyncStateSet());
+    }
+
     @After
     public void destroy() throws Exception {
         if (this.messageStore2 != null) {

Reply via email to