This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 7dd01b1fa9b [fix](publish) Pick Catch exception in genPublishTask to
make one failed txn does not block the other txns (#37724) (#38042)
7dd01b1fa9b is described below
commit 7dd01b1fa9b3e3def95f8a62b67fe452b16e40d3
Author: meiyi <[email protected]>
AuthorDate: Thu Jul 18 14:15:49 2024 +0800
[fix](publish) Pick Catch exception in genPublishTask to make one failed
txn does not block the other txns (#37724) (#38042)
Pick https://github.com/apache/doris/pull/37724
---
.../doris/transaction/PublishVersionDaemon.java | 98 ++++++++++++----------
1 file changed, 54 insertions(+), 44 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
index a1861fb7f4d..ac1cbe9154c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
@@ -108,58 +108,68 @@ public class PublishVersionDaemon extends MasterDaemon {
if (transactionState.hasSendTask()) {
continue;
}
- List<PartitionCommitInfo> partitionCommitInfos = new ArrayList<>();
- for (TableCommitInfo tableCommitInfo :
transactionState.getIdToTableCommitInfos().values()) {
-
partitionCommitInfos.addAll(tableCommitInfo.getIdToPartitionCommitInfo().values());
-
- try {
-
beIdToBaseTabletIds.putAll(getBaseTabletIdsForEachBe(transactionState,
tableCommitInfo));
- } catch (MetaNotFoundException e) {
- LOG.warn("exception occur when trying to get rollup
tablets info", e);
- }
+ try {
+ genPublishTask(allBackends, transactionState,
createPublishVersionTaskTime, beIdToBaseTabletIds,
+ batchTask);
+ } catch (Throwable t) {
+ LOG.error("errors while generate publish task for transaction:
{}", transactionState, t);
}
+ }
+ if (!batchTask.getAllTasks().isEmpty()) {
+ AgentTaskExecutor.submit(batchTask);
+ }
+ }
- List<TPartitionVersionInfo> partitionVersionInfos = new
ArrayList<>(partitionCommitInfos.size());
- for (PartitionCommitInfo commitInfo : partitionCommitInfos) {
- TPartitionVersionInfo versionInfo = new
TPartitionVersionInfo(commitInfo.getPartitionId(),
- commitInfo.getVersion(), 0);
- partitionVersionInfos.add(versionInfo);
- if (LOG.isDebugEnabled()) {
- LOG.debug("try to publish version info partitionid [{}],
version [{}]",
- commitInfo.getPartitionId(),
- commitInfo.getVersion());
- }
- }
+ private void genPublishTask(List<Long> allBackends, TransactionState
transactionState,
+ long createPublishVersionTaskTime, Map<Long, Set<Long>>
beIdToBaseTabletIds, AgentBatchTask batchTask) {
+ List<PartitionCommitInfo> partitionCommitInfos = new ArrayList<>();
+ for (TableCommitInfo tableCommitInfo :
transactionState.getIdToTableCommitInfos().values()) {
+
partitionCommitInfos.addAll(tableCommitInfo.getIdToPartitionCommitInfo().values());
- Set<Long> publishBackends =
transactionState.getPublishVersionTasks().keySet();
- // public version tasks are not persisted in catalog, so
publishBackends may be empty.
- // so we have to try publish to all backends;
- if (publishBackends.isEmpty()) {
- // could not just add to it, should new a new object, or the
back map will destroyed
- publishBackends = Sets.newHashSet();
- publishBackends.addAll(allBackends);
+ try {
+
beIdToBaseTabletIds.putAll(getBaseTabletIdsForEachBe(transactionState,
tableCommitInfo));
+ } catch (MetaNotFoundException e) {
+ LOG.warn("exception occur when trying to get rollup tablets
info", e);
}
+ }
- for (long backendId : publishBackends) {
- PublishVersionTask task = new PublishVersionTask(backendId,
- transactionState.getTransactionId(),
- transactionState.getDbId(),
- partitionVersionInfos,
- createPublishVersionTaskTime);
-
task.setBaseTabletsIds(beIdToBaseTabletIds.getOrDefault(backendId,
Collections.emptySet()));
- // add to AgentTaskQueue for handling finish report.
- // not check return value, because the add will success
- AgentTaskQueue.addTask(task);
- batchTask.addTask(task);
- transactionState.addPublishVersionTask(backendId, task);
+ List<TPartitionVersionInfo> partitionVersionInfos = new
ArrayList<>(partitionCommitInfos.size());
+ for (PartitionCommitInfo commitInfo : partitionCommitInfos) {
+ TPartitionVersionInfo versionInfo = new
TPartitionVersionInfo(commitInfo.getPartitionId(),
+ commitInfo.getVersion(), 0);
+ partitionVersionInfos.add(versionInfo);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("try to publish version info partitionid [{}],
version [{}]",
+ commitInfo.getPartitionId(),
+ commitInfo.getVersion());
}
- transactionState.setSendedTask();
- LOG.info("send publish tasks for transaction: {}, db: {}",
transactionState.getTransactionId(),
- transactionState.getDbId());
}
- if (!batchTask.getAllTasks().isEmpty()) {
- AgentTaskExecutor.submit(batchTask);
+
+ Set<Long> publishBackends =
transactionState.getPublishVersionTasks().keySet();
+ // public version tasks are not persisted in catalog, so
publishBackends may be empty.
+ // so we have to try publish to all backends;
+ if (publishBackends.isEmpty()) {
+ // could not just add to it, should new a new object, or the back
map will destroyed
+ publishBackends = Sets.newHashSet();
+ publishBackends.addAll(allBackends);
+ }
+
+ for (long backendId : publishBackends) {
+ PublishVersionTask task = new PublishVersionTask(backendId,
+ transactionState.getTransactionId(),
+ transactionState.getDbId(),
+ partitionVersionInfos,
+ createPublishVersionTaskTime);
+ task.setBaseTabletsIds(beIdToBaseTabletIds.getOrDefault(backendId,
Collections.emptySet()));
+ // add to AgentTaskQueue for handling finish report.
+ // not check return value, because the add will success
+ AgentTaskQueue.addTask(task);
+ batchTask.addTask(task);
+ transactionState.addPublishVersionTask(backendId, task);
}
+ transactionState.setSendedTask();
+ LOG.info("send publish tasks for transaction: {}, db: {}",
transactionState.getTransactionId(),
+ transactionState.getDbId());
}
private void tryFinishTxn(List<TransactionState> readyTransactionStates,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]