This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
     new 036c978be8 [cherry-pick][fix](TabletInvertedIndex) fix potential deadlock between ForkJoinPool and TabletInvertedIndex #11365 (#11418)
036c978be8 is described below

commit 036c978be85912da75488e6937dbf95150e7959c
Author: caiconghui <55968745+caicong...@users.noreply.github.com>
AuthorDate: Tue Aug 2 14:31:37 2022 +0800

    [cherry-pick][fix](TabletInvertedIndex) fix potential deadlock between ForkJoinPool and TabletInvertedIndex #11365 (#11418)
    
    Co-authored-by: caiconghui1 <caicongh...@jd.com>
---
 .../org/apache/doris/analysis/LoadColumnsInfo.java |   2 +-
 .../apache/doris/catalog/TabletInvertedIndex.java  | 213 +++++++++++----------
 .../org/apache/doris/catalog/TabletStatMgr.java    |  45 +++--
 .../doris/load/routineload/RoutineLoadManager.java |   2 +-
 4 files changed, 136 insertions(+), 126 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadColumnsInfo.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadColumnsInfo.java
index c93b05d8bb..8d45479ed7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadColumnsInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadColumnsInfo.java
@@ -66,7 +66,7 @@ public class LoadColumnsInfo implements ParseNode {
         sb.append(Joiner.on(",").join(columnNames));
         sb.append(")");
 
-        if (columnMappingList != null || columnMappingList.size() != 0) {
+        if (columnMappingList != null && !columnMappingList.isEmpty()) {
             sb.append(" SET (");
             sb.append(Joiner.on(",").join(columnMappingList.parallelStream()
                                                   .map(entity -> 
entity.toSql()).collect(Collectors.toList())));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
index 28e2569ddb..5bf4063c2a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
@@ -50,6 +50,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
@@ -94,6 +95,8 @@ public class TabletInvertedIndex {
 
     private volatile ImmutableSet<Long> partitionIdInMemorySet = 
ImmutableSet.of();
 
+    private ForkJoinPool taskPool = new 
ForkJoinPool(Runtime.getRuntime().availableProcessors());
+
     public TabletInvertedIndex() {
     }
 
@@ -131,123 +134,125 @@ public class TabletInvertedIndex {
             Map<Long, Replica> replicaMetaWithBackend = 
backingReplicaMetaTable.row(backendId);
             if (replicaMetaWithBackend != null) {
                 // traverse replicas in meta with this backend
-                
replicaMetaWithBackend.entrySet().parallelStream().forEach(entry -> {
-                    long tabletId = entry.getKey();
-                    
Preconditions.checkState(tabletMetaMap.containsKey(tabletId));
-                    TabletMeta tabletMeta = tabletMetaMap.get(tabletId);
+                taskPool.submit(() -> {
+                    
replicaMetaWithBackend.entrySet().parallelStream().forEach(entry -> {
+                        long tabletId = entry.getKey();
+                        
Preconditions.checkState(tabletMetaMap.containsKey(tabletId));
+                        TabletMeta tabletMeta = tabletMetaMap.get(tabletId);
+
+                        if (backendTablets.containsKey(tabletId)) {
+                            TTablet backendTablet = 
backendTablets.get(tabletId);
+                            Replica replica = entry.getValue();
+                            for (TTabletInfo backendTabletInfo : 
backendTablet.getTabletInfos()) {
+                                if 
(tabletMeta.containsSchemaHash(backendTabletInfo.getSchemaHash())) {
+                                    foundTabletsWithValidSchema.add(tabletId);
+                                    if 
(partitionIdInMemorySet.contains(backendTabletInfo.getPartitionId()) != 
backendTabletInfo.isIsInMemory()) {
+                                        synchronized (tabletToInMemory) {
+                                            tabletToInMemory.add(new 
ImmutableTriple<>(tabletId, backendTabletInfo.getSchemaHash(), 
!backendTabletInfo.isIsInMemory()));
+                                        }
+                                    }
+                                    // 1. (intersection)
+                                    if (needSync(replica, backendTabletInfo)) {
+                                        // need sync
+                                        synchronized (tabletSyncMap) {
+                                            
tabletSyncMap.put(tabletMeta.getDbId(), tabletId);
+                                        }
+                                    }
 
-                    if (backendTablets.containsKey(tabletId)) {
-                        TTablet backendTablet = backendTablets.get(tabletId);
-                        Replica replica = entry.getValue();
-                        for (TTabletInfo backendTabletInfo : 
backendTablet.getTabletInfos()) {
-                            if 
(tabletMeta.containsSchemaHash(backendTabletInfo.getSchemaHash())) {
-                                foundTabletsWithValidSchema.add(tabletId);
-                                if 
(partitionIdInMemorySet.contains(backendTabletInfo.getPartitionId()) != 
backendTabletInfo.isIsInMemory()) {
-                                    synchronized (tabletToInMemory) {
-                                        tabletToInMemory.add(new 
ImmutableTriple<>(tabletId, backendTabletInfo.getSchemaHash(), 
!backendTabletInfo.isIsInMemory()));
+                                    // check and set path
+                                    // path info of replica is only saved in 
Master FE
+                                    if (backendTabletInfo.isSetPathHash() &&
+                                            replica.getPathHash() != 
backendTabletInfo.getPathHash()) {
+                                        
replica.setPathHash(backendTabletInfo.getPathHash());
                                     }
-                                }
-                                // 1. (intersection)
-                                if (needSync(replica, backendTabletInfo)) {
-                                    // need sync
-                                    synchronized (tabletSyncMap) {
-                                        
tabletSyncMap.put(tabletMeta.getDbId(), tabletId);
+
+                                    if (backendTabletInfo.isSetSchemaHash() && 
replica.getState() == ReplicaState.NORMAL
+                                            && replica.getSchemaHash() != 
backendTabletInfo.getSchemaHash()) {
+                                        // update the schema hash only when 
replica is normal
+                                        
replica.setSchemaHash(backendTabletInfo.getSchemaHash());
                                     }
-                                }
-
-                                // check and set path
-                                // path info of replica is only saved in 
Master FE
-                                if (backendTabletInfo.isSetPathHash() &&
-                                        replica.getPathHash() != 
backendTabletInfo.getPathHash()) {
-                                    
replica.setPathHash(backendTabletInfo.getPathHash());
-                                }
-
-                                if (backendTabletInfo.isSetSchemaHash() && 
replica.getState() == ReplicaState.NORMAL
-                                        && replica.getSchemaHash() != 
backendTabletInfo.getSchemaHash()) {
-                                    // update the schema hash only when 
replica is normal
-                                    
replica.setSchemaHash(backendTabletInfo.getSchemaHash());
-                                }
-
-                                if (needRecover(replica, 
tabletMeta.getOldSchemaHash(), backendTabletInfo)) {
-                                    LOG.warn("replica {} of tablet {} on 
backend {} need recovery. "
-                                                    + "replica in FE: {}, 
report version {}, report schema hash: {},"
-                                                    + " is bad: {}, is version 
missing: {}",
-                                            replica.getId(), tabletId, 
backendId, replica,
-                                            backendTabletInfo.getVersion(),
-                                            backendTabletInfo.getSchemaHash(),
-                                            backendTabletInfo.isSetUsed() ? 
!backendTabletInfo.isUsed() : "false",
-                                            
backendTabletInfo.isSetVersionMiss() ? backendTabletInfo.isVersionMiss() : 
"unset");
-                                    synchronized (tabletRecoveryMap) {
-                                        
tabletRecoveryMap.put(tabletMeta.getDbId(), tabletId);
+
+                                    if (needRecover(replica, 
tabletMeta.getOldSchemaHash(), backendTabletInfo)) {
+                                        LOG.warn("replica {} of tablet {} on 
backend {} need recovery. "
+                                                        + "replica in FE: {}, 
report version {}, report schema hash: {},"
+                                                        + " is bad: {}, is 
version missing: {}",
+                                                replica.getId(), tabletId, 
backendId, replica,
+                                                backendTabletInfo.getVersion(),
+                                                
backendTabletInfo.getSchemaHash(),
+                                                backendTabletInfo.isSetUsed() 
? !backendTabletInfo.isUsed() : "false",
+                                                
backendTabletInfo.isSetVersionMiss() ? backendTabletInfo.isVersionMiss() : 
"unset");
+                                        synchronized (tabletRecoveryMap) {
+                                            
tabletRecoveryMap.put(tabletMeta.getDbId(), tabletId);
+                                        }
                                     }
-                                }
-
-                                long partitionId = tabletMeta.getPartitionId();
-                                if (!Config.disable_storage_medium_check) {
-                                    // check if need migration
-                                    TStorageMedium storageMedium = 
storageMediumMap.get(partitionId);
-                                    if (storageMedium != null && 
backendTabletInfo.isSetStorageMedium()) {
-                                        if (storageMedium != 
backendTabletInfo.getStorageMedium()) {
-                                            synchronized (tabletMigrationMap) {
-                                                
tabletMigrationMap.put(storageMedium, tabletId);
+
+                                    long partitionId = 
tabletMeta.getPartitionId();
+                                    if (!Config.disable_storage_medium_check) {
+                                        // check if need migration
+                                        TStorageMedium storageMedium = 
storageMediumMap.get(partitionId);
+                                        if (storageMedium != null && 
backendTabletInfo.isSetStorageMedium()) {
+                                            if (storageMedium != 
backendTabletInfo.getStorageMedium()) {
+                                                synchronized 
(tabletMigrationMap) {
+                                                    
tabletMigrationMap.put(storageMedium, tabletId);
+                                                }
+                                            }
+                                            if (storageMedium != 
tabletMeta.getStorageMedium()) {
+                                                
tabletMeta.setStorageMedium(storageMedium);
                                             }
-                                        }
-                                        if (storageMedium != 
tabletMeta.getStorageMedium()) {
-                                            
tabletMeta.setStorageMedium(storageMedium);
                                         }
                                     }
-                                }
-
-                                // check if should clear transactions
-                                if (backendTabletInfo.isSetTransactionIds()) {
-                                    List<Long> transactionIds = 
backendTabletInfo.getTransactionIds();
-                                    GlobalTransactionMgr transactionMgr = 
Catalog.getCurrentGlobalTransactionMgr();
-                                    for (Long transactionId : transactionIds) {
-                                        TransactionState transactionState = 
transactionMgr.getTransactionState(tabletMeta.getDbId(), transactionId);
-                                        if (transactionState == null || 
transactionState.getTransactionStatus() == TransactionStatus.ABORTED) {
-                                            synchronized (transactionsToClear) 
{
-                                                
transactionsToClear.put(transactionId, tabletMeta.getPartitionId());
-                                            }
-                                            LOG.debug("transaction id [{}] is 
not valid any more, "
-                                                    + "clear it from backend 
[{}]", transactionId, backendId);
-                                        } else if 
(transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
-                                            TableCommitInfo tableCommitInfo = 
transactionState.getTableCommitInfo(tabletMeta.getTableId());
-                                            PartitionCommitInfo 
partitionCommitInfo = tableCommitInfo == null ? null : 
tableCommitInfo.getPartitionCommitInfo(partitionId);
-                                            if (partitionCommitInfo != null) {
-                                                TPartitionVersionInfo 
versionInfo = new TPartitionVersionInfo(tabletMeta.getPartitionId(),
-                                                        
partitionCommitInfo.getVersion(), 0);
-                                                synchronized 
(transactionsToPublish) {
-                                                    ListMultimap<Long, 
TPartitionVersionInfo> map = 
transactionsToPublish.get(transactionState.getDbId());
-                                                    if (map == null) {
-                                                        map = 
ArrayListMultimap.create();
-                                                        
transactionsToPublish.put(transactionState.getDbId(), map);
+
+                                    // check if should clear transactions
+                                    if 
(backendTabletInfo.isSetTransactionIds()) {
+                                        List<Long> transactionIds = 
backendTabletInfo.getTransactionIds();
+                                        GlobalTransactionMgr transactionMgr = 
Catalog.getCurrentGlobalTransactionMgr();
+                                        for (Long transactionId : 
transactionIds) {
+                                            TransactionState transactionState 
= transactionMgr.getTransactionState(tabletMeta.getDbId(), transactionId);
+                                            if (transactionState == null || 
transactionState.getTransactionStatus() == TransactionStatus.ABORTED) {
+                                                synchronized 
(transactionsToClear) {
+                                                    
transactionsToClear.put(transactionId, tabletMeta.getPartitionId());
+                                                }
+                                                LOG.debug("transaction id [{}] 
is not valid any more, "
+                                                        + "clear it from 
backend [{}]", transactionId, backendId);
+                                            } else if 
(transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
+                                                TableCommitInfo 
tableCommitInfo = transactionState.getTableCommitInfo(tabletMeta.getTableId());
+                                                PartitionCommitInfo 
partitionCommitInfo = tableCommitInfo == null ? null : 
tableCommitInfo.getPartitionCommitInfo(partitionId);
+                                                if (partitionCommitInfo != 
null) {
+                                                    TPartitionVersionInfo 
versionInfo = new TPartitionVersionInfo(tabletMeta.getPartitionId(),
+                                                            
partitionCommitInfo.getVersion(), 0);
+                                                    synchronized 
(transactionsToPublish) {
+                                                        ListMultimap<Long, 
TPartitionVersionInfo> map = 
transactionsToPublish.get(transactionState.getDbId());
+                                                        if (map == null) {
+                                                            map = 
ArrayListMultimap.create();
+                                                            
transactionsToPublish.put(transactionState.getDbId(), map);
+                                                        }
+                                                        map.put(transactionId, 
versionInfo);
                                                     }
-                                                    map.put(transactionId, 
versionInfo);
                                                 }
                                             }
                                         }
+                                    } // end for txn id
+
+                                    // update replicase's version count
+                                    // no need to write log, and no need to 
get db lock.
+                                    if (backendTabletInfo.isSetVersionCount()) 
{
+                                        
replica.setVersionCount(backendTabletInfo.getVersionCount());
                                     }
-                                } // end for txn id
-
-                                // update replicase's version count
-                                // no need to write log, and no need to get db 
lock.
-                                if (backendTabletInfo.isSetVersionCount()) {
-                                    
replica.setVersionCount(backendTabletInfo.getVersionCount());
-                                }
-                            } else {
-                                // tablet with invalid schemahash
-                                foundTabletsWithInvalidSchema.put(tabletId, 
backendTabletInfo);
-                            } // end for be tablet info
+                                } else {
+                                    // tablet with invalid schemahash
+                                    
foundTabletsWithInvalidSchema.put(tabletId, backendTabletInfo);
+                                } // end for be tablet info
+                            }
+                        } else {
+                            // 2. (meta - be)
+                            // may need delete from meta
+                            LOG.debug("backend[{}] does not report 
tablet[{}-{}]", backendId, tabletId, tabletMeta);
+                            synchronized (tabletDeleteFromMeta) {
+                                tabletDeleteFromMeta.put(tabletMeta.getDbId(), 
tabletId);
+                            }
                         }
-                    } else {
-                        // 2. (meta - be)
-                        // may need delete from meta
-                        LOG.debug("backend[{}] does not report tablet[{}-{}]", 
backendId, tabletId, tabletMeta);
-                        synchronized (tabletDeleteFromMeta) {
-                            tabletDeleteFromMeta.put(tabletMeta.getDbId(), 
tabletId);
-                        }
-                    }
-                });
+                    });
+                }).join();
             }
         } finally {
             readUnlock();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
index 03eac7ca26..528cab2409 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
@@ -35,6 +35,7 @@ import com.google.common.collect.ImmutableMap;
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ForkJoinPool;
 
 /*
  * TabletStatMgr is for collecting tablet(replica) statistics from backends.
@@ -43,6 +44,8 @@ import java.util.Map;
 public class TabletStatMgr extends MasterDaemon {
     private static final Logger LOG = 
LogManager.getLogger(TabletStatMgr.class);
 
+    private ForkJoinPool taskPool = new ForkJoinPool();
+
     public TabletStatMgr() {
         super("tablet stat mgr", Config.tablet_stat_update_interval_second * 
1000);
     }
@@ -51,27 +54,29 @@ public class TabletStatMgr extends MasterDaemon {
     protected void runAfterCatalogReady() {
         ImmutableMap<Long, Backend> backends = 
Catalog.getCurrentSystemInfo().getIdToBackend();
         long start = System.currentTimeMillis();
-        backends.values().parallelStream().forEach(backend -> {
-            BackendService.Client client = null;
-            TNetworkAddress address = null;
-            boolean ok = false;
-            try {
-                address = new TNetworkAddress(backend.getHost(), 
backend.getBePort());
-                client = ClientPool.backendPool.borrowObject(address);
-                TTabletStatResult result = client.getTabletStat();
-                LOG.debug("get tablet stat from backend: {}, num: {}", 
backend.getId(), result.getTabletsStatsSize());
-                updateTabletStat(backend.getId(), result);
-                ok = true;
-            } catch (Exception e) {
-                LOG.warn("task exec error. backend[{}]", backend.getId(), e);
-            } finally {
-                if (ok) {
-                    ClientPool.backendPool.returnObject(address, client);
-                } else {
-                    ClientPool.backendPool.invalidateObject(address, client);
+        taskPool.submit(() -> {
+            backends.values().parallelStream().forEach(backend -> {
+                BackendService.Client client = null;
+                TNetworkAddress address = null;
+                boolean ok = false;
+                try {
+                    address = new TNetworkAddress(backend.getHost(), 
backend.getBePort());
+                    client = ClientPool.backendPool.borrowObject(address);
+                    TTabletStatResult result = client.getTabletStat();
+                    LOG.debug("get tablet stat from backend: {}, num: {}", 
backend.getId(), result.getTabletsStatsSize());
+                    updateTabletStat(backend.getId(), result);
+                    ok = true;
+                } catch (Exception e) {
+                    LOG.warn("task exec error. backend[{}]", backend.getId(), 
e);
+                } finally {
+                    if (ok) {
+                        ClientPool.backendPool.returnObject(address, client);
+                    } else {
+                        ClientPool.backendPool.invalidateObject(address, 
client);
+                    }
                 }
-            }
-        });
+            });
+        }).join();
         LOG.debug("finished to get tablet stat of all backends. cost: {} ms",
                 (System.currentTimeMillis() - start));
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index 9dac66b996..4b5e9b48ef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -203,7 +203,7 @@ public class RoutineLoadManager implements Writable {
             Map<String, List<RoutineLoadJob>> labelToRoutineLoadJob = 
dbToNameToRoutineLoadJob.get(dbId);
             if (labelToRoutineLoadJob.containsKey(name)) {
                 List<RoutineLoadJob> routineLoadJobList = 
labelToRoutineLoadJob.get(name);
-                Optional<RoutineLoadJob> optional = 
routineLoadJobList.parallelStream()
+                Optional<RoutineLoadJob> optional = routineLoadJobList.stream()
                         .filter(entity -> entity.getName().equals(name))
                         .filter(entity -> 
!entity.getState().isFinalState()).findFirst();
                 if (optional.isPresent()) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to