This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c936abd2a3 [fix](fe) when bdbje adding follower, master write op may 
failed. (#10376)
c936abd2a3 is described below

commit c936abd2a37b2dd4213c0240b32df98ddd4b4017
Author: Lei Zhang <1091517...@qq.com>
AuthorDate: Wed Jul 6 10:29:16 2022 +0800

    [fix](fe) when bdbje adding follower, master write op may failed. (#10376)
---
 .../java/org/apache/doris/catalog/Catalog.java     | 13 ++++
 .../src/main/java/org/apache/doris/ha/BDBHA.java   | 71 ++++++++++++++++++----
 .../apache/doris/journal/bdbje/BDBJEJournal.java   | 39 ++++++++----
 .../java/org/apache/doris/persist/EditLog.java     | 39 +++++++-----
 .../java/org/apache/doris/system/Frontend.java     | 29 +++++----
 .../java/org/apache/doris/system/HeartbeatMgr.java |  2 +-
 6 files changed, 140 insertions(+), 53 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index 2798b2b36e..4020ffa72f 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -2470,6 +2470,7 @@ public class Catalog {
             if (role == FrontendNodeType.FOLLOWER || role == 
FrontendNodeType.REPLICA) {
                 bdbha.addHelperSocket(host, editLogPort);
                 helperNodes.add(Pair.create(host, editLogPort));
+                bdbha.addUnReadyElectableNode(nodeName, getFollowerCount());
             }
             bdbha.removeConflictNodeIfExist(host, editLogPort);
             editLog.logAddFrontend(fe);
@@ -2499,6 +2500,8 @@ public class Catalog {
             if (fe.getRole() == FrontendNodeType.FOLLOWER || fe.getRole() == 
FrontendNodeType.REPLICA) {
                 haProtocol.removeElectableNode(fe.getNodeName());
                 helperNodes.remove(Pair.create(host, port));
+                BDBHA ha = (BDBHA) haProtocol;
+                ha.removeUnReadyElectableNode(nodeName, getFollowerCount());
             }
             editLog.logRemoveFrontend(fe);
         } finally {
@@ -4931,4 +4934,14 @@ public class Catalog {
             sb.append("\nCOMMENT 
'").append(table.getComment(true)).append("'");
         }
     }
+
+    public int getFollowerCount() {
+        int count = 0;
+        for (Frontend fe : frontends.values()) {
+            if (fe.getRole() == FrontendNodeType.FOLLOWER) {
+                count++;
+            }
+        }
+        return count;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/ha/BDBHA.java 
b/fe/fe-core/src/main/java/org/apache/doris/ha/BDBHA.java
index 22d15373d8..02f110b40a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/ha/BDBHA.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/ha/BDBHA.java
@@ -1,17 +1,17 @@
 // Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
+// or more contributor license agreements. See the NOTICE file
 // distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
+// regarding copyright ownership. The ASF licenses this file
 // to you under the Apache License, Version 2.0 (the
 // "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
+// with the License. You may obtain a copy of the License at
 //
-//   http://www.apache.org/licenses/LICENSE-2.0
+// http://www.apache.org/licenses/LICENSE-2.0
 //
 // Unless required by applicable law or agreed to in writing,
 // software distributed under the License is distributed on an
 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
+// KIND, either express or implied. See the License for the
 // specific language governing permissions and limitations
 // under the License.
 
@@ -28,7 +28,9 @@ import com.sleepycat.je.DatabaseEntry;
 import com.sleepycat.je.OperationStatus;
 import com.sleepycat.je.rep.MasterStateException;
 import com.sleepycat.je.rep.MemberNotFoundException;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
 import com.sleepycat.je.rep.ReplicationGroup;
+import com.sleepycat.je.rep.ReplicationMutableConfig;
 import com.sleepycat.je.rep.ReplicationNode;
 import com.sleepycat.je.rep.UnknownMasterException;
 import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
@@ -37,16 +39,30 @@ import org.apache.logging.log4j.Logger;
 
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
 public class BDBHA implements HAProtocol {
     private static final Logger LOG = LogManager.getLogger(BDBHA.class);
 
-    private BDBEnvironment environment;
-    private String nodeName;
+    private final BDBEnvironment environment;
+    private final String nodeName;
     private static final int RETRY_TIME = 3;
 
+    // Unstable node is a follower node that is joining the cluster but has not
+    // completed joining yet.
+    // We should record this kind of node and set the bdb electable group size to
+    // (size_of_all_followers - size_of_unstable_nodes).
+    // Because once the handshake is successful, the joined node is put into the
+    // optional group,
+    // but it may take a little time for this node to replicate the historical data.
+    // This node will never respond to a new data replication until the historical
+    // replication is completed,
+    // and if the master cannot receive a quorum response, the write operation will
+    // fail.
+    private final Set<String> unReadyElectableNodes = new HashSet<>();
+
     public BDBHA(BDBEnvironment env, String nodeName) {
         this.environment = env;
         this.nodeName = nodeName;
@@ -124,7 +140,8 @@ public class BDBHA implements HAProtocol {
                 if (leaderIncluded) {
                     ret.add(replicationNode.getSocketAddress());
                 } else {
-                    if 
(!replicationNode.getName().equals(replicationGroupAdmin.getMasterNodeName())) {
+                    if (!replicationNode.getName()
+                            
.equals(replicationGroupAdmin.getMasterNodeName())) {
                         ret.add(replicationNode.getSocketAddress());
                     }
                 }
@@ -204,17 +221,20 @@ public class BDBHA implements HAProtocol {
         return true;
     }
 
-    // When new Follower FE is added to the cluster, it should also be added 
to the helper sockets in
+    // When new Follower FE is added to the cluster, it should also be added 
to the
+    // helper sockets in
     // ReplicationGroupAdmin, in order to fix the following case:
     // 1. A Observer starts with helper of master FE.
     // 2. Master FE is dead, new Master is elected.
     // 3. Observer's helper sockets only contains the info of the dead master 
FE.
-    //    So when you try to get frontends' info from this Observer, it will 
throw the Exception:
-    //    "Could not determine master from helpers at:[/dead master FE 
host:port]"
+    // So when you try to get frontends' info from this Observer, it will 
throw the
+    // Exception:
+    // "Could not determine master from helpers at:[/dead master FE host:port]"
     public void addHelperSocket(String ip, Integer port) {
         ReplicationGroupAdmin replicationGroupAdmin = 
environment.getReplicationGroupAdmin();
-        Set<InetSocketAddress> helperSockets = 
Sets.newHashSet(replicationGroupAdmin.getHelperSockets());
-        InetSocketAddress newHelperSocket =  new InetSocketAddress(ip, port);
+        Set<InetSocketAddress> helperSockets =
+                Sets.newHashSet(replicationGroupAdmin.getHelperSockets());
+        InetSocketAddress newHelperSocket = new InetSocketAddress(ip, port);
         if (!helperSockets.contains(newHelperSocket)) {
             helperSockets.add(newHelperSocket);
             environment.setNewReplicationGroupAdmin(helperSockets);
@@ -240,4 +260,29 @@ public class BDBHA implements HAProtocol {
             removeElectableNode(conflictNode);
         }
     }
+
+    public synchronized void addUnReadyElectableNode(String nodeName, int 
totalFollowerCount) {
+        unReadyElectableNodes.add(nodeName);
+        ReplicatedEnvironment replicatedEnvironment = 
environment.getReplicatedEnvironment();
+        if (replicatedEnvironment != null) {
+            replicatedEnvironment.setRepMutableConfig(new 
ReplicationMutableConfig()
+                    .setElectableGroupSizeOverride(totalFollowerCount - 
unReadyElectableNodes.size()));
+        }
+    }
+
+    public synchronized void removeUnReadyElectableNode(String nodeName, int 
totalFollowerCount) {
+        unReadyElectableNodes.remove(nodeName);
+        ReplicatedEnvironment replicatedEnvironment = 
environment.getReplicatedEnvironment();
+        if (replicatedEnvironment != null) {
+            if (unReadyElectableNodes.isEmpty()) {
+                // Setting ElectableGroupSizeOverride to 0 means remove this 
config,
+                // and bdb will use the normal electable group size.
+                replicatedEnvironment.setRepMutableConfig(
+                        new 
ReplicationMutableConfig().setElectableGroupSizeOverride(0));
+            } else {
+                replicatedEnvironment.setRepMutableConfig(new 
ReplicationMutableConfig()
+                        .setElectableGroupSizeOverride(totalFollowerCount - 
unReadyElectableNodes.size()));
+            }
+        }
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java 
b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
index 4d787f866b..4974ec0f85 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
@@ -163,7 +163,8 @@ public class BDBJEJournal implements Journal { // 
CHECKSTYLE IGNORE THIS LINE: B
             if (op == OperationType.OP_TIMESTAMP) {
                 /*
                  * Do not exit if the write operation is OP_TIMESTAMP.
-                 * If all the followers exit except master, master should 
continue provide query service.
+                 * If all the followers exit except master, master should 
continue provide query
+                 * service.
                  * To prevent master exit, we should exempt OP_TIMESTAMP write
                  */
                 nextJournalId.set(id);
@@ -307,7 +308,8 @@ public class BDBJEJournal implements Journal { // 
CHECKSTYLE IGNORE THIS LINE: B
             }
         }
 
-        // Open a new journal database or get last existing one as current 
journal database
+        // Open a new journal database or get last existing one as current 
journal
+        // database
         List<Long> dbNames = null;
         for (int i = 0; i < RETRY_TIME; i++) {
             try {
@@ -319,10 +321,11 @@ public class BDBJEJournal implements Journal { // 
CHECKSTYLE IGNORE THIS LINE: B
                 }
                 if (dbNames.size() == 0) {
                     /*
-                     *  This is the very first time to open. Usually, we will 
open a new database named "1".
-                     *  But when we start cluster with an image file copied 
from other cluster,
-                     *  here we should open database with name image max 
journal id + 1.
-                     *  (default 
Catalog.getServingCatalog().getReplayedJournalId() is 0)
+                     * This is the very first time to open. Usually, we will 
open a new database
+                     * named "1".
+                     * But when we start cluster with an image file copied 
from other cluster,
+                     * here we should open database with name image max 
journal id + 1.
+                     * (default 
Catalog.getServingCatalog().getReplayedJournalId() is 0)
                      */
                     String dbName = 
Long.toString(Catalog.getServingCatalog().getReplayedJournalId() + 1);
                     LOG.info("the very first time to open bdb, dbname is {}", 
dbName);
@@ -344,8 +347,10 @@ public class BDBJEJournal implements Journal { // 
CHECKSTYLE IGNORE THIS LINE: B
 
     private void reSetupBdbEnvironment(InsufficientLogException 
insufficientLogEx) {
         LOG.warn("catch insufficient log exception. will recover and try 
again.", insufficientLogEx);
-        // Copy the missing log files from a member of the replication group 
who owns the files
-        // ATTN: here we use `getServingCatalog()`, because only serving 
catalog has helper nodes.
+        // Copy the missing log files from a member of the replication group 
who owns
+        // the files
+        // ATTN: here we use `getServingCatalog()`, because only serving 
catalog has
+        // helper nodes.
         Pair<String, Integer> helperNode = 
Catalog.getServingCatalog().getHelperNode();
         NetworkRestore restore = new NetworkRestore();
         NetworkRestoreConfig config = new NetworkRestoreConfig();
@@ -413,19 +418,23 @@ public class BDBJEJournal implements Journal { // 
CHECKSTYLE IGNORE THIS LINE: B
             return null;
         }
 
-        // Open a new journal database or get last existing one as current 
journal database
-        List<Long>  dbNames = null;
+        // Open a new journal database or get last existing one as current 
journal
+        // database
+        List<Long> dbNames = null;
         for (int i = 0; i < RETRY_TIME; i++) {
             try {
                 dbNames = bdbEnvironment.getDatabaseNames();
                 break;
             } catch (InsufficientLogException insufficientLogEx) {
                 /*
-                 * If this is not a checkpoint thread, which means this maybe 
the FE startup thread,
-                 * or a replay thread. We will reopen bdbEnvironment for these 
2 cases to get valid log
+                 * If this is not a checkpoint thread, which means this maybe 
the FE startup
+                 * thread,
+                 * or a replay thread. We will reopen bdbEnvironment for these 
2 cases to get
+                 * valid log
                  * from helper nodes.
                  *
-                 * The checkpoint thread will only run on Master FE. And 
Master FE should not encounter
+                 * The checkpoint thread will only run on Master FE. And 
Master FE should not
+                 * encounter
                  * these exception. So if it happens, throw exception out.
                  */
                 if (!Catalog.isCheckpointThread()) {
@@ -446,4 +455,8 @@ public class BDBJEJournal implements Journal { // 
CHECKSTYLE IGNORE THIS LINE: B
 
         return dbNames;
     }
+
+    public BDBEnvironment getBDBEnvironment() {
+        return this.bdbEnvironment;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 0871855d20..3e0b531970 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -466,7 +466,7 @@ public class EditLog {
                     int version = Integer.parseInt(versionString);
                     if (version > FeConstants.meta_version) {
                         LOG.error("meta data version is out of date, image: 
{}. meta: {}."
-                                        + "please update 
FeConstants.meta_version and restart.",
+                                + "please update FeConstants.meta_version and 
restart.",
                                 MetaContext.get().getMetaVersion(), 
FeConstants.meta_version);
                         System.exit(-1);
                     }
@@ -546,8 +546,8 @@ public class EditLog {
                     break;
                 }
                 case OperationType.OP_BATCH_REMOVE_TXNS: {
-                    final BatchRemoveTransactionsOperation operation
-                            = (BatchRemoveTransactionsOperation) 
journal.getData();
+                    final BatchRemoveTransactionsOperation operation = 
(BatchRemoveTransactionsOperation) journal
+                            .getData();
                     
Catalog.getCurrentGlobalTransactionMgr().replayBatchRemoveTransactions(operation);
                     break;
                 }
@@ -647,8 +647,8 @@ public class EditLog {
                     break;
                 }
                 case OperationType.OP_CREATE_LOAD_JOB: {
-                    org.apache.doris.load.loadv2.LoadJob loadJob =
-                            (org.apache.doris.load.loadv2.LoadJob) 
journal.getData();
+                    org.apache.doris.load.loadv2.LoadJob loadJob = 
(org.apache.doris.load.loadv2.LoadJob) journal
+                            .getData();
                     catalog.getLoadManager().replayCreateLoadJob(loadJob);
                     break;
                 }
@@ -742,8 +742,8 @@ public class EditLog {
                     break;
                 }
                 case OperationType.OP_REPLACE_TEMP_PARTITION: {
-                    ReplacePartitionOperationLog replaceTempPartitionLog
-                            = (ReplacePartitionOperationLog) journal.getData();
+                    ReplacePartitionOperationLog replaceTempPartitionLog = 
(ReplacePartitionOperationLog) journal
+                            .getData();
                     
catalog.replayReplaceTempPartition(replaceTempPartitionLog);
                     break;
                 }
@@ -858,13 +858,19 @@ public class EditLog {
              * for a table that no longer exists.
              * 1. Thread 1: get TableA object
              * 2. Thread 2: lock db and drop table and record edit log of the 
dropped TableA
-             * 3. Thread 1: lock table, modify table and record edit log of 
the modified TableA
+             * 3. Thread 1: lock table, modify table and record edit log of 
the modified
+             * TableA
              * **The modified edit log is after the dropped edit log**
-             * Because the table has been dropped, the olapTable in here is 
null when the modified edit log is replayed.
-             * So in this case, we will ignore the edit log of the modified 
table after the table is dropped.
-             * This could make the meta inconsistent, for example, an edit log 
on a dropped table is ignored, but
-             * this table is restored later, so there may be an inconsistent 
situation between master and followers. We
-             * log a warning here to debug when happens. This could happen to 
other meta like DB.
+             * Because the table has been dropped, the olapTable in here is 
null when the
+             * modified edit log is replayed.
+             * So in this case, we will ignore the edit log of the modified 
table after the
+             * table is dropped.
+             * This could make the meta inconsistent, for example, an edit log 
on a dropped
+             * table is ignored, but
+             * this table is restored later, so there may be an inconsistent 
situation
+             * between master and followers. We
+             * log a warning here to debug when happens. This could happen to 
other meta
+             * like DB.
              */
             LOG.warn("[INCONSISTENT META] replay failed {}: {}", journal, 
e.getMessage(), e);
         } catch (Exception e) {
@@ -911,7 +917,8 @@ public class EditLog {
         try {
             journal.write(op, writable);
         } catch (Throwable t) {
-            // Throwable contains all Exception and Error, such as IOException 
and OutOfMemoryError
+            // Throwable contains all Exception and Error, such as IOException 
and
+            // OutOfMemoryError
             LOG.error("Fatal Error : write stream Exception", t);
             System.exit(-1);
         }
@@ -1461,4 +1468,8 @@ public class EditLog {
     public void logDatasourceLog(short id, CatalogLog log) {
         logEdit(id, log);
     }
+
+    public Journal getJournal() {
+        return this.journal;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java 
b/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java
index bcad0012c4..1dd498beb2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java
@@ -1,24 +1,26 @@
 // Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
+// or more contributor license agreements. See the NOTICE file
 // distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
+// regarding copyright ownership. The ASF licenses this file
 // to you under the Apache License, Version 2.0 (the
 // "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
+// with the License. You may obtain a copy of the License at
 //
-//   http://www.apache.org/licenses/LICENSE-2.0
+// http://www.apache.org/licenses/LICENSE-2.0
 //
 // Unless required by applicable law or agreed to in writing,
 // software distributed under the License is distributed on an
 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
+// KIND, either express or implied. See the License for the
 // specific language governing permissions and limitations
 // under the License.
 
 package org.apache.doris.system;
 
+import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
+import org.apache.doris.ha.BDBHA;
 import org.apache.doris.ha.FrontendNodeType;
 import org.apache.doris.system.HeartbeatResponse.HbStatus;
 
@@ -42,8 +44,7 @@ public class Frontend implements Writable {
 
     private boolean isAlive = false;
 
-    public Frontend() {
-    }
+    public Frontend() {}
 
     public Frontend(FrontendNodeType role, String nodeName, String host, int 
editLogPort) {
         this.role = role;
@@ -97,14 +98,18 @@ public class Frontend implements Writable {
     }
 
     /**
-     * handle Frontend's heartbeat response.
-     * Because the replayed journal id is very likely to be changed at each 
heartbeat response,
-     * so we simple return true if the heartbeat status is OK.
-     * But if heartbeat status is BAD, only return true if it is the first 
time to transfer from alive to dead.
+     * Handle Frontend's heartbeat response. Because the replayed journal id is very likely to
+     * change at each heartbeat response, we simply return true if the heartbeat status is OK.
+     * But if heartbeat status is BAD, only return true if it is the first time to transfer from
+     * alive to dead.
      */
-    public boolean handleHbResponse(FrontendHbResponse hbResponse) {
+    public boolean handleHbResponse(FrontendHbResponse hbResponse, boolean 
isReplay) {
         boolean isChanged = false;
         if (hbResponse.getStatus() == HbStatus.OK) {
+            if (!isAlive && !isReplay) {
+                BDBHA bdbha = (BDBHA) 
Catalog.getCurrentCatalog().getHaProtocol();
+                bdbha.removeUnReadyElectableNode(nodeName, 
Catalog.getCurrentCatalog().getFollowerCount());
+            }
             isAlive = true;
             version = hbResponse.getVersion();
             queryPort = hbResponse.getQueryPort();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
index 855e95c0f1..395aba00c0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
@@ -157,7 +157,7 @@ public class HeartbeatMgr extends MasterDaemon {
                 FrontendHbResponse hbResponse = (FrontendHbResponse) response;
                 Frontend fe = 
Catalog.getCurrentCatalog().getFeByName(hbResponse.getName());
                 if (fe != null) {
-                    return fe.handleHbResponse(hbResponse);
+                    return fe.handleHbResponse(hbResponse, isReplay);
                 }
                 break;
             }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to