This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new c936abd2a3 [fix](fe) when bdbje adding follower, master write op may failed. (#10376) c936abd2a3 is described below commit c936abd2a37b2dd4213c0240b32df98ddd4b4017 Author: Lei Zhang <1091517...@qq.com> AuthorDate: Wed Jul 6 10:29:16 2022 +0800 [fix](fe) when bdbje adding follower, master write op may failed. (#10376) --- .../java/org/apache/doris/catalog/Catalog.java | 13 ++++ .../src/main/java/org/apache/doris/ha/BDBHA.java | 71 ++++++++++++++++++---- .../apache/doris/journal/bdbje/BDBJEJournal.java | 39 ++++++++---- .../java/org/apache/doris/persist/EditLog.java | 39 +++++++----- .../java/org/apache/doris/system/Frontend.java | 29 +++++---- .../java/org/apache/doris/system/HeartbeatMgr.java | 2 +- 6 files changed, 140 insertions(+), 53 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index 2798b2b36e..4020ffa72f 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -2470,6 +2470,7 @@ public class Catalog { if (role == FrontendNodeType.FOLLOWER || role == FrontendNodeType.REPLICA) { bdbha.addHelperSocket(host, editLogPort); helperNodes.add(Pair.create(host, editLogPort)); + bdbha.addUnReadyElectableNode(nodeName, getFollowerCount()); } bdbha.removeConflictNodeIfExist(host, editLogPort); editLog.logAddFrontend(fe); @@ -2499,6 +2500,8 @@ public class Catalog { if (fe.getRole() == FrontendNodeType.FOLLOWER || fe.getRole() == FrontendNodeType.REPLICA) { haProtocol.removeElectableNode(fe.getNodeName()); helperNodes.remove(Pair.create(host, port)); + BDBHA ha = (BDBHA) haProtocol; + ha.removeUnReadyElectableNode(nodeName, getFollowerCount()); } editLog.logRemoveFrontend(fe); } finally { @@ -4931,4 +4934,14 @@ public class Catalog { sb.append("\nCOMMENT '").append(table.getComment(true)).append("'"); } } + + public int getFollowerCount() { + int count = 0; + for (Frontend fe : frontends.values()) { + if (fe.getRole() == FrontendNodeType.FOLLOWER) { + count++; + } + } + return count; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/ha/BDBHA.java b/fe/fe-core/src/main/java/org/apache/doris/ha/BDBHA.java index 22d15373d8..02f110b40a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/ha/BDBHA.java +++ b/fe/fe-core/src/main/java/org/apache/doris/ha/BDBHA.java @@ -1,17 +1,17 @@ // Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file +// or more contributor license agreements. See the NOTICE file // distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file +// regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at +// with the License. You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the +// KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. @@ -28,7 +28,9 @@ import com.sleepycat.je.DatabaseEntry; import com.sleepycat.je.OperationStatus; import com.sleepycat.je.rep.MasterStateException; import com.sleepycat.je.rep.MemberNotFoundException; +import com.sleepycat.je.rep.ReplicatedEnvironment; import com.sleepycat.je.rep.ReplicationGroup; +import com.sleepycat.je.rep.ReplicationMutableConfig; import com.sleepycat.je.rep.ReplicationNode; import com.sleepycat.je.rep.UnknownMasterException; import com.sleepycat.je.rep.util.ReplicationGroupAdmin; @@ -37,16 +39,30 @@ import org.apache.logging.log4j.Logger; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Set; public class BDBHA implements HAProtocol { private static final Logger LOG = LogManager.getLogger(BDBHA.class); - private BDBEnvironment environment; - private String nodeName; + private final BDBEnvironment environment; + private final String nodeName; private static final int RETRY_TIME = 3; + // Unstable node is a follower node that is joining the cluster but have not + // completed. + // We should record this kind node and set the bdb electable group size to + // (size_of_all_followers - size_of_unstable_nodes). + // Because once the handshake is successful, the joined node is put into the + // optional group, + // but it may take a little time for this node to replicate the historical data. + // This node will never respond to a new data replication until the historical + // replication is completed, + // and if the master cannot receive a quorum response, the write operation will + // fail. + private final Set<String> unReadyElectableNodes = new HashSet<>(); + public BDBHA(BDBEnvironment env, String nodeName) { this.environment = env; this.nodeName = nodeName; @@ -124,7 +140,8 @@ public class BDBHA implements HAProtocol { if (leaderIncluded) { ret.add(replicationNode.getSocketAddress()); } else { - if (!replicationNode.getName().equals(replicationGroupAdmin.getMasterNodeName())) { + if (!replicationNode.getName() + .equals(replicationGroupAdmin.getMasterNodeName())) { ret.add(replicationNode.getSocketAddress()); } } @@ -204,17 +221,20 @@ public class BDBHA implements HAProtocol { return true; } - // When new Follower FE is added to the cluster, it should also be added to the helper sockets in + // When new Follower FE is added to the cluster, it should also be added to the + // helper sockets in // ReplicationGroupAdmin, in order to fix the following case: // 1. A Observer starts with helper of master FE. // 2. Master FE is dead, new Master is elected. // 3. Observer's helper sockets only contains the info of the dead master FE. - // So when you try to get frontends' info from this Observer, it will throw the Exception: - // "Could not determine master from helpers at:[/dead master FE host:port]" + // So when you try to get frontends' info from this Observer, it will throw the + // Exception: + // "Could not determine master from helpers at:[/dead master FE host:port]" public void addHelperSocket(String ip, Integer port) { ReplicationGroupAdmin replicationGroupAdmin = environment.getReplicationGroupAdmin(); - Set<InetSocketAddress> helperSockets = Sets.newHashSet(replicationGroupAdmin.getHelperSockets()); - InetSocketAddress newHelperSocket = new InetSocketAddress(ip, port); + Set<InetSocketAddress> helperSockets = + Sets.newHashSet(replicationGroupAdmin.getHelperSockets()); + InetSocketAddress newHelperSocket = new InetSocketAddress(ip, port); if (!helperSockets.contains(newHelperSocket)) { helperSockets.add(newHelperSocket); environment.setNewReplicationGroupAdmin(helperSockets); @@ -240,4 +260,29 @@ public class BDBHA implements HAProtocol { removeElectableNode(conflictNode); } } + + public synchronized void addUnReadyElectableNode(String nodeName, int totalFollowerCount) { + unReadyElectableNodes.add(nodeName); + ReplicatedEnvironment replicatedEnvironment = environment.getReplicatedEnvironment(); + if (replicatedEnvironment != null) { + replicatedEnvironment.setRepMutableConfig(new ReplicationMutableConfig() + .setElectableGroupSizeOverride(totalFollowerCount - unReadyElectableNodes.size())); + } + } + + public synchronized void removeUnReadyElectableNode(String nodeName, int totalFollowerCount) { + unReadyElectableNodes.remove(nodeName); + ReplicatedEnvironment replicatedEnvironment = environment.getReplicatedEnvironment(); + if (replicatedEnvironment != null) { + if (unReadyElectableNodes.isEmpty()) { + // Setting ElectableGroupSizeOverride to 0 means remove this config, + // and bdb will use the normal electable group size. + replicatedEnvironment.setRepMutableConfig( + new ReplicationMutableConfig().setElectableGroupSizeOverride(0)); + } else { + replicatedEnvironment.setRepMutableConfig(new ReplicationMutableConfig() + .setElectableGroupSizeOverride(totalFollowerCount - unReadyElectableNodes.size())); + } + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java index 4d787f866b..4974ec0f85 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java @@ -163,7 +163,8 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B if (op == OperationType.OP_TIMESTAMP) { /* * Do not exit if the write operation is OP_TIMESTAMP. - * If all the followers exit except master, master should continue provide query service. + * If all the followers exit except master, master should continue provide query + * service. * To prevent master exit, we should exempt OP_TIMESTAMP write */ nextJournalId.set(id); @@ -307,7 +308,8 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B } } - // Open a new journal database or get last existing one as current journal database + // Open a new journal database or get last existing one as current journal + // database List<Long> dbNames = null; for (int i = 0; i < RETRY_TIME; i++) { try { @@ -319,10 +321,11 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B } if (dbNames.size() == 0) { /* - * This is the very first time to open. Usually, we will open a new database named "1". - * But when we start cluster with an image file copied from other cluster, - * here we should open database with name image max journal id + 1. - * (default Catalog.getServingCatalog().getReplayedJournalId() is 0) + * This is the very first time to open. Usually, we will open a new database + * named "1". + * But when we start cluster with an image file copied from other cluster, + * here we should open database with name image max journal id + 1. + * (default Catalog.getServingCatalog().getReplayedJournalId() is 0) */ String dbName = Long.toString(Catalog.getServingCatalog().getReplayedJournalId() + 1); LOG.info("the very first time to open bdb, dbname is {}", dbName); @@ -344,8 +347,10 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B private void reSetupBdbEnvironment(InsufficientLogException insufficientLogEx) { LOG.warn("catch insufficient log exception. will recover and try again.", insufficientLogEx); - // Copy the missing log files from a member of the replication group who owns the files - // ATTN: here we use `getServingCatalog()`, because only serving catalog has helper nodes. + // Copy the missing log files from a member of the replication group who owns + // the files + // ATTN: here we use `getServingCatalog()`, because only serving catalog has + // helper nodes. Pair<String, Integer> helperNode = Catalog.getServingCatalog().getHelperNode(); NetworkRestore restore = new NetworkRestore(); NetworkRestoreConfig config = new NetworkRestoreConfig(); @@ -413,19 +418,23 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B return null; } - // Open a new journal database or get last existing one as current journal database - List<Long> dbNames = null; + // Open a new journal database or get last existing one as current journal + // database + List<Long> dbNames = null; for (int i = 0; i < RETRY_TIME; i++) { try { dbNames = bdbEnvironment.getDatabaseNames(); break; } catch (InsufficientLogException insufficientLogEx) { /* - * If this is not a checkpoint thread, which means this maybe the FE startup thread, - * or a replay thread. We will reopen bdbEnvironment for these 2 cases to get valid log + * If this is not a checkpoint thread, which means this maybe the FE startup + * thread, + * or a replay thread. We will reopen bdbEnvironment for these 2 cases to get + * valid log * from helper nodes. * - * The checkpoint thread will only run on Master FE. And Master FE should not encounter + * The checkpoint thread will only run on Master FE. And Master FE should not + * encounter * these exception. So if it happens, throw exception out. */ if (!Catalog.isCheckpointThread()) { @@ -446,4 +455,8 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B return dbNames; } + + public BDBEnvironment getBDBEnvironment() { + return this.bdbEnvironment; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 0871855d20..3e0b531970 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -466,7 +466,7 @@ public class EditLog { int version = Integer.parseInt(versionString); if (version > FeConstants.meta_version) { LOG.error("meta data version is out of date, image: {}. meta: {}." - + "please update FeConstants.meta_version and restart.", + + "please update FeConstants.meta_version and restart.", MetaContext.get().getMetaVersion(), FeConstants.meta_version); System.exit(-1); } @@ -546,8 +546,8 @@ public class EditLog { break; } case OperationType.OP_BATCH_REMOVE_TXNS: { - final BatchRemoveTransactionsOperation operation - = (BatchRemoveTransactionsOperation) journal.getData(); + final BatchRemoveTransactionsOperation operation = (BatchRemoveTransactionsOperation) journal + .getData(); Catalog.getCurrentGlobalTransactionMgr().replayBatchRemoveTransactions(operation); break; } @@ -647,8 +647,8 @@ public class EditLog { break; } case OperationType.OP_CREATE_LOAD_JOB: { - org.apache.doris.load.loadv2.LoadJob loadJob = - (org.apache.doris.load.loadv2.LoadJob) journal.getData(); + org.apache.doris.load.loadv2.LoadJob loadJob = (org.apache.doris.load.loadv2.LoadJob) journal + .getData(); catalog.getLoadManager().replayCreateLoadJob(loadJob); break; } @@ -742,8 +742,8 @@ public class EditLog { break; } case OperationType.OP_REPLACE_TEMP_PARTITION: { - ReplacePartitionOperationLog replaceTempPartitionLog - = (ReplacePartitionOperationLog) journal.getData(); + ReplacePartitionOperationLog replaceTempPartitionLog = (ReplacePartitionOperationLog) journal + .getData(); catalog.replayReplaceTempPartition(replaceTempPartitionLog); break; } @@ -858,13 +858,19 @@ public class EditLog { * for a table that no longer exists. * 1. Thread 1: get TableA object * 2. Thread 2: lock db and drop table and record edit log of the dropped TableA - * 3. Thread 1: lock table, modify table and record edit log of the modified TableA + * 3. Thread 1: lock table, modify table and record edit log of the modified + * TableA * **The modified edit log is after the dropped edit log** - * Because the table has been dropped, the olapTable in here is null when the modified edit log is replayed. - * So in this case, we will ignore the edit log of the modified table after the table is dropped. - * This could make the meta inconsistent, for example, an edit log on a dropped table is ignored, but - * this table is restored later, so there may be an inconsistent situation between master and followers. We - * log a warning here to debug when happens. This could happen to other meta like DB. + * Because the table has been dropped, the olapTable in here is null when the + * modified edit log is replayed. + * So in this case, we will ignore the edit log of the modified table after the + * table is dropped. + * This could make the meta inconsistent, for example, an edit log on a dropped + * table is ignored, but + * this table is restored later, so there may be an inconsistent situation + * between master and followers. We + * log a warning here to debug when happens. This could happen to other meta + * like DB. */ LOG.warn("[INCONSISTENT META] replay failed {}: {}", journal, e.getMessage(), e); } catch (Exception e) { @@ -911,7 +917,8 @@ public class EditLog { try { journal.write(op, writable); } catch (Throwable t) { - // Throwable contains all Exception and Error, such as IOException and OutOfMemoryError + // Throwable contains all Exception and Error, such as IOException and + // OutOfMemoryError LOG.error("Fatal Error : write stream Exception", t); System.exit(-1); } @@ -1461,4 +1468,8 @@ public class EditLog { public void logDatasourceLog(short id, CatalogLog log) { logEdit(id, log); } + + public Journal getJournal() { + return this.journal; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java index bcad0012c4..1dd498beb2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java @@ -1,24 +1,26 @@ // Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file +// or more contributor license agreements. See the NOTICE file // distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file +// regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at +// with the License. You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the +// KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package org.apache.doris.system; +import org.apache.doris.catalog.Catalog; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.ha.BDBHA; import org.apache.doris.ha.FrontendNodeType; import org.apache.doris.system.HeartbeatResponse.HbStatus; @@ -42,8 +44,7 @@ public class Frontend implements Writable { private boolean isAlive = false; - public Frontend() { - } + public Frontend() {} public Frontend(FrontendNodeType role, String nodeName, String host, int editLogPort) { this.role = role; @@ -97,14 +98,18 @@ public class Frontend implements Writable { } /** - * handle Frontend's heartbeat response. - * Because the replayed journal id is very likely to be changed at each heartbeat response, - * so we simple return true if the heartbeat status is OK. - * But if heartbeat status is BAD, only return true if it is the first time to transfer from alive to dead. + * handle Frontend's heartbeat response. Because the replayed journal id is very likely to be + * changed at each heartbeat response, so we simple return true if the heartbeat status is OK. + * But if heartbeat status is BAD, only return true if it is the first time to transfer from + * alive to dead. */ - public boolean handleHbResponse(FrontendHbResponse hbResponse) { + public boolean handleHbResponse(FrontendHbResponse hbResponse, boolean isReplay) { boolean isChanged = false; if (hbResponse.getStatus() == HbStatus.OK) { + if (!isAlive && !isReplay) { + BDBHA bdbha = (BDBHA) Catalog.getCurrentCatalog().getHaProtocol(); + bdbha.removeUnReadyElectableNode(nodeName, Catalog.getCurrentCatalog().getFollowerCount()); + } isAlive = true; version = hbResponse.getVersion(); queryPort = hbResponse.getQueryPort(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java index 855e95c0f1..395aba00c0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -157,7 +157,7 @@ public class HeartbeatMgr extends MasterDaemon { FrontendHbResponse hbResponse = (FrontendHbResponse) response; Frontend fe = Catalog.getCurrentCatalog().getFeByName(hbResponse.getName()); if (fe != null) { - return fe.handleHbResponse(hbResponse); + return fe.handleHbResponse(hbResponse, isReplay); } break; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org