This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 9969932589 [INLONG-11157][Manager] Asynchronous processing agent installation (#11159)
9969932589 is described below

commit 9969932589fb92ba26a1f1897a124b6dda476993
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Mon Sep 23 14:06:27 2024 +0800

    [INLONG-11157][Manager] Asynchronous processing agent installation (#11159)
---
 .../inlong/manager/common/enums/NodeStatus.java    | 10 +++-
 .../dao/mapper/InlongClusterNodeEntityMapper.java  |  3 +-
 .../mappers/InlongClusterNodeEntityMapper.xml      |  9 ++-
 .../service/cluster/InlongClusterServiceImpl.java  | 69 +++++++++++++++++++---
 .../node/AgentClusterNodeInstallOperator.java      | 18 +++++-
 5 files changed, 96 insertions(+), 13 deletions(-)

diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/NodeStatus.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/NodeStatus.java
index b240c7217a..7ee92cc537 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/NodeStatus.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/NodeStatus.java
@@ -26,7 +26,15 @@ public enum NodeStatus {
 
     NORMAL(1),
 
-    HEARTBEAT_TIMEOUT(2);
+    HEARTBEAT_TIMEOUT(2),
+
+    INSTALLING(3),
+
+    INSTALL_FAILED(4),
+
+    INSTALL_SUCCESS(5),
+
+    UNLOAD_FAILED(6);
 
     int status;
 
diff --git 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java
 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java
index f168de98d9..2b87842e99 100644
--- 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java
+++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java
@@ -55,7 +55,8 @@ public interface InlongClusterNodeEntityMapper {
      */
     int updateStatus(@Param("id") Integer id, @Param("nextStatus") Integer 
nextStatus, @Param("status") Integer status);
 
-    int updateOperateLogById(@Param("id") Integer id, @Param("operateLog") 
String operateLog);
+    int updateOperateLogById(@Param("id") Integer id, @Param("nextStatus") 
Integer nextStatus,
+            @Param("operateLog") String operateLog);
 
     int deleteById(Integer id);
 
diff --git 
a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
 
b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
index 2afe00ee6b..7604362106 100644
--- 
a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
+++ 
b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
@@ -256,7 +256,14 @@
     </update>
     <update id="updateOperateLogById">
         update inlong_cluster_node
-        set operate_log = #{operateLog,jdbcType=LONGVARCHAR}
+        <set>
+            <if test="nextStatus != null">
+                status = #{nextStatus,jdbcType=INTEGER},
+            </if>
+            <if test="operateLog != null">
+                operate_log  = #{operateLog,jdbcType=LONGVARCHAR}
+            </if>
+        </set>
         where id = #{id,jdbcType=INTEGER}
         and is_deleted = 0
     </update>
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index f399455803..7882e29c12 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -90,6 +90,7 @@ import com.github.pagehelper.PageHelper;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.gson.Gson;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -101,6 +102,8 @@ import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Isolation;
 import org.springframework.transaction.annotation.Transactional;
 
+import javax.annotation.PostConstruct;
+
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -113,6 +116,12 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static 
org.apache.inlong.manager.pojo.cluster.InlongClusterTagExtParam.packExtParams;
@@ -126,6 +135,15 @@ public class InlongClusterServiceImpl implements 
InlongClusterService {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(InlongClusterServiceImpl.class);
     private static final Gson GSON = new Gson();
+    private final ExecutorService executorService = new ThreadPoolExecutor(
+            5,
+            10,
+            10L,
+            TimeUnit.SECONDS,
+            new ArrayBlockingQueue<>(100),
+            new 
ThreadFactoryBuilder().setNameFormat("agent-install-%s").build(),
+            new CallerRunsPolicy());
+    private final LinkedBlockingQueue<ClusterNodeRequest> 
pendingInstallRequests = new LinkedBlockingQueue<>();
 
     @Autowired
     private InlongGroupEntityMapper groupMapper;
@@ -158,6 +176,13 @@ public class InlongClusterServiceImpl implements 
InlongClusterService {
     @Autowired
     private DataProxyConfigRepository proxyRepository;
 
+    /**
+     * Hands a new {@link InstallTaskRunnable} to {@code executorService} after this
+     * service has been constructed, so queued install requests start being consumed.
+     */
+    @PostConstruct
+    private void startInstallTask() {
+        this.executorService.execute(new InstallTaskRunnable());
+        LOGGER.info("install task started successfully");
+    }
+
     @Override
     public Integer saveTag(ClusterTagRequest request, String operator) {
         LOGGER.debug("begin to save cluster tag {}", request);
@@ -692,9 +717,7 @@ public class InlongClusterServiceImpl implements 
InlongClusterService {
         Integer id = instance.saveOpt(request, operator);
         if (request.getIsInstall()) {
             request.setId(id);
-            InlongClusterNodeInstallOperator clusterNodeInstallOperator = 
clusterNodeInstallOperatorFactory.getInstance(
-                    request.getType());
-            clusterNodeInstallOperator.install(request, operator);
+            pendingInstallRequests.add(request);
         }
         return id;
     }
@@ -810,7 +833,6 @@ public class InlongClusterServiceImpl implements 
InlongClusterService {
     }
 
     @Override
-    @Transactional(rollbackFor = Throwable.class, isolation = 
Isolation.REPEATABLE_READ)
     public Boolean updateNode(ClusterNodeRequest request, String operator) {
         LOGGER.debug("begin to update inlong cluster node={}", request);
         Preconditions.expectNotNull(request, "inlong cluster node cannot be 
empty");
@@ -843,9 +865,9 @@ public class InlongClusterServiceImpl implements 
InlongClusterService {
         InlongClusterNodeOperator instance = 
clusterNodeOperatorFactory.getInstance(request.getType());
         instance.updateOpt(request, operator);
         if (request.getIsInstall()) {
-            InlongClusterNodeInstallOperator clusterNodeInstallOperator = 
clusterNodeInstallOperatorFactory.getInstance(
-                    request.getType());
-            clusterNodeInstallOperator.install(request, operator);
+            // when reinstall set install to false
+            request.setIsInstall(false);
+            pendingInstallRequests.add(request);
         }
         return true;
     }
@@ -1381,4 +1403,37 @@ public class InlongClusterServiceImpl implements 
InlongClusterService {
             request.setClusterTags(entity.getClusterTags());
         }
     }
+
+    /**
+     * Background worker that consumes {@code pendingInstallRequests} and performs the
+     * queued agent install / reinstall operations outside of the request thread.
+     *
+     * <p>The loop runs until the executing thread is interrupted.
+     */
+    private class InstallTaskRunnable implements Runnable {
+
+        /** Idle time between two polling rounds, in milliseconds (60 seconds). */
+        private static final long WAIT_MILLIS = 60 * 1000L;
+
+        @Override
+        public void run() {
+            while (!Thread.currentThread().isInterrupted()) {
+                try {
+                    // Drain everything that is queued before sleeping, so a burst of
+                    // N requests is not spread over N sleep intervals.
+                    while (!pendingInstallRequests.isEmpty()) {
+                        processInstall();
+                    }
+                    Thread.sleep(WAIT_MILLIS);
+                } catch (InterruptedException e) {
+                    // Restore the interrupt flag and stop; swallowing the interrupt
+                    // would make it impossible to shut the worker down.
+                    Thread.currentThread().interrupt();
+                    LOGGER.warn("install task interrupted, stop processing install requests");
+                    return;
+                } catch (Exception e) {
+                    // A failed request must not kill the worker; log and keep going.
+                    LOGGER.error("exception occurred when install", e);
+                }
+            }
+        }
+
+        /**
+         * Takes at most one request from {@code pendingInstallRequests} and runs the
+         * matching operation; returns immediately when the queue is empty.
+         *
+         * <p>A request whose {@code isInstall} flag is true is installed; otherwise it
+         * is reinstalled ({@code updateNode} resets the flag to false before queueing).
+         */
+        // NOTE(review): this object is created with `new` and the method is invoked
+        // directly from run(), so the annotation is presumably not applied by Spring's
+        // proxy-based transaction support - confirm whether a transaction is needed here.
+        @Transactional(rollbackFor = Throwable.class)
+        public void processInstall() {
+            // poll() yields null on an empty queue, which avoids a separate isEmpty() check.
+            ClusterNodeRequest request = pendingInstallRequests.poll();
+            if (request == null) {
+                return;
+            }
+            InlongClusterNodeInstallOperator clusterNodeInstallOperator =
+                    clusterNodeInstallOperatorFactory.getInstance(request.getType());
+            if (request.getIsInstall()) {
+                clusterNodeInstallOperator.install(request, request.getCurrentUser());
+            } else {
+                clusterNodeInstallOperator.reInstall(request, request.getCurrentUser());
+            }
+        }
+    }
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeInstallOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeInstallOperator.java
index 06fd1db3ad..abf8a895cd 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeInstallOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeInstallOperator.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.service.cluster.node;
 
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ModuleType;
+import org.apache.inlong.manager.common.enums.NodeStatus;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.AESUtils;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
@@ -96,12 +97,17 @@ public class AgentClusterNodeInstallOperator implements 
InlongClusterNodeInstall
     public boolean install(ClusterNodeRequest clusterNodeRequest, String 
operator) {
         LOGGER.info("begin to insert agent cluster node={}", 
clusterNodeRequest);
         try {
+            
clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(), 
NodeStatus.INSTALLING.getStatus(),
+                    "begin to install");
             AgentClusterNodeRequest request = (AgentClusterNodeRequest) 
clusterNodeRequest;
             deployInstaller(request, operator);
             String startCmd = agentInstallPath + INSTALLER_START_CMD;
             commandExecutor.execRemote(request, startCmd);
+            
clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(),
+                    NodeStatus.INSTALL_SUCCESS.getStatus(), "success to 
install");
         } catch (Exception e) {
-            
clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(), 
e.getMessage());
+            
clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(),
+                    NodeStatus.INSTALL_FAILED.getStatus(), e.getMessage());
             String errMsg = String.format("install agent cluster node failed 
for ip=%s", clusterNodeRequest.getIp());
             LOGGER.error(errMsg, e);
             throw new BusinessException(errMsg);
@@ -114,13 +120,18 @@ public class AgentClusterNodeInstallOperator implements 
InlongClusterNodeInstall
     public boolean reInstall(ClusterNodeRequest clusterNodeRequest, String 
operator) {
         LOGGER.info("begin to reInstall agent cluster node={}", 
clusterNodeRequest);
         try {
+            
clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(), 
NodeStatus.INSTALLING.getStatus(),
+                    "begin to reinstall");
             AgentClusterNodeRequest request = (AgentClusterNodeRequest) 
clusterNodeRequest;
             commandExecutor.rmDir(request, agentInstallPath.substring(0, 
agentInstallPath.lastIndexOf(File.separator)));
             deployInstaller(request, operator);
             String reStartCmd = agentInstallPath + INSTALLER_RESTART_CMD;
             commandExecutor.execRemote(request, reStartCmd);
+            
clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(), 
NodeStatus.NORMAL.getStatus(),
+                    "success to reinstall");
         } catch (Exception e) {
-            
clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(), 
e.getMessage());
+            
clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(),
+                    NodeStatus.INSTALL_FAILED.getStatus(), e.getMessage());
             String errMsg = String.format("reInstall agent cluster node failed 
for ip=%s", clusterNodeRequest.getIp());
             LOGGER.error(errMsg, e);
             throw new BusinessException(errMsg);
@@ -140,7 +151,8 @@ public class AgentClusterNodeInstallOperator implements 
InlongClusterNodeInstall
             commandExecutor.execRemote(request, stopCmd);
             commandExecutor.rmDir(request, agentInstallPath.substring(0, 
agentInstallPath.lastIndexOf(File.separator)));
         } catch (Exception e) {
-            
clusterNodeEntityMapper.updateOperateLogById(clusterNodeEntity.getId(), 
e.getMessage());
+            
clusterNodeEntityMapper.updateOperateLogById(clusterNodeEntity.getId(),
+                    NodeStatus.UNLOAD_FAILED.getStatus(), e.getMessage());
             String errMsg = String.format("unload agent cluster node failed 
for ip=%s", clusterNodeEntity.getIp());
             LOGGER.error(errMsg, e);
             throw new BusinessException(errMsg);

Reply via email to