This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-manager.git
The following commit(s) were added to refs/heads/master by this push: new 8ba0737 fix module delete and config bug && update agent code (#16) 8ba0737 is described below commit 8ba0737589967c8a3b2f5537adcfd3af73d75a5f Author: LiRui <1176867...@qq.com> AuthorDate: Thu Mar 24 15:37:29 2022 +0800 fix module delete and config bug && update agent code (#16) * add dm-agent resource includes * fix module delete/config bug * export agent config and start module at module root dir * check agent port before starting agent * add cache for heartbeat event Co-authored-by: lirui40 <liru...@baidu.com> --- build.sh | 4 ++ .../service/heartbeat/DorisInstanceOperator.java | 10 ++-- .../component/ModelControlRequestComponent.java | 1 - .../control/manager/DorisClusterModuleManager.java | 4 ++ .../manager/ResourceNodeAndAgentManager.java | 53 ++++++++++++++++++++++ .../org/apache/doris/stack/shell/BaseCommand.java | 22 +++++++-- .../org/apache/doris/stack/util/Constants.java | 1 + .../src/main/resources/cache/ehcache.xml | 25 ++++++++++ .../doris/stack/dao/ClusterInstanceRepository.java | 14 ++++++ .../doris/stack/dao/HeartBeatEventRepository.java | 20 ++++++++ .../doris/stack/dao/ResourceNodeRepository.java | 21 +++++++++ 11 files changed, 166 insertions(+), 9 deletions(-) diff --git a/build.sh b/build.sh index 75ad587..4dfb601 100644 --- a/build.sh +++ b/build.sh @@ -50,6 +50,10 @@ mv output/manager-bin/agent output/ mv output/manager-bin output/server/bin mkdir -p output/agent/lib mv manager/dm-agent/target/dm-agent-1.0.0.jar output/agent/lib/dm-agent.jar + +mkdir -p output/agent/config +cp manager/dm-agent/src/main/resources/application.properties output/agent/config + cp -r manager/manager-server/src/main/resources/web-resource output/server/ tar -zcvf doris-manager-1.0.0.tar.gz output/ echo "copy to output package end" \ No newline at end of file diff --git a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/heartbeat/DorisInstanceOperator.java b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/heartbeat/DorisInstanceOperator.java index c9195b5..961f4a4 100644 --- a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/heartbeat/DorisInstanceOperator.java +++ b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/heartbeat/DorisInstanceOperator.java @@ -306,9 +306,13 @@ public class DorisInstanceOperator { private void executePkgShellScriptWithBash(String scriptName, String runningDir, String moduleName, Map<String, String> environment) throws Exception { - String scripts = Paths.get(runningDir, moduleName, "bin", scriptName).toFile().getAbsolutePath(); - final String shellCmd = "sh " + scripts; - log.info("begin to execute: `" + shellCmd + "`"); + String mouduleRootDir = runningDir + File.separator + moduleName; + String script = "bin" + File.separator + scriptName; + + String cmdFormat = "cd %s && sh %s"; + final String shellCmd = String.format(cmdFormat, mouduleRootDir, script); + + log.info("begin to execute with bash: `" + shellCmd + "`"); ShellUtil.cmdExecute(shellCmd); } diff --git a/manager/dm-server/src/main/java/org/apache/doris/stack/component/ModelControlRequestComponent.java b/manager/dm-server/src/main/java/org/apache/doris/stack/component/ModelControlRequestComponent.java index e49e71e..8c72032 100644 --- a/manager/dm-server/src/main/java/org/apache/doris/stack/component/ModelControlRequestComponent.java +++ b/manager/dm-server/src/main/java/org/apache/doris/stack/component/ModelControlRequestComponent.java @@ -51,7 +51,6 @@ public class ModelControlRequestComponent { requestEntity.setCurrentEventType(eventType); requestEntity.setModelId(modelId); requestEntity.setRequestInfo(requestInfo); - requestRepository.save(requestEntity); return requestRepository.save(requestEntity); } diff --git a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterModuleManager.java b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterModuleManager.java index 3c3ae9f..30205de 100644 --- a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterModuleManager.java +++ b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterModuleManager.java @@ -149,6 +149,7 @@ public class DorisClusterModuleManager { serviceCreateOperation(moduleEntity, serviceNamePorts, accessInfo); } + clusterModuleRepository.save(moduleEntity); } private void serviceCreateOperation(ClusterModuleEntity module, Map<String, Integer> serviceNamePorts, @@ -252,6 +253,9 @@ public class DorisClusterModuleManager { // delete service serviceRepository.deleteByModuleId(module.getId()); + + // delete module + clusterModuleRepository.deleteById(module.getId()); } } diff --git a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java index 9f4c91e..2d76ce9 100644 --- a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java +++ b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java @@ -46,6 +46,7 @@ import java.net.UnknownHostException; @Component public class ResourceNodeAndAgentManager { private static final String AGENT_START_SCRIPT = Constants.KEY_DORIS_AGENT_START_SCRIPT; + private static final String AGENT_CONFIG_PATH = Constants.KEY_DORIS_AGENT_CONFIG_PATH; @Autowired private ResourceNodeRepository nodeRepository; @@ -184,6 +185,58 @@ public class ResourceNodeAndAgentManager { // agent start // AGENT_START stage String agentInstallHome = configInfo.getInstallDir() + File.separator + "agent"; + + // 1 port check, eg: server.port=8008 + // grep = application.properties | grep -w server.port | awk -F '=' '{print $2}' + String confFile = agentInstallHome + File.separator + AGENT_CONFIG_PATH; + String portGetFormat = "grep = %s | grep -w server.port | awk -F '=' '{print $2}'"; + String portGetCmd = String.format(portGetFormat, confFile); + + SSH portGetSSH = new SSH(configInfo.getSshUser(), configInfo.getSshPort(), + sshKeyFile.getAbsolutePath(), configInfo.getHost(), portGetCmd); + + int agentPort = -1; + if (portGetSSH.run()) { + String portStr = portGetSSH.getStdoutResponse(); + log.info("agent {} port get return output: {}", configInfo.getAgentNodeId(), portStr); + + if (portStr == null || portStr.isEmpty()) { + log.warn("agent {} server.port is not set", configInfo.getAgentNodeId()); + } else { + try { + agentPort = Integer.parseInt(portStr.trim()); + } catch (NumberFormatException e) { + log.warn("agent port format is not Integer"); + } + } + + } else { + log.warn("run agent port get cmd failed:{}, skip the check and use default port", + portGetSSH.getErrorResponse()); + } + + if (agentPort > 0) { + log.info("agent start port is {}", agentPort); + // only check listen port + String checkPortCmd = String.format("netstat -tunlp | grep -w %s", agentPort); + SSH checkPortSSH = new SSH(configInfo.getSshUser(), configInfo.getSshPort(), + sshKeyFile.getAbsolutePath(), configInfo.getHost(), checkPortCmd); + if (checkPortSSH.run()) { + String netInfo = checkPortSSH.getStdoutResponse(); + log.info("agent {} port check return output: {}", configInfo.getAgentNodeId(), netInfo); + + if (netInfo != null && !netInfo.trim().isEmpty()) { + log.error("port {} already in use, {}", agentPort, netInfo); + updateFailResult("port already in use", + AgentInstallEventStage.AGENT_START.getStage(), agentInstallAgentEntity); + return; + } + } else { + log.warn("run check port cmd failed"); + } + } + + // 2 run start shell String command = "cd %s && sh %s --server %s --agent %s"; String cmd = String.format(command, agentInstallHome, AGENT_START_SCRIPT, getServerAddr(), configInfo.getAgentNodeId()); SSH startSsh = new SSH(configInfo.getSshUser(), configInfo.getSshPort(), diff --git a/manager/dm-server/src/main/java/org/apache/doris/stack/shell/BaseCommand.java b/manager/dm-server/src/main/java/org/apache/doris/stack/shell/BaseCommand.java index a07ef9a..bb18f19 100644 --- a/manager/dm-server/src/main/java/org/apache/doris/stack/shell/BaseCommand.java +++ b/manager/dm-server/src/main/java/org/apache/doris/stack/shell/BaseCommand.java @@ -32,10 +32,15 @@ import java.util.stream.Collectors; public abstract class BaseCommand { protected String[] resultCommand; + protected String stdoutResponse; protected String errorResponse; protected abstract void buildCommand(); + public String getStdoutResponse() { + return this.stdoutResponse; + } + public String getErrorResponse() { return this.errorResponse; } @@ -45,11 +50,15 @@ public abstract class BaseCommand { log.info("run command: {}", StringUtils.join(resultCommand, " ")); ProcessBuilder pb = new ProcessBuilder(resultCommand); Process process = null; - BufferedReader bufferedReader = null; + BufferedReader stdoutBufferedReader = null; + BufferedReader errorBufferedReader = null; try { process = pb.start(); - bufferedReader = new BufferedReader(new InputStreamReader(process.getErrorStream())); - errorResponse = bufferedReader.lines().parallel().collect(Collectors.joining(System.lineSeparator())); + stdoutBufferedReader = new BufferedReader(new InputStreamReader(process.getInputStream())); + errorBufferedReader = new BufferedReader(new InputStreamReader(process.getErrorStream())); + + stdoutResponse = stdoutBufferedReader.lines().parallel().collect(Collectors.joining(System.lineSeparator())); + errorResponse = errorBufferedReader.lines().parallel().collect(Collectors.joining(System.lineSeparator())); final int exitCode = process.waitFor(); if (exitCode == 0) { return true; @@ -65,8 +74,11 @@ public abstract class BaseCommand { process.destroy(); } try { - if (bufferedReader != null) { - bufferedReader.close(); + if (stdoutBufferedReader != null) { + stdoutBufferedReader.close(); + } + if (errorBufferedReader != null) { + errorBufferedReader.close(); } } catch (IOException e) { log.error("close buffered reader fail"); diff --git a/manager/dm-server/src/main/java/org/apache/doris/stack/util/Constants.java b/manager/dm-server/src/main/java/org/apache/doris/stack/util/Constants.java index 27e1bb4..896e1d0 100644 --- a/manager/dm-server/src/main/java/org/apache/doris/stack/util/Constants.java +++ b/manager/dm-server/src/main/java/org/apache/doris/stack/util/Constants.java @@ -19,6 +19,7 @@ package org.apache.doris.stack.util; public class Constants { public static final String KEY_DORIS_AGENT_START_SCRIPT = "bin/agent_start.sh"; + public static final String KEY_DORIS_AGENT_CONFIG_PATH = "config/application.properties"; public static final String KEY_FE_QUERY_PORT = "query_port"; public static final String KEY_FE_EDIT_LOG_PORT = "edit_log_port"; public static final String KEY_BE_HEARTBEAT_PORT = "heartbeat_service_port"; diff --git a/manager/manager-server/src/main/resources/cache/ehcache.xml b/manager/manager-server/src/main/resources/cache/ehcache.xml index 9ea40e3..8463f31 100644 --- a/manager/manager-server/src/main/resources/cache/ehcache.xml +++ b/manager/manager-server/src/main/resources/cache/ehcache.xml @@ -6,4 +6,29 @@ timeToLiveSeconds="3600" overflowToDisk="false" /> + + <!-- ResourceNodeRepository cache --> + <cache name="node_agent" + maxElementsInMemory="1000" + eternal="false" + timeToIdleSeconds="1800" + timeToLiveSeconds="3600" + overflowToDisk="false" + /> + <!-- HeartBeatEventRepository --> + <cache name="heart_beat" + maxElementsInMemory="1000" + eternal="false" + timeToIdleSeconds="1800" + timeToLiveSeconds="3600" + overflowToDisk="false" + /> + <!-- ClusterInstanceRepository cache --> + <cache name="cluster_instance" + maxElementsInMemory="1000" + eternal="false" + timeToIdleSeconds="1800" + timeToLiveSeconds="3600" + overflowToDisk="false" + /> </ehcache> diff --git a/manager/resource-common/src/main/java/org/apache/doris/stack/dao/ClusterInstanceRepository.java b/manager/resource-common/src/main/java/org/apache/doris/stack/dao/ClusterInstanceRepository.java index 2dcb1fa..b77486e 100644 --- a/manager/resource-common/src/main/java/org/apache/doris/stack/dao/ClusterInstanceRepository.java +++ b/manager/resource-common/src/main/java/org/apache/doris/stack/dao/ClusterInstanceRepository.java @@ -18,6 +18,8 @@ package org.apache.doris.stack.dao; import org.apache.doris.stack.entity.ClusterInstanceEntity; +import org.springframework.cache.annotation.CacheEvict; +import org.springframework.cache.annotation.Cacheable; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.query.Param; @@ -26,6 +28,7 @@ import java.util.List; public interface ClusterInstanceRepository extends JpaRepository<ClusterInstanceEntity, Long> { @Query("select c from ClusterInstanceEntity c where c.nodeId = :nodeId") + @Cacheable(value = "cluster_instance", key = "#p0") List<ClusterInstanceEntity> getByNodeId(@Param("nodeId") long nodeId); @Query("select c from ClusterInstanceEntity c where c.moduleId = :moduleId") @@ -34,4 +37,15 @@ public interface ClusterInstanceRepository extends JpaRepository<ClusterInstance @Query("select c.nodeId from ClusterInstanceEntity c where c.moduleId = :moduleId") List<Long> getNodeIdsByModuleId(@Param("moduleId") long moduleId); + @Override + @CacheEvict(value = "cluster_instance", key = "#result.nodeId") + ClusterInstanceEntity save(ClusterInstanceEntity entity); + + @Override + @CacheEvict(value = "cluster_instance", allEntries = true) + void deleteById(Long id); + + @Override + @CacheEvict(value = "cluster_instance", key = "#entity.nodeId") + void delete(ClusterInstanceEntity entity); } diff --git a/manager/resource-common/src/main/java/org/apache/doris/stack/dao/HeartBeatEventRepository.java b/manager/resource-common/src/main/java/org/apache/doris/stack/dao/HeartBeatEventRepository.java index 6ae686e..d6f2c27 100644 --- a/manager/resource-common/src/main/java/org/apache/doris/stack/dao/HeartBeatEventRepository.java +++ b/manager/resource-common/src/main/java/org/apache/doris/stack/dao/HeartBeatEventRepository.java @@ -21,8 +21,12 @@ import org.apache.doris.stack.entity.HeartBeatEventEntity; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.query.Param; +import org.springframework.cache.annotation.CacheEvict; +import org.springframework.cache.annotation.Cacheable; +import org.springframework.cache.annotation.CachePut; import java.util.List; +import java.util.Optional; import java.util.Set; public interface HeartBeatEventRepository extends JpaRepository<HeartBeatEventEntity, Long> { @@ -33,4 +37,20 @@ public interface HeartBeatEventRepository extends JpaRepository<HeartBeatEventEn @Query("select c.status from HeartBeatEventEntity c where c.requestId = :requestId") Set<String> getStatusByRequestId(@Param("requestId") long requestId); + + @Override + @CachePut(value = "heart_beat", key = "#result.id") + HeartBeatEventEntity save(HeartBeatEventEntity entity); + + @Override + @Cacheable(value = "heart_beat", key = "#p0") + Optional<HeartBeatEventEntity> findById(Long id); + + @Override + @CacheEvict(value = "heart_beat", key = "#p0") + void deleteById(Long id); + + @Override + @CacheEvict(value = "heat_beat", key = "#entity.id") + void delete(HeartBeatEventEntity entity); } diff --git a/manager/resource-common/src/main/java/org/apache/doris/stack/dao/ResourceNodeRepository.java b/manager/resource-common/src/main/java/org/apache/doris/stack/dao/ResourceNodeRepository.java index e26a800..025a28d 100644 --- a/manager/resource-common/src/main/java/org/apache/doris/stack/dao/ResourceNodeRepository.java +++ b/manager/resource-common/src/main/java/org/apache/doris/stack/dao/ResourceNodeRepository.java @@ -18,12 +18,16 @@ package org.apache.doris.stack.dao; import org.apache.doris.stack.entity.ResourceNodeEntity; +import org.springframework.cache.annotation.CacheEvict; +import org.springframework.cache.annotation.CachePut; +import org.springframework.cache.annotation.Cacheable; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.query.Param; import java.util.List; +import java.util.Optional; public interface ResourceNodeRepository extends JpaRepository<ResourceNodeEntity, Long> { @Query("select c.host from ResourceNodeEntity c where c.resourceClusterId = :resourceClusterId") @@ -34,6 +38,23 @@ public interface ResourceNodeRepository extends JpaRepository<ResourceNodeEntity @Modifying @Query("delete from ResourceNodeEntity c where c.resourceClusterId = :resourceClusterId and c.host = :host") + @CacheEvict(value = "node_agent", allEntries = true) void deleteByResourceClusterIdAndHost(@Param("resourceClusterId") long resourceClusterId, @Param("host") String host); + + @Override + @CachePut(value = "node_agent", key = "#result.id") + ResourceNodeEntity save(ResourceNodeEntity entity); + + @Override + @Cacheable(value = "node_agent", key = "#p0") + Optional<ResourceNodeEntity> findById(Long id); + + @Override + @CacheEvict(value = "node_agent", key = "#p0") + void deleteById(Long id); + + @Override + @CacheEvict(value = "node_agent", key = "#entity.id") + void delete(ResourceNodeEntity entity); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org