This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris-manager.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ba0737  fix module delete and config bug && update agent code (#16)
8ba0737 is described below

commit 8ba0737589967c8a3b2f5537adcfd3af73d75a5f
Author: LiRui <1176867...@qq.com>
AuthorDate: Thu Mar 24 15:37:29 2022 +0800

    fix module delete and config bug && update agent code (#16)
    
    * add dm-agent resource includes
    
    * fix module delete/config bug
    
    * export agent config and start module at module root dir
    
    * check agent port before starting agent
    
    * add cache for heartbeat event
    
    Co-authored-by: lirui40 <liru...@baidu.com>
---
 build.sh                                           |  4 ++
 .../service/heartbeat/DorisInstanceOperator.java   | 10 ++--
 .../component/ModelControlRequestComponent.java    |  1 -
 .../control/manager/DorisClusterModuleManager.java |  4 ++
 .../manager/ResourceNodeAndAgentManager.java       | 53 ++++++++++++++++++++++
 .../org/apache/doris/stack/shell/BaseCommand.java  | 22 +++++++--
 .../org/apache/doris/stack/util/Constants.java     |  1 +
 .../src/main/resources/cache/ehcache.xml           | 25 ++++++++++
 .../doris/stack/dao/ClusterInstanceRepository.java | 14 ++++++
 .../doris/stack/dao/HeartBeatEventRepository.java  | 20 ++++++++
 .../doris/stack/dao/ResourceNodeRepository.java    | 21 +++++++++
 11 files changed, 166 insertions(+), 9 deletions(-)

diff --git a/build.sh b/build.sh
index 75ad587..4dfb601 100644
--- a/build.sh
+++ b/build.sh
@@ -50,6 +50,10 @@ mv output/manager-bin/agent output/
 mv output/manager-bin output/server/bin
 mkdir -p output/agent/lib
 mv manager/dm-agent/target/dm-agent-1.0.0.jar output/agent/lib/dm-agent.jar
+
+mkdir -p output/agent/config
+cp manager/dm-agent/src/main/resources/application.properties output/agent/config
+
 cp -r manager/manager-server/src/main/resources/web-resource output/server/
 tar -zcvf doris-manager-1.0.0.tar.gz output/
 echo "copy to output package end"
\ No newline at end of file
diff --git a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/heartbeat/DorisInstanceOperator.java b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/heartbeat/DorisInstanceOperator.java
index c9195b5..961f4a4 100644
--- a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/heartbeat/DorisInstanceOperator.java
+++ b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/heartbeat/DorisInstanceOperator.java
@@ -306,9 +306,13 @@ public class DorisInstanceOperator {
 
     private void executePkgShellScriptWithBash(String scriptName, String 
runningDir,
                                        String moduleName, Map<String, String> 
environment) throws Exception {
-        String scripts = Paths.get(runningDir, moduleName, "bin", 
scriptName).toFile().getAbsolutePath();
-        final String shellCmd = "sh " + scripts;
-        log.info("begin to execute: `" + shellCmd + "`");
+        String mouduleRootDir = runningDir + File.separator + moduleName;
+        String script = "bin" + File.separator + scriptName;
+
+        String cmdFormat = "cd %s && sh %s";
+        final String shellCmd = String.format(cmdFormat, mouduleRootDir, 
script);
+
+        log.info("begin to execute with bash: `" + shellCmd + "`");
         ShellUtil.cmdExecute(shellCmd);
     }
 
diff --git a/manager/dm-server/src/main/java/org/apache/doris/stack/component/ModelControlRequestComponent.java b/manager/dm-server/src/main/java/org/apache/doris/stack/component/ModelControlRequestComponent.java
index e49e71e..8c72032 100644
--- a/manager/dm-server/src/main/java/org/apache/doris/stack/component/ModelControlRequestComponent.java
+++ b/manager/dm-server/src/main/java/org/apache/doris/stack/component/ModelControlRequestComponent.java
@@ -51,7 +51,6 @@ public class ModelControlRequestComponent {
         requestEntity.setCurrentEventType(eventType);
         requestEntity.setModelId(modelId);
         requestEntity.setRequestInfo(requestInfo);
-        requestRepository.save(requestEntity);
         return requestRepository.save(requestEntity);
     }
 
diff --git a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterModuleManager.java b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterModuleManager.java
index 3c3ae9f..30205de 100644
--- a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterModuleManager.java
+++ b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterModuleManager.java
@@ -149,6 +149,7 @@ public class DorisClusterModuleManager {
             serviceCreateOperation(moduleEntity, serviceNamePorts, accessInfo);
         }
 
+        clusterModuleRepository.save(moduleEntity);
     }
 
     private void serviceCreateOperation(ClusterModuleEntity module, 
Map<String, Integer> serviceNamePorts,
@@ -252,6 +253,9 @@ public class DorisClusterModuleManager {
 
         // delete service
         serviceRepository.deleteByModuleId(module.getId());
+
+        // delete module
+        clusterModuleRepository.deleteById(module.getId());
     }
 
 }
diff --git a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java
index 9f4c91e..2d76ce9 100644
--- a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java
+++ b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java
@@ -46,6 +46,7 @@ import java.net.UnknownHostException;
 @Component
 public class ResourceNodeAndAgentManager {
     private static final String AGENT_START_SCRIPT = 
Constants.KEY_DORIS_AGENT_START_SCRIPT;
+    private static final String AGENT_CONFIG_PATH = 
Constants.KEY_DORIS_AGENT_CONFIG_PATH;
 
     @Autowired
     private ResourceNodeRepository nodeRepository;
@@ -184,6 +185,58 @@ public class ResourceNodeAndAgentManager {
         // agent start
         // AGENT_START stage
         String agentInstallHome = configInfo.getInstallDir() + File.separator 
+ "agent";
+
+        // 1 port check, eg: server.port=8008
+        // grep = application.properties | grep  -w server.port  | awk -F '=' '{print $2}'
+        String confFile = agentInstallHome + File.separator + AGENT_CONFIG_PATH;
+        String portGetFormat = "grep = %s | grep  -w server.port  | awk -F '=' '{print $2}'";
+        String portGetCmd = String.format(portGetFormat, confFile);
+
+        SSH portGetSSH = new SSH(configInfo.getSshUser(), 
configInfo.getSshPort(),
+                sshKeyFile.getAbsolutePath(), configInfo.getHost(), 
portGetCmd);
+
+        int agentPort = -1;
+        if (portGetSSH.run()) {
+            String portStr = portGetSSH.getStdoutResponse();
+            log.info("agent {} port get return output: {}", 
configInfo.getAgentNodeId(), portStr);
+
+            if (portStr == null || portStr.isEmpty()) {
+                log.warn("agent {} server.port is not set", 
configInfo.getAgentNodeId());
+            } else {
+                try {
+                    agentPort = Integer.parseInt(portStr.trim());
+                } catch (NumberFormatException e) {
+                    log.warn("agent port format is not Integer");
+                }
+            }
+
+        } else {
+            log.warn("run agent port get cmd failed:{}, skip the check and use default port",
+                    portGetSSH.getErrorResponse());
+        }
+
+        if (agentPort > 0) {
+            log.info("agent start port is {}", agentPort);
+            // only check listen port
+            String checkPortCmd = String.format("netstat -tunlp | grep  -w %s", agentPort);
+            SSH checkPortSSH = new SSH(configInfo.getSshUser(), 
configInfo.getSshPort(),
+                    sshKeyFile.getAbsolutePath(), configInfo.getHost(), 
checkPortCmd);
+            if (checkPortSSH.run()) {
+                String netInfo = checkPortSSH.getStdoutResponse();
+                log.info("agent {} port check return output: {}", 
configInfo.getAgentNodeId(), netInfo);
+
+                if (netInfo != null && !netInfo.trim().isEmpty()) {
+                    log.error("port {} already in use, {}", agentPort, 
netInfo);
+                    updateFailResult("port already in use",
+                            AgentInstallEventStage.AGENT_START.getStage(), 
agentInstallAgentEntity);
+                    return;
+                }
+            } else {
+                log.warn("run check port cmd failed");
+            }
+        }
+
+        // 2 run start shell
         String command = "cd %s && sh %s  --server %s --agent %s";
         String cmd = String.format(command, agentInstallHome, 
AGENT_START_SCRIPT, getServerAddr(), configInfo.getAgentNodeId());
         SSH startSsh = new SSH(configInfo.getSshUser(), 
configInfo.getSshPort(),
diff --git a/manager/dm-server/src/main/java/org/apache/doris/stack/shell/BaseCommand.java b/manager/dm-server/src/main/java/org/apache/doris/stack/shell/BaseCommand.java
index a07ef9a..bb18f19 100644
--- a/manager/dm-server/src/main/java/org/apache/doris/stack/shell/BaseCommand.java
+++ b/manager/dm-server/src/main/java/org/apache/doris/stack/shell/BaseCommand.java
@@ -32,10 +32,15 @@ import java.util.stream.Collectors;
 public abstract class BaseCommand {
 
     protected String[] resultCommand;
+    protected String stdoutResponse;
     protected String errorResponse;
 
     protected abstract void buildCommand();
 
+    public String getStdoutResponse() {
+        return this.stdoutResponse;
+    }
+
     public String getErrorResponse() {
         return this.errorResponse;
     }
@@ -45,11 +50,15 @@ public abstract class BaseCommand {
         log.info("run command: {}", StringUtils.join(resultCommand, " "));
         ProcessBuilder pb = new ProcessBuilder(resultCommand);
         Process process = null;
-        BufferedReader bufferedReader = null;
+        BufferedReader stdoutBufferedReader = null;
+        BufferedReader errorBufferedReader = null;
         try {
             process = pb.start();
-            bufferedReader = new BufferedReader(new 
InputStreamReader(process.getErrorStream()));
-            errorResponse = 
bufferedReader.lines().parallel().collect(Collectors.joining(System.lineSeparator()));
+            stdoutBufferedReader = new BufferedReader(new 
InputStreamReader(process.getInputStream()));
+            errorBufferedReader = new BufferedReader(new 
InputStreamReader(process.getErrorStream()));
+
+            stdoutResponse = 
stdoutBufferedReader.lines().parallel().collect(Collectors.joining(System.lineSeparator()));
+            errorResponse = 
errorBufferedReader.lines().parallel().collect(Collectors.joining(System.lineSeparator()));
             final int exitCode = process.waitFor();
             if (exitCode == 0) {
                 return true;
@@ -65,8 +74,11 @@ public abstract class BaseCommand {
                 process.destroy();
             }
             try {
-                if (bufferedReader != null) {
-                    bufferedReader.close();
+                if (stdoutBufferedReader != null) {
+                    stdoutBufferedReader.close();
+                }
+                if (errorBufferedReader != null) {
+                    errorBufferedReader.close();
                 }
             } catch (IOException e) {
                 log.error("close buffered reader fail");
diff --git a/manager/dm-server/src/main/java/org/apache/doris/stack/util/Constants.java b/manager/dm-server/src/main/java/org/apache/doris/stack/util/Constants.java
index 27e1bb4..896e1d0 100644
--- a/manager/dm-server/src/main/java/org/apache/doris/stack/util/Constants.java
+++ b/manager/dm-server/src/main/java/org/apache/doris/stack/util/Constants.java
@@ -19,6 +19,7 @@ package org.apache.doris.stack.util;
 
 public class Constants {
     public static final String KEY_DORIS_AGENT_START_SCRIPT = 
"bin/agent_start.sh";
+    public static final String KEY_DORIS_AGENT_CONFIG_PATH = 
"config/application.properties";
     public static final String KEY_FE_QUERY_PORT = "query_port";
     public static final String KEY_FE_EDIT_LOG_PORT = "edit_log_port";
     public static final String KEY_BE_HEARTBEAT_PORT = 
"heartbeat_service_port";
diff --git a/manager/manager-server/src/main/resources/cache/ehcache.xml b/manager/manager-server/src/main/resources/cache/ehcache.xml
index 9ea40e3..8463f31 100644
--- a/manager/manager-server/src/main/resources/cache/ehcache.xml
+++ b/manager/manager-server/src/main/resources/cache/ehcache.xml
@@ -6,4 +6,29 @@
            timeToLiveSeconds="3600"
            overflowToDisk="false"
     />
+
+    <!-- ResourceNodeRepository cache -->
+    <cache name="node_agent"
+           maxElementsInMemory="1000"
+           eternal="false"
+           timeToIdleSeconds="1800"
+           timeToLiveSeconds="3600"
+           overflowToDisk="false"
+    />
+    <!-- HeartBeatEventRepository -->
+    <cache name="heart_beat"
+           maxElementsInMemory="1000"
+           eternal="false"
+           timeToIdleSeconds="1800"
+           timeToLiveSeconds="3600"
+           overflowToDisk="false"
+    />
+    <!-- ClusterInstanceRepository cache -->
+    <cache name="cluster_instance"
+           maxElementsInMemory="1000"
+           eternal="false"
+           timeToIdleSeconds="1800"
+           timeToLiveSeconds="3600"
+           overflowToDisk="false"
+    />
 </ehcache>
diff --git a/manager/resource-common/src/main/java/org/apache/doris/stack/dao/ClusterInstanceRepository.java b/manager/resource-common/src/main/java/org/apache/doris/stack/dao/ClusterInstanceRepository.java
index 2dcb1fa..b77486e 100644
--- a/manager/resource-common/src/main/java/org/apache/doris/stack/dao/ClusterInstanceRepository.java
+++ b/manager/resource-common/src/main/java/org/apache/doris/stack/dao/ClusterInstanceRepository.java
@@ -18,6 +18,8 @@
 package org.apache.doris.stack.dao;
 
 import org.apache.doris.stack.entity.ClusterInstanceEntity;
+import org.springframework.cache.annotation.CacheEvict;
+import org.springframework.cache.annotation.Cacheable;
 import org.springframework.data.jpa.repository.JpaRepository;
 import org.springframework.data.jpa.repository.Query;
 import org.springframework.data.repository.query.Param;
@@ -26,6 +28,7 @@ import java.util.List;
 
 public interface ClusterInstanceRepository extends 
JpaRepository<ClusterInstanceEntity, Long> {
     @Query("select c from ClusterInstanceEntity c where c.nodeId = :nodeId")
+    @Cacheable(value = "cluster_instance", key = "#p0")
     List<ClusterInstanceEntity> getByNodeId(@Param("nodeId") long nodeId);
 
     @Query("select c from ClusterInstanceEntity c where c.moduleId = 
:moduleId")
@@ -34,4 +37,15 @@ public interface ClusterInstanceRepository extends 
JpaRepository<ClusterInstance
     @Query("select c.nodeId from ClusterInstanceEntity c where c.moduleId = 
:moduleId")
     List<Long> getNodeIdsByModuleId(@Param("moduleId") long moduleId);
 
+    @Override
+    @CacheEvict(value = "cluster_instance", key = "#result.nodeId")
+    ClusterInstanceEntity save(ClusterInstanceEntity entity);
+
+    @Override
+    @CacheEvict(value = "cluster_instance", allEntries = true)
+    void deleteById(Long id);
+
+    @Override
+    @CacheEvict(value = "cluster_instance", key = "#entity.nodeId")
+    void delete(ClusterInstanceEntity entity);
 }
diff --git a/manager/resource-common/src/main/java/org/apache/doris/stack/dao/HeartBeatEventRepository.java b/manager/resource-common/src/main/java/org/apache/doris/stack/dao/HeartBeatEventRepository.java
index 6ae686e..d6f2c27 100644
--- a/manager/resource-common/src/main/java/org/apache/doris/stack/dao/HeartBeatEventRepository.java
+++ b/manager/resource-common/src/main/java/org/apache/doris/stack/dao/HeartBeatEventRepository.java
@@ -21,8 +21,12 @@ import org.apache.doris.stack.entity.HeartBeatEventEntity;
 import org.springframework.data.jpa.repository.JpaRepository;
 import org.springframework.data.jpa.repository.Query;
 import org.springframework.data.repository.query.Param;
+import org.springframework.cache.annotation.CacheEvict;
+import org.springframework.cache.annotation.Cacheable;
+import org.springframework.cache.annotation.CachePut;
 
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 
 public interface HeartBeatEventRepository extends 
JpaRepository<HeartBeatEventEntity, Long> {
@@ -33,4 +37,20 @@ public interface HeartBeatEventRepository extends 
JpaRepository<HeartBeatEventEn
 
     @Query("select c.status from HeartBeatEventEntity c where c.requestId = 
:requestId")
     Set<String> getStatusByRequestId(@Param("requestId") long requestId);
+
+    @Override
+    @CachePut(value = "heart_beat", key = "#result.id")
+    HeartBeatEventEntity save(HeartBeatEventEntity entity);
+
+    @Override
+    @Cacheable(value = "heart_beat", key = "#p0")
+    Optional<HeartBeatEventEntity> findById(Long id);
+
+    @Override
+    @CacheEvict(value = "heart_beat", key = "#p0")
+    void deleteById(Long id);
+
+    @Override
+    @CacheEvict(value = "heart_beat", key = "#entity.id") // must name the same cache as save/findById/deleteById
+    void delete(HeartBeatEventEntity entity);
 }
diff --git a/manager/resource-common/src/main/java/org/apache/doris/stack/dao/ResourceNodeRepository.java b/manager/resource-common/src/main/java/org/apache/doris/stack/dao/ResourceNodeRepository.java
index e26a800..025a28d 100644
--- a/manager/resource-common/src/main/java/org/apache/doris/stack/dao/ResourceNodeRepository.java
+++ b/manager/resource-common/src/main/java/org/apache/doris/stack/dao/ResourceNodeRepository.java
@@ -18,12 +18,16 @@
 package org.apache.doris.stack.dao;
 
 import org.apache.doris.stack.entity.ResourceNodeEntity;
+import org.springframework.cache.annotation.CacheEvict;
+import org.springframework.cache.annotation.CachePut;
+import org.springframework.cache.annotation.Cacheable;
 import org.springframework.data.jpa.repository.JpaRepository;
 import org.springframework.data.jpa.repository.Modifying;
 import org.springframework.data.jpa.repository.Query;
 import org.springframework.data.repository.query.Param;
 
 import java.util.List;
+import java.util.Optional;
 
 public interface ResourceNodeRepository extends 
JpaRepository<ResourceNodeEntity, Long> {
     @Query("select c.host from ResourceNodeEntity c where c.resourceClusterId 
= :resourceClusterId")
@@ -34,6 +38,23 @@ public interface ResourceNodeRepository extends 
JpaRepository<ResourceNodeEntity
 
     @Modifying
     @Query("delete from ResourceNodeEntity c where c.resourceClusterId = 
:resourceClusterId and c.host = :host")
+    @CacheEvict(value = "node_agent", allEntries = true)
     void deleteByResourceClusterIdAndHost(@Param("resourceClusterId") long 
resourceClusterId,
                                           @Param("host") String host);
+
+    @Override
+    @CachePut(value = "node_agent", key = "#result.id")
+    ResourceNodeEntity save(ResourceNodeEntity entity);
+
+    @Override
+    @Cacheable(value = "node_agent", key = "#p0")
+    Optional<ResourceNodeEntity> findById(Long id);
+
+    @Override
+    @CacheEvict(value = "node_agent", key = "#p0")
+    void deleteById(Long id);
+
+    @Override
+    @CacheEvict(value = "node_agent", key = "#entity.id")
+    void delete(ResourceNodeEntity entity);
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to