This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ea832744df3 [fix](journal) ensure txns are matched with the master before replaying (#28192)
ea832744df3 is described below
commit ea832744df3a243793cb881cf3b4344590bc54f7
Author: walter <[email protected]>
AuthorDate: Wed Dec 13 18:14:51 2023 +0800
[fix](journal) ensure txns are matched with the master before replaying (#28192)
---
.../apache/doris/journal/bdbje/BDBJEJournal.java | 72 +++++++++++--
.../doris/journal/bdbje/BDBEnvironmentTest.java | 111 +++++++++++++++++++--
2 files changed, 167 insertions(+), 16 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
index 1781d8a56d6..ebdbadae192 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
@@ -38,12 +38,16 @@ import com.sleepycat.je.DatabaseNotFoundException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.StatsConfig;
+import com.sleepycat.je.Transaction;
+import com.sleepycat.je.TransactionConfig;
import com.sleepycat.je.rep.InsufficientLogException;
import com.sleepycat.je.rep.NetworkRestore;
import com.sleepycat.je.rep.NetworkRestoreConfig;
+import com.sleepycat.je.rep.ReplicaConsistencyException;
import com.sleepycat.je.rep.ReplicaWriteException;
import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.RollbackException;
+import com.sleepycat.je.rep.TimeConsistencyPolicy;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -52,6 +56,7 @@ import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/*
@@ -124,10 +129,7 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
// id is the key
long id = nextJournalId.getAndIncrement();
- Long idLong = id;
- DatabaseEntry theKey = new DatabaseEntry();
- TupleBinding<Long> idBinding = TupleBinding.getPrimitiveBinding(Long.class);
- idBinding.objectToEntry(idLong, theKey);
+ DatabaseEntry theKey = idToKey(id);
// entity is the value
DataOutputBuffer buffer = new DataOutputBuffer(OUTPUT_BUFFER_INIT_SIZE);
@@ -203,6 +205,13 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
return id;
}
+ private static DatabaseEntry idToKey(Long id) {
+ DatabaseEntry theKey = new DatabaseEntry();
+ TupleBinding<Long> idBinding = TupleBinding.getPrimitiveBinding(Long.class);
+ idBinding.objectToEntry(id, theKey);
+ return theKey;
+ }
+
@Override
public JournalEntity read(long journalId) {
List<Long> dbNames = getDatabaseNames();
@@ -224,7 +233,7 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
}
JournalEntity ret = null;
- Long key = new Long(journalId);
+ Long key = journalId;
DatabaseEntry theKey = new DatabaseEntry();
TupleBinding<Long> myBinding = TupleBinding.getPrimitiveBinding(Long.class);
myBinding.objectToEntry(key, theKey);
@@ -270,7 +279,7 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
if (dbNames == null) {
return ret;
}
- if (dbNames.size() == 0) {
+ if (dbNames.isEmpty()) {
return ret;
}
@@ -278,9 +287,52 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
String dbName = dbNames.get(index).toString();
long dbNumberName = dbNames.get(index);
Database database = bdbEnvironment.openDatabase(dbName);
- ret = dbNumberName + database.count() - 1;
+ if (!isReplicaTxnAreMatched(database, dbNumberName)) {
+ LOG.warn("The current replica hasn't synced up with the master, current db name: {}", dbNumberName);
+ if (index != 0) {
+ // Because roll journal occurs after write, the previous write must have
+ // been replicated to the majority, so it can be guaranteed that the database
+ // will not be rollback.
+ return dbNumberName - 1;
+ }
+ return -1;
+ }
+ return dbNumberName + database.count() - 1;
+ }
- return ret;
+ // Whether the replica txns are matched with the master.
+ //
+ // BDBJE could throw InsufficientAcksException during post commit, at that time the
+ // log has persisted in disk. When the replica is restarted, we need to ensure that
+ // before replaying the journals, sync up txns with the new master in the cluster and
+ // rollback the txns that have been persisted but have not committed to the majority.
+ //
+ // See org.apache.doris.journal.bdbje.BDBEnvironmentTest#testReadTxnIsNotMatched for details.
+ private boolean isReplicaTxnAreMatched(Database database, Long id) {
+ // The time lag is set to Integer.MAX_VALUE if the replica haven't synced up
+ // with the master. By allowing a very large lag, we can detect whether the
+ // replica has synced up with the master.
+ TimeConsistencyPolicy consistencyPolicy = new TimeConsistencyPolicy(
+ 1, TimeUnit.DAYS, 1, TimeUnit.MINUTES);
+ Transaction txn = null;
+ try {
+ TransactionConfig cfg = new TransactionConfig()
+ .setReadOnly(true)
+ .setReadCommitted(true)
+ .setConsistencyPolicy(consistencyPolicy);
+
+ txn = bdbEnvironment.getReplicatedEnvironment().beginTransaction(null, cfg);
+
+ DatabaseEntry key = idToKey(id);
+ database.get(txn, key, null, LockMode.READ_COMMITTED);
+ return true;
+ } catch (ReplicaConsistencyException e) {
+ return false;
+ } finally {
+ if (txn != null) {
+ txn.abort();
+ }
+ }
}
@Override
@@ -293,7 +345,7 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
if (dbNames == null) {
return ret;
}
- if (dbNames.size() == 0) {
+ if (dbNames.isEmpty()) {
return ret;
}
@@ -350,7 +402,7 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
LOG.error("fail to get dbNames while open bdbje journal. will exit");
System.exit(-1);
}
- if (dbNames.size() == 0) {
+ if (dbNames.isEmpty()) {
/*
* This is the very first time to open. Usually, we will open a new database
* named "1".
diff --git a/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBEnvironmentTest.java b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBEnvironmentTest.java
index 09b10b604b8..f61fbc6bf98 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBEnvironmentTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBEnvironmentTest.java
@@ -31,8 +31,11 @@ import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.Durability;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.rep.InsufficientAcksException;
import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.je.rep.impl.RepImpl;
import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
+import mockit.Expectations;
import mockit.Mock;
import mockit.MockUp;
import org.apache.commons.io.FileUtils;
@@ -46,6 +49,7 @@ import org.junit.jupiter.api.RepeatedTest;
import java.io.File;
import java.io.IOException;
+import java.lang.reflect.Field;
import java.net.DatagramSocket;
import java.net.ServerSocket;
import java.net.SocketException;
@@ -367,15 +371,16 @@ public class BDBEnvironmentTest {
for (int i = 0; i < 10; i++) {
electionSuccess = true;
for (Pair<BDBEnvironment, NodeInfo> entryPair : followersInfo) {
- if (entryPair.first.getReplicatedEnvironment().getState()
- .equals(ReplicatedEnvironment.State.MASTER)) {
+ ReplicatedEnvironment env = entryPair.first.getReplicatedEnvironment();
+ if (env == null) {
+ continue;
+ }
+ if (env.getState().equals(ReplicatedEnvironment.State.MASTER)) {
masterEnvironment = entryPair.first;
masterNode = entryPair.second;
}
- if (!entryPair.first.getReplicatedEnvironment().getState()
- .equals(ReplicatedEnvironment.State.MASTER)
- && !entryPair.first.getReplicatedEnvironment().getState()
- .equals(ReplicatedEnvironment.State.REPLICA)) {
+ if (!env.getState().equals(ReplicatedEnvironment.State.MASTER)
+ && !env.getState().equals(ReplicatedEnvironment.State.REPLICA)) {
electionSuccess = false;
}
}
@@ -559,4 +564,98 @@ public class BDBEnvironmentTest {
Assertions.assertEquals(Durability.ReplicaAckPolicy.SIMPLE_MAJORITY,
Deencapsulation.invoke(BDBEnvironment.class, "getAckPolicy", "default"));
}
+
+ @RepeatedTest(1)
+ public void testReadTxnIsNotMatched() throws Exception {
+ List<Pair<BDBEnvironment, NodeInfo>> followersInfo = new ArrayList<>();
+
+ int masterPort = findValidPort();
+ String masterNodeName = "fe1";
+ String masterNodeHostPort = "127.0.0.1:" + masterPort;
+
+ BDBEnvironment masterEnvironment = new BDBEnvironment(true, false);
+ String masterDir = createTmpDir();
+ masterEnvironment.setup(new File(masterDir), masterNodeName, masterNodeHostPort, masterNodeHostPort);
+ followersInfo.add(Pair.of(masterEnvironment, new NodeInfo(masterNodeName, masterNodeHostPort, masterDir)));
+
+ for (int i = 2; i <= 3; i++) {
+ int nodePort = findValidPort();
+ String nodeName = "fe" + i;
+ String nodeHostPort = "127.0.0.1:" + nodePort;
+
+ BDBEnvironment followerEnvironment = new BDBEnvironment(true, false);
+ String nodeDir = createTmpDir();
+ followerEnvironment.setup(new File(nodeDir), nodeName, nodeHostPort, masterNodeHostPort);
+ followersInfo.add(Pair.of(followerEnvironment, new NodeInfo(nodeName, nodeHostPort, nodeDir)));
+ }
+
+ Pair<BDBEnvironment, NodeInfo> masterPair = findMaster(followersInfo);
+ String beginDbName = String.valueOf(0L);
+ Database masterDb = masterPair.first.openDatabase(beginDbName);
+ DatabaseEntry key = new DatabaseEntry(randomBytes());
+ DatabaseEntry value = new DatabaseEntry(randomBytes());
+ Assertions.assertEquals(OperationStatus.SUCCESS, masterDb.put(null, key, value));
+ Assertions.assertEquals(1, masterEnvironment.getDatabaseNames().size());
+ LOG.info("master is {} | {}", masterPair.second.name, masterPair.second.dir);
+
+ for (Pair<BDBEnvironment, NodeInfo> entryPair : followersInfo) {
+ if (entryPair.second.dir.equals(masterPair.second.dir)) {
+ LOG.info("skip {}", entryPair.second.name);
+ continue;
+ }
+
+ Assertions.assertEquals(1, entryPair.first.getDatabaseNames().size());
+ Database followerDb = entryPair.first.openDatabase(beginDbName);
+ DatabaseEntry readValue = new DatabaseEntry();
+ Assertions.assertEquals(OperationStatus.SUCCESS, followerDb.get(null, key, readValue, LockMode.READ_COMMITTED));
+ Assertions.assertEquals(new String(value.getData()), new String(readValue.getData()));
+ }
+
+ Field envImplField = ReplicatedEnvironment.class.getDeclaredField("repEnvironmentImpl");
+ envImplField.setAccessible(true);
+ RepImpl impl = (RepImpl) envImplField.get(masterPair.first.getReplicatedEnvironment());
+ Assertions.assertNotNull(impl);
+
+ new Expectations(impl) {{
+ // Below method will replicate log item to followers.
+ impl.registerVLSN(withNotNull());
+ // Below method will wait until the logs are replicated.
+ impl.postLogCommitHook(withNotNull(), withNotNull());
+ result = new InsufficientAcksException("mocked");
+ }};
+
+ long count = masterDb.count();
+ final Database oldMasterDb = masterDb;
+ Assertions.assertThrows(InsufficientAcksException.class, () -> {
+ // Since this key is not replicated to any replicas, it should not be read.
+ DatabaseEntry k = new DatabaseEntry(new byte[]{1, 2, 3});
+ DatabaseEntry v = new DatabaseEntry(new byte[]{4, 5, 6});
+ oldMasterDb.put(null, k, v);
+ });
+
+ LOG.info("close old master {} | {}", masterPair.second.name, masterPair.second.dir);
+ masterDb.close();
+ masterEnvironment.getEpochDB().close();
+ masterEnvironment.close();
+
+ for (Pair<BDBEnvironment, NodeInfo> entryPair : followersInfo) {
+ if (entryPair.second.dir.equals(masterPair.second.dir)) {
+ LOG.info("skip {}", entryPair.second.name);
+ continue;
+ }
+ LOG.info("close follower {} | {}", entryPair.second.name, entryPair.second.dir);
+ entryPair.first.close();
+ }
+
+ masterPair.first.openReplicatedEnvironment(new File(masterPair.second.dir));
+ masterDb = masterPair.first.openDatabase(beginDbName);
+ LOG.info("open {} | {}", masterPair.second.name, masterPair.second.dir);
+
+ // The local commit txn is readable!!!
+ Assertions.assertEquals(count + 1, masterDb.count());
+
+ key = new DatabaseEntry(new byte[]{1, 2, 3});
+ DatabaseEntry readValue = new DatabaseEntry();
+ Assertions.assertEquals(OperationStatus.SUCCESS, masterDb.get(null, key, readValue, LockMode.READ_COMMITTED));
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]