This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1124808 [Enhancement] Add detail msg to show the reason of publish
failure. (#3647)
1124808 is described below
commit 1124808fbc22d1544cc4b83fd7cfee23af0c01fb
Author: Mingyu Chen <[email protected]>
AuthorDate: Fri May 22 22:59:53 2020 +0800
[Enhancement] Add detail msg to show the reason of publish failure. (#3647)
Add 2 new columns `PublishTime` and `ErrMsg` to show the publish version time
and the errors that happen during the transaction process. They can be seen by executing:
`SHOW PROC "/transactions/dbId/";`
or
`SHOW TRANSACTION WHERE ID=xx;`
Currently, only errors that happen in the publish phase are recorded, which helps us
find out which txn
is blocked.
Fix #3646
---
.../org/apache/doris/common/proc/TransProcDir.java | 2 ++
.../doris/transaction/DatabaseTransactionMgr.java | 32 +++++++++++++++-------
.../doris/transaction/PublishVersionDaemon.java | 4 ++-
.../apache/doris/transaction/TransactionState.java | 19 ++++++++++++-
.../transaction/DatabaseTransactionMgrTest.java | 9 +++---
5 files changed, 50 insertions(+), 16 deletions(-)
diff --git a/fe/src/main/java/org/apache/doris/common/proc/TransProcDir.java
b/fe/src/main/java/org/apache/doris/common/proc/TransProcDir.java
index 0cef447..118cc85 100644
--- a/fe/src/main/java/org/apache/doris/common/proc/TransProcDir.java
+++ b/fe/src/main/java/org/apache/doris/common/proc/TransProcDir.java
@@ -35,11 +35,13 @@ public class TransProcDir implements ProcDirInterface {
.add("LoadJobSourceType")
.add("PrepareTime")
.add("CommitTime")
+ .add("PublishTime")
.add("FinishTime")
.add("Reason")
.add("ErrorReplicasCount")
.add("ListenerId")
.add("TimeoutMs")
+ .add("ErrMsg")
.build();
public static final int MAX_SHOW_ENTRIES = 2000;
diff --git
a/fe/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
b/fe/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index fef801f..ac2764e 100644
--- a/fe/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ b/fe/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -17,13 +17,6 @@
package org.apache.doris.transaction;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.commons.collections.CollectionUtils;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.MaterializedIndex;
@@ -44,9 +37,9 @@ import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.common.UserException;
import org.apache.doris.common.util.Util;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.privilege.PrivPredicate;
@@ -59,6 +52,15 @@ import org.apache.doris.task.ClearTransactionTask;
import org.apache.doris.task.PublishVersionTask;
import org.apache.doris.thrift.TTaskType;
import org.apache.doris.thrift.TUniqueId;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -231,11 +233,13 @@ public class DatabaseTransactionMgr {
info.add(txnState.getSourceType().name());
info.add(TimeUtils.longToTimeString(txnState.getPrepareTime()));
info.add(TimeUtils.longToTimeString(txnState.getCommitTime()));
+ info.add(TimeUtils.longToTimeString(txnState.getPublishVersionTime()));
info.add(TimeUtils.longToTimeString(txnState.getFinishTime()));
info.add(txnState.getReason());
info.add(String.valueOf(txnState.getErrorReplicas().size()));
info.add(String.valueOf(txnState.getCallbackId()));
info.add(String.valueOf(txnState.getTimeoutMs()));
+ info.add(txnState.getErrMsg());
}
public long beginTransaction(List<Long> tableIdList, String label,
TUniqueId requestId,
@@ -579,8 +583,8 @@ public class DatabaseTransactionMgr {
}
public List<TransactionState> getCommittedTxnList() {
+ readLock();
try {
- readLock();
// only send task to committed transaction
return idToRunningTransactionState.values().stream()
.filter(transactionState ->
(transactionState.getTransactionStatus() == TransactionStatus.COMMITTED))
@@ -653,6 +657,9 @@ public class DatabaseTransactionMgr {
transactionId,
partitionCommitInfo.getVersion(),
partition.getVisibleVersion());
+ String errMsg = String.format("wait for publishing
partition %d version %d. self version: %d. table %d",
+ partitionId, partition.getVisibleVersion() +
1, partitionCommitInfo.getVersion(), tableId);
+ transactionState.setErrorMsg(errMsg);
return;
}
int quorumReplicaNum =
partitionInfo.getReplicationNum(partitionId) / 2 + 1;
@@ -721,8 +728,12 @@ public class DatabaseTransactionMgr {
}
if (healthReplicaNum < quorumReplicaNum) {
- LOG.info("publish version failed for
transaction {} on tablet {}, with only {} replicas less than quorum {}",
+ LOG.info("publish version failed for
transaction {} on tablet {}, with only {} replicas less than quorum {}",
transactionState, tablet,
healthReplicaNum, quorumReplicaNum);
+ String errMsg = String.format("publish on
tablet %d failed. succeed replica num %d less than quorum %d."
+ + " table: %d, partition: %d, publish
version: %d",
+ tablet.getId(), healthReplicaNum,
quorumReplicaNum, tableId, partitionId, partition.getVisibleVersion() + 1);
+ transactionState.setErrorMsg(errMsg);
hasError = true;
}
}
@@ -737,6 +748,7 @@ public class DatabaseTransactionMgr {
try {
transactionState.setErrorReplicas(errorReplicaIds);
transactionState.setFinishTime(System.currentTimeMillis());
+ transactionState.clearErrorMsg();
transactionState.setTransactionStatus(TransactionStatus.VISIBLE);
unprotectUpsertTransactionState(transactionState, false);
txnOperated = true;
diff --git
a/fe/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
b/fe/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
index 1b637b0..70b0e52 100644
--- a/fe/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
+++ b/fe/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
@@ -167,7 +167,7 @@ public class PublishVersionDaemon extends MasterDaemon {
// transaction's publish is timeout, but there still has
unfinished tasks.
// we need to collect all error replicas, and try to
finish this txn.
for (PublishVersionTask unfinishedTask : unfinishedTasks) {
- // set all replica in the backend to error state
+ // set all replicas in the backend to error state
List<TPartitionVersionInfo> versionInfos =
unfinishedTask.getPartitionVersionInfos();
Set<Long> errorPartitionIds = Sets.newHashSet();
for (TPartitionVersionInfo versionInfo : versionInfos)
{
@@ -177,6 +177,8 @@ public class PublishVersionDaemon extends MasterDaemon {
continue;
}
+ // get all tablets of these error partitions, and mark
their replicas as error.
+ // currently we don't have a partition-to-tablet map in
FE, so here we use an inefficient way.
// TODO(cmy): this is inefficient, but just keep it
simple. will change it later.
List<Long> tabletIds =
tabletInvertedIndex.getTabletIdsByBackendId(unfinishedTask.getBackendId());
for (long tabletId : tabletIds) {
diff --git
a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java
b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java
index 4ddbfaf..dbccc14 100644
--- a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -187,7 +187,7 @@ public class TransactionState implements Writable {
// this state need not to be serialized
private Map<Long, PublishVersionTask> publishVersionTasks;
private boolean hasSendTask;
- private long publishVersionTime;
+ private long publishVersionTime = -1;
private TransactionStatus preStatus = null;
private long callbackId = -1;
@@ -206,6 +206,11 @@ public class TransactionState implements Writable {
private String errorLogUrl = null;
+ // record some error msgs during the transaction operation.
+ // this msg will be shown in show proc "/transactions/dbId/";
+ // no need to persist.
+ private String errMsg = "";
+
public TransactionState() {
this.dbId = -1;
this.tableIdList = Lists.newArrayList();
@@ -646,4 +651,16 @@ public class TransactionState implements Writable {
}
}
}
+
+ public void setErrorMsg(String errMsg) {
+ this.errMsg = errMsg;
+ }
+
+ public void clearErrorMsg() {
+ this.errMsg = "";
+ }
+
+ public String getErrMsg() {
+ return this.errMsg;
+ }
}
diff --git
a/fe/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
b/fe/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
index 38399a1..78c0699 100644
---
a/fe/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
+++
b/fe/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
@@ -214,10 +214,11 @@ public class DatabaseTransactionMgrTest {
assertTrue(currentTime > TimeUtils.timeStringToLong(txnInfo.get(5)));
assertTrue(currentTime > TimeUtils.timeStringToLong(txnInfo.get(6)));
assertTrue(currentTime > TimeUtils.timeStringToLong(txnInfo.get(7)));
- assertEquals("", txnInfo.get(8));
- assertEquals("0", txnInfo.get(9));
- assertEquals("-1", txnInfo.get(10));
- assertEquals(String.valueOf(Config.stream_load_default_timeout_second
* 1000), txnInfo.get(11));
+ assertTrue(currentTime > TimeUtils.timeStringToLong(txnInfo.get(8)));
+ assertEquals("", txnInfo.get(9));
+ assertEquals("0", txnInfo.get(10));
+ assertEquals("-1", txnInfo.get(11));
+ assertEquals(String.valueOf(Config.stream_load_default_timeout_second
* 1000), txnInfo.get(12));
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]