This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch dev-1.1.2 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.1.2 by this push: new 036c978be8 [cherry-pick][fix](TabletInvertedIndex) fix potential deadlock between ForkJoinPool and TabletInvertedIndex #11365 (#11418) 036c978be8 is described below commit 036c978be85912da75488e6937dbf95150e7959c Author: caiconghui <55968745+caicong...@users.noreply.github.com> AuthorDate: Tue Aug 2 14:31:37 2022 +0800 [cherry-pick][fix](TabletInvertedIndex) fix potential deadlock between ForkJoinPool and TabletInvertedIndex #11365 (#11418) Co-authored-by: caiconghui1 <caicongh...@jd.com> --- .../org/apache/doris/analysis/LoadColumnsInfo.java | 2 +- .../apache/doris/catalog/TabletInvertedIndex.java | 213 +++++++++++---------- .../org/apache/doris/catalog/TabletStatMgr.java | 45 +++-- .../doris/load/routineload/RoutineLoadManager.java | 2 +- 4 files changed, 136 insertions(+), 126 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadColumnsInfo.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadColumnsInfo.java index c93b05d8bb..8d45479ed7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadColumnsInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadColumnsInfo.java @@ -66,7 +66,7 @@ public class LoadColumnsInfo implements ParseNode { sb.append(Joiner.on(",").join(columnNames)); sb.append(")"); - if (columnMappingList != null || columnMappingList.size() != 0) { + if (columnMappingList != null && !columnMappingList.isEmpty()) { sb.append(" SET ("); sb.append(Joiner.on(",").join(columnMappingList.parallelStream() .map(entity -> entity.toSql()).collect(Collectors.toList()))); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java index 28e2569ddb..5bf4063c2a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java @@ -50,6 +50,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; @@ -94,6 +95,8 @@ public class TabletInvertedIndex { private volatile ImmutableSet<Long> partitionIdInMemorySet = ImmutableSet.of(); + private ForkJoinPool taskPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors()); + public TabletInvertedIndex() { } @@ -131,123 +134,125 @@ public class TabletInvertedIndex { Map<Long, Replica> replicaMetaWithBackend = backingReplicaMetaTable.row(backendId); if (replicaMetaWithBackend != null) { // traverse replicas in meta with this backend - replicaMetaWithBackend.entrySet().parallelStream().forEach(entry -> { - long tabletId = entry.getKey(); - Preconditions.checkState(tabletMetaMap.containsKey(tabletId)); - TabletMeta tabletMeta = tabletMetaMap.get(tabletId); + taskPool.submit(() -> { + replicaMetaWithBackend.entrySet().parallelStream().forEach(entry -> { + long tabletId = entry.getKey(); + Preconditions.checkState(tabletMetaMap.containsKey(tabletId)); + TabletMeta tabletMeta = tabletMetaMap.get(tabletId); + + if (backendTablets.containsKey(tabletId)) { + TTablet backendTablet = backendTablets.get(tabletId); + Replica replica = entry.getValue(); + for (TTabletInfo backendTabletInfo : backendTablet.getTabletInfos()) { + if (tabletMeta.containsSchemaHash(backendTabletInfo.getSchemaHash())) { + foundTabletsWithValidSchema.add(tabletId); + if (partitionIdInMemorySet.contains(backendTabletInfo.getPartitionId()) != backendTabletInfo.isIsInMemory()) { + synchronized (tabletToInMemory) { + tabletToInMemory.add(new ImmutableTriple<>(tabletId, backendTabletInfo.getSchemaHash(), !backendTabletInfo.isIsInMemory())); + } + } + // 1. 
(intersection) + if (needSync(replica, backendTabletInfo)) { + // need sync + synchronized (tabletSyncMap) { + tabletSyncMap.put(tabletMeta.getDbId(), tabletId); + } + } - if (backendTablets.containsKey(tabletId)) { - TTablet backendTablet = backendTablets.get(tabletId); - Replica replica = entry.getValue(); - for (TTabletInfo backendTabletInfo : backendTablet.getTabletInfos()) { - if (tabletMeta.containsSchemaHash(backendTabletInfo.getSchemaHash())) { - foundTabletsWithValidSchema.add(tabletId); - if (partitionIdInMemorySet.contains(backendTabletInfo.getPartitionId()) != backendTabletInfo.isIsInMemory()) { - synchronized (tabletToInMemory) { - tabletToInMemory.add(new ImmutableTriple<>(tabletId, backendTabletInfo.getSchemaHash(), !backendTabletInfo.isIsInMemory())); + // check and set path + // path info of replica is only saved in Master FE + if (backendTabletInfo.isSetPathHash() && + replica.getPathHash() != backendTabletInfo.getPathHash()) { + replica.setPathHash(backendTabletInfo.getPathHash()); } - } - // 1. 
(intersection) - if (needSync(replica, backendTabletInfo)) { - // need sync - synchronized (tabletSyncMap) { - tabletSyncMap.put(tabletMeta.getDbId(), tabletId); + + if (backendTabletInfo.isSetSchemaHash() && replica.getState() == ReplicaState.NORMAL + && replica.getSchemaHash() != backendTabletInfo.getSchemaHash()) { + // update the schema hash only when replica is normal + replica.setSchemaHash(backendTabletInfo.getSchemaHash()); } - } - - // check and set path - // path info of replica is only saved in Master FE - if (backendTabletInfo.isSetPathHash() && - replica.getPathHash() != backendTabletInfo.getPathHash()) { - replica.setPathHash(backendTabletInfo.getPathHash()); - } - - if (backendTabletInfo.isSetSchemaHash() && replica.getState() == ReplicaState.NORMAL - && replica.getSchemaHash() != backendTabletInfo.getSchemaHash()) { - // update the schema hash only when replica is normal - replica.setSchemaHash(backendTabletInfo.getSchemaHash()); - } - - if (needRecover(replica, tabletMeta.getOldSchemaHash(), backendTabletInfo)) { - LOG.warn("replica {} of tablet {} on backend {} need recovery. " - + "replica in FE: {}, report version {}, report schema hash: {}," - + " is bad: {}, is version missing: {}", - replica.getId(), tabletId, backendId, replica, - backendTabletInfo.getVersion(), - backendTabletInfo.getSchemaHash(), - backendTabletInfo.isSetUsed() ? !backendTabletInfo.isUsed() : "false", - backendTabletInfo.isSetVersionMiss() ? backendTabletInfo.isVersionMiss() : "unset"); - synchronized (tabletRecoveryMap) { - tabletRecoveryMap.put(tabletMeta.getDbId(), tabletId); + + if (needRecover(replica, tabletMeta.getOldSchemaHash(), backendTabletInfo)) { + LOG.warn("replica {} of tablet {} on backend {} need recovery. 
" + + "replica in FE: {}, report version {}, report schema hash: {}," + + " is bad: {}, is version missing: {}", + replica.getId(), tabletId, backendId, replica, + backendTabletInfo.getVersion(), + backendTabletInfo.getSchemaHash(), + backendTabletInfo.isSetUsed() ? !backendTabletInfo.isUsed() : "false", + backendTabletInfo.isSetVersionMiss() ? backendTabletInfo.isVersionMiss() : "unset"); + synchronized (tabletRecoveryMap) { + tabletRecoveryMap.put(tabletMeta.getDbId(), tabletId); + } } - } - - long partitionId = tabletMeta.getPartitionId(); - if (!Config.disable_storage_medium_check) { - // check if need migration - TStorageMedium storageMedium = storageMediumMap.get(partitionId); - if (storageMedium != null && backendTabletInfo.isSetStorageMedium()) { - if (storageMedium != backendTabletInfo.getStorageMedium()) { - synchronized (tabletMigrationMap) { - tabletMigrationMap.put(storageMedium, tabletId); + + long partitionId = tabletMeta.getPartitionId(); + if (!Config.disable_storage_medium_check) { + // check if need migration + TStorageMedium storageMedium = storageMediumMap.get(partitionId); + if (storageMedium != null && backendTabletInfo.isSetStorageMedium()) { + if (storageMedium != backendTabletInfo.getStorageMedium()) { + synchronized (tabletMigrationMap) { + tabletMigrationMap.put(storageMedium, tabletId); + } + } + if (storageMedium != tabletMeta.getStorageMedium()) { + tabletMeta.setStorageMedium(storageMedium); } - } - if (storageMedium != tabletMeta.getStorageMedium()) { - tabletMeta.setStorageMedium(storageMedium); } } - } - - // check if should clear transactions - if (backendTabletInfo.isSetTransactionIds()) { - List<Long> transactionIds = backendTabletInfo.getTransactionIds(); - GlobalTransactionMgr transactionMgr = Catalog.getCurrentGlobalTransactionMgr(); - for (Long transactionId : transactionIds) { - TransactionState transactionState = transactionMgr.getTransactionState(tabletMeta.getDbId(), transactionId); - if (transactionState == null || 
transactionState.getTransactionStatus() == TransactionStatus.ABORTED) { - synchronized (transactionsToClear) { - transactionsToClear.put(transactionId, tabletMeta.getPartitionId()); - } - LOG.debug("transaction id [{}] is not valid any more, " - + "clear it from backend [{}]", transactionId, backendId); - } else if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { - TableCommitInfo tableCommitInfo = transactionState.getTableCommitInfo(tabletMeta.getTableId()); - PartitionCommitInfo partitionCommitInfo = tableCommitInfo == null ? null : tableCommitInfo.getPartitionCommitInfo(partitionId); - if (partitionCommitInfo != null) { - TPartitionVersionInfo versionInfo = new TPartitionVersionInfo(tabletMeta.getPartitionId(), - partitionCommitInfo.getVersion(), 0); - synchronized (transactionsToPublish) { - ListMultimap<Long, TPartitionVersionInfo> map = transactionsToPublish.get(transactionState.getDbId()); - if (map == null) { - map = ArrayListMultimap.create(); - transactionsToPublish.put(transactionState.getDbId(), map); + + // check if should clear transactions + if (backendTabletInfo.isSetTransactionIds()) { + List<Long> transactionIds = backendTabletInfo.getTransactionIds(); + GlobalTransactionMgr transactionMgr = Catalog.getCurrentGlobalTransactionMgr(); + for (Long transactionId : transactionIds) { + TransactionState transactionState = transactionMgr.getTransactionState(tabletMeta.getDbId(), transactionId); + if (transactionState == null || transactionState.getTransactionStatus() == TransactionStatus.ABORTED) { + synchronized (transactionsToClear) { + transactionsToClear.put(transactionId, tabletMeta.getPartitionId()); + } + LOG.debug("transaction id [{}] is not valid any more, " + + "clear it from backend [{}]", transactionId, backendId); + } else if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { + TableCommitInfo tableCommitInfo = transactionState.getTableCommitInfo(tabletMeta.getTableId()); + PartitionCommitInfo 
partitionCommitInfo = tableCommitInfo == null ? null : tableCommitInfo.getPartitionCommitInfo(partitionId); + if (partitionCommitInfo != null) { + TPartitionVersionInfo versionInfo = new TPartitionVersionInfo(tabletMeta.getPartitionId(), + partitionCommitInfo.getVersion(), 0); + synchronized (transactionsToPublish) { + ListMultimap<Long, TPartitionVersionInfo> map = transactionsToPublish.get(transactionState.getDbId()); + if (map == null) { + map = ArrayListMultimap.create(); + transactionsToPublish.put(transactionState.getDbId(), map); + } + map.put(transactionId, versionInfo); } - map.put(transactionId, versionInfo); } } } + } // end for txn id + + // update replicase's version count + // no need to write log, and no need to get db lock. + if (backendTabletInfo.isSetVersionCount()) { + replica.setVersionCount(backendTabletInfo.getVersionCount()); } - } // end for txn id - - // update replicase's version count - // no need to write log, and no need to get db lock. - if (backendTabletInfo.isSetVersionCount()) { - replica.setVersionCount(backendTabletInfo.getVersionCount()); - } - } else { - // tablet with invalid schemahash - foundTabletsWithInvalidSchema.put(tabletId, backendTabletInfo); - } // end for be tablet info + } else { + // tablet with invalid schemahash + foundTabletsWithInvalidSchema.put(tabletId, backendTabletInfo); + } // end for be tablet info + } + } else { + // 2. (meta - be) + // may need delete from meta + LOG.debug("backend[{}] does not report tablet[{}-{}]", backendId, tabletId, tabletMeta); + synchronized (tabletDeleteFromMeta) { + tabletDeleteFromMeta.put(tabletMeta.getDbId(), tabletId); + } } - } else { - // 2. 
(meta - be) - // may need delete from meta - LOG.debug("backend[{}] does not report tablet[{}-{}]", backendId, tabletId, tabletMeta); - synchronized (tabletDeleteFromMeta) { - tabletDeleteFromMeta.put(tabletMeta.getDbId(), tabletId); - } - } - }); + }); + }).join(); } } finally { readUnlock(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java index 03eac7ca26..528cab2409 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java @@ -35,6 +35,7 @@ import com.google.common.collect.ImmutableMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ForkJoinPool; /* * TabletStatMgr is for collecting tablet(replica) statistics from backends. @@ -43,6 +44,8 @@ import java.util.Map; public class TabletStatMgr extends MasterDaemon { private static final Logger LOG = LogManager.getLogger(TabletStatMgr.class); + private ForkJoinPool taskPool = new ForkJoinPool(); + public TabletStatMgr() { super("tablet stat mgr", Config.tablet_stat_update_interval_second * 1000); } @@ -51,27 +54,29 @@ public class TabletStatMgr extends MasterDaemon { protected void runAfterCatalogReady() { ImmutableMap<Long, Backend> backends = Catalog.getCurrentSystemInfo().getIdToBackend(); long start = System.currentTimeMillis(); - backends.values().parallelStream().forEach(backend -> { - BackendService.Client client = null; - TNetworkAddress address = null; - boolean ok = false; - try { - address = new TNetworkAddress(backend.getHost(), backend.getBePort()); - client = ClientPool.backendPool.borrowObject(address); - TTabletStatResult result = client.getTabletStat(); - LOG.debug("get tablet stat from backend: {}, num: {}", backend.getId(), result.getTabletsStatsSize()); - updateTabletStat(backend.getId(), result); - ok = true; - } catch (Exception e) { - LOG.warn("task exec error. 
backend[{}]", backend.getId(), e); - } finally { - if (ok) { - ClientPool.backendPool.returnObject(address, client); - } else { - ClientPool.backendPool.invalidateObject(address, client); + taskPool.submit(() -> { + backends.values().parallelStream().forEach(backend -> { + BackendService.Client client = null; + TNetworkAddress address = null; + boolean ok = false; + try { + address = new TNetworkAddress(backend.getHost(), backend.getBePort()); + client = ClientPool.backendPool.borrowObject(address); + TTabletStatResult result = client.getTabletStat(); + LOG.debug("get tablet stat from backend: {}, num: {}", backend.getId(), result.getTabletsStatsSize()); + updateTabletStat(backend.getId(), result); + ok = true; + } catch (Exception e) { + LOG.warn("task exec error. backend[{}]", backend.getId(), e); + } finally { + if (ok) { + ClientPool.backendPool.returnObject(address, client); + } else { + ClientPool.backendPool.invalidateObject(address, client); + } } - } - }); + }); + }).join(); LOG.debug("finished to get tablet stat of all backends. 
cost: {} ms", (System.currentTimeMillis() - start)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index 9dac66b996..4b5e9b48ef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -203,7 +203,7 @@ public class RoutineLoadManager implements Writable { Map<String, List<RoutineLoadJob>> labelToRoutineLoadJob = dbToNameToRoutineLoadJob.get(dbId); if (labelToRoutineLoadJob.containsKey(name)) { List<RoutineLoadJob> routineLoadJobList = labelToRoutineLoadJob.get(name); - Optional<RoutineLoadJob> optional = routineLoadJobList.parallelStream() + Optional<RoutineLoadJob> optional = routineLoadJobList.stream() .filter(entity -> entity.getName().equals(name)) .filter(entity -> !entity.getState().isFinalState()).findFirst(); if (optional.isPresent()) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org