This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch doris-manager
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/doris-manager by this push:
     new 4a22750  [improvement](manager) Modify the agent startup service and 
build cluster logic (#7705)
4a22750 is described below

commit 4a22750360ab72d55d5506596ab5e0977d2e11c9
Author: wudi <676366...@qq.com>
AuthorDate: Sun Jan 16 10:39:58 2022 +0800

    [improvement](manager) Modify the agent startup service and build cluster 
logic (#7705)
    
    1. The agent judges that the fe/be/broker has been successfully started, 
The logic is modified to judge whether the process still exists after 10 
seconds of starting.
    2. The submitted state is uniformly changed to running at the front end
    3. change [build cluster] the logic unified as add fe, add be, add broker
---
 manager/README.md                                  |  13 +-
 .../agent/command/CommandResultService.java        |  33 ++--
 .../doris/manager/agent/common/AgentConstants.java |   8 +-
 .../doris/manager/agent/service/BeService.java     |   5 +
 .../doris/manager/agent/service/BrokerService.java |   5 +
 .../doris/manager/agent/service/FeService.java     |   5 +
 .../doris/manager/agent/service/Service.java       |  25 ++++
 .../apache/doris/stack/constants/Constants.java    |   2 -
 .../doris/stack/controller/AgentController.java    |  14 +-
 .../doris/stack/dao/TaskInstanceRepository.java    |   2 +-
 .../{BeJoinReq.java => BuildClusterReq.java}       |   6 +-
 .../apache/doris/stack/service/AgentProcess.java   |   4 +-
 .../doris/stack/service/impl/AgentProcessImpl.java | 166 ++++++++++++---------
 .../doris/stack/service/impl/ProcessTaskImpl.java  |   3 +
 .../java/org/apache/doris/stack/util/JdbcUtil.java |   1 -
 ...216\245\345\217\243\346\226\207\346\241\243.md" |  12 +-
 manager/manager-bin/agent/bin/process_exist.sh     |  28 ++++
 17 files changed, 207 insertions(+), 125 deletions(-)

diff --git a/manager/README.md b/manager/README.md
index a0b742a..713fee6 100644
--- a/manager/README.md
+++ b/manager/README.md
@@ -36,6 +36,8 @@ agent/, doris manger agent
         agent_stop.sh, doris manger agent stop script
         install_be.sh, doris be install script
         install_fe.sh, doris fe install script
+        install_broker.sh, doris broker install script
+        process_exist.sh, doris process detection script
     lib/
         dm-agent.jar, executable package of doris manger agent
 server/, doris manger server
@@ -98,13 +100,4 @@ Browser access ${serverIp}:8080, Manger server has a preset 
super administrator
 user name: Admin
 password: Admin@123
 (Case sensitive)
-```
-
-### Step3: install agent service
-After decompressing the tar package in the second step, there will be a 
directory called agent. copy the agent directory to the
-machine where the agent service is installed,
-```
-$ sh bin/agent_start.sh --agent ${agentIp} --server ${serverIp}:8080
-```
-${agentIp} is the IP of the machine where the agent is located, ${serverIp} is 
the IP of the machine where the manager server is located
-
+```
\ No newline at end of file
diff --git 
a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/command/CommandResultService.java
 
b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/command/CommandResultService.java
index 12d00f2..5c175dc 100644
--- 
a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/command/CommandResultService.java
+++ 
b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/command/CommandResultService.java
@@ -80,33 +80,33 @@ public class CommandResultService {
             return new CommandResult(tmpTaskResult);
         }
 
+        if (Objects.isNull(task.getTaskResult().getStartTime())
+                || task.getTaskResult().getStartTime().getTime() + 
AgentConstants.COMMAND_EXECUTE_DETECT_DURATION_MILSECOND > 
System.currentTimeMillis()) {
+            tmpTaskResult.setTaskState(TaskState.RUNNING);
+            
tmpTaskResult.setRetCode(AgentConstants.COMMAND_EXECUTE_UNHEALTH_CODE);
+            return new CommandResult(tmpTaskResult);
+        }
+
         int retCode = AgentConstants.COMMAND_EXECUTE_UNHEALTH_CODE;
         Boolean health = false;
-        TaskState taskState = TaskState.RUNNING;
         if (CommandType.START_FE == commandType) {
-            health = 
ServiceContext.getServiceMap().get(ServiceRole.FE).isHealth();
+            health = 
ServiceContext.getServiceMap().get(ServiceRole.FE).processExist();
             retCode = health ? AgentConstants.COMMAND_EXECUTE_SUCCESS_CODE : 
AgentConstants.COMMAND_EXECUTE_UNHEALTH_CODE;
-            taskState = health ? TaskState.FINISHED : TaskState.RUNNING;
         } else if (CommandType.STOP_FE == commandType) {
-            health = 
ServiceContext.getServiceMap().get(ServiceRole.FE).isHealth();
+            health = 
ServiceContext.getServiceMap().get(ServiceRole.FE).processExist();
             retCode = !health ? AgentConstants.COMMAND_EXECUTE_SUCCESS_CODE : 
AgentConstants.COMMAND_EXECUTE_UNHEALTH_CODE;
-            taskState = !health ? TaskState.FINISHED : TaskState.RUNNING;
         } else if (CommandType.START_BE == commandType) {
-            health = 
ServiceContext.getServiceMap().get(ServiceRole.BE).isHealth();
+            health = 
ServiceContext.getServiceMap().get(ServiceRole.BE).processExist();
             retCode = health ? AgentConstants.COMMAND_EXECUTE_SUCCESS_CODE : 
AgentConstants.COMMAND_EXECUTE_UNHEALTH_CODE;
-            taskState = health ? TaskState.FINISHED : TaskState.RUNNING;
         } else if (CommandType.STOP_BE == commandType) {
-            health = 
ServiceContext.getServiceMap().get(ServiceRole.BE).isHealth();
+            health = 
ServiceContext.getServiceMap().get(ServiceRole.BE).processExist();
             retCode = !health ? AgentConstants.COMMAND_EXECUTE_SUCCESS_CODE : 
AgentConstants.COMMAND_EXECUTE_UNHEALTH_CODE;
-            taskState = !health ? TaskState.FINISHED : TaskState.RUNNING;
         } else if (CommandType.START_BROKER == commandType) {
-            health = 
ServiceContext.getServiceMap().get(ServiceRole.BROKER).isHealth();
+            health = 
ServiceContext.getServiceMap().get(ServiceRole.BROKER).processExist();
             retCode = health ? AgentConstants.COMMAND_EXECUTE_SUCCESS_CODE : 
AgentConstants.COMMAND_EXECUTE_UNHEALTH_CODE;
-            taskState = health ? TaskState.FINISHED : TaskState.RUNNING;
         } else if (CommandType.STOP_BROKER == commandType) {
-            health = 
ServiceContext.getServiceMap().get(ServiceRole.BROKER).isHealth();
+            health = 
ServiceContext.getServiceMap().get(ServiceRole.BROKER).processExist();
             retCode = !health ? AgentConstants.COMMAND_EXECUTE_SUCCESS_CODE : 
AgentConstants.COMMAND_EXECUTE_UNHEALTH_CODE;
-            taskState = !health ? TaskState.FINISHED : TaskState.RUNNING;
         }
 
         if (health && (CommandType.START_FE == commandType || 
CommandType.START_BE == commandType || CommandType.START_BROKER == 
commandType)) {
@@ -116,13 +116,8 @@ public class CommandResultService {
             taskFinalStatus.put(task.getTaskId(), 
AgentConstants.COMMAND_EXECUTE_SUCCESS_CODE);
         }
 
-        if (task.getTaskResult().getSubmitTime().getTime() + 
AgentConstants.COMMAND_EXECUTE_TIMEOUT_MILSECOND < System.currentTimeMillis()) {
-            taskFinalStatus.put(task.getTaskId(), 
AgentConstants.COMMAND_EXECUTE_TIMEOUT_CODE);
-            tmpTaskResult.setTaskState(TaskState.FINISHED);
-        }
-
         tmpTaskResult.setRetCode(retCode);
-        tmpTaskResult.setTaskState(taskState);
+        tmpTaskResult.setTaskState(TaskState.FINISHED);
         return new CommandResult(tmpTaskResult);
     }
 }
diff --git 
a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/common/AgentConstants.java
 
b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/common/AgentConstants.java
index 7e549a7..e4756a9 100644
--- 
a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/common/AgentConstants.java
+++ 
b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/common/AgentConstants.java
@@ -20,8 +20,7 @@ package org.apache.doris.manager.agent.common;
 public class AgentConstants {
     public static final int COMMAND_EXECUTE_SUCCESS_CODE = 0;
     public static final int COMMAND_EXECUTE_UNHEALTH_CODE = -10;
-    public static final int COMMAND_EXECUTE_TIMEOUT_CODE = -11;
-    public static final int COMMAND_EXECUTE_TIMEOUT_MILSECOND = 180000;
+    public static final int COMMAND_EXECUTE_DETECT_DURATION_MILSECOND = 10000;
 
     public static final int COMMAND_HISTORY_SAVE_MAX_COUNT = 100;
 
@@ -44,4 +43,9 @@ public class AgentConstants {
     public static final int TASK_ERROR_CODE_EXCEPTION = -502;
 
     public static final String LOG_TYPE_TASK = "TASK_LOG";
+
+    public static final String PROCESS_KEYWORD_FE = "org.apache.doris.PaloFe";
+    public static final String PROCESS_KEYWORD_BE = "palo_be";
+    public static final String PROCESS_KEYWORD_BROKER = 
"org.apache.doris.broker.hdfs.BrokerBootstrap";
+
 }
diff --git 
a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/BeService.java
 
b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/BeService.java
index 3c749d4..b4efa43 100644
--- 
a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/BeService.java
+++ 
b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/BeService.java
@@ -138,4 +138,9 @@ public class BeService extends Service {
             }
         }
     }
+
+    @Override
+    public String serviceProcessKeyword() {
+        return AgentConstants.PROCESS_KEYWORD_BE;
+    }
 }
diff --git 
a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/BrokerService.java
 
b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/BrokerService.java
index f9a80ff..cf1b356 100644
--- 
a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/BrokerService.java
+++ 
b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/BrokerService.java
@@ -62,4 +62,9 @@ public class BrokerService extends Service {
         }
         return true;
     }
+
+    @Override
+    public String serviceProcessKeyword() {
+        return AgentConstants.PROCESS_KEYWORD_BROKER;
+    }
 }
diff --git 
a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/FeService.java
 
b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/FeService.java
index f4449e6..0a03f5a 100644
--- 
a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/FeService.java
+++ 
b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/FeService.java
@@ -115,4 +115,9 @@ public class FeService extends Service {
             }
         }
     }
+
+    @Override
+    public String serviceProcessKeyword() {
+        return AgentConstants.PROCESS_KEYWORD_FE;
+    }
 }
diff --git 
a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/Service.java
 
b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/Service.java
index 5cba549..e8e9e87 100644
--- 
a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/Service.java
+++ 
b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/Service.java
@@ -17,7 +17,11 @@
 
 package org.apache.doris.manager.agent.service;
 
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.doris.manager.agent.common.AgentConstants;
 import org.apache.doris.manager.agent.exception.AgentException;
+import org.apache.doris.manager.agent.register.AgentContext;
 import org.apache.doris.manager.common.domain.ServiceRole;
 
 import java.io.File;
@@ -25,6 +29,7 @@ import java.io.FileReader;
 import java.io.IOException;
 import java.util.Properties;
 
+@Slf4j
 public abstract class Service {
     protected ServiceRole serviceRole = null;
     protected String installDir = null;
@@ -64,6 +69,26 @@ public abstract class Service {
 
     public abstract boolean isHealth();
 
+    public abstract String serviceProcessKeyword();
+
+    public boolean processExist() {
+        String comm = AgentConstants.BASH_BIN + 
AgentContext.getAgentInstallDir() + "/bin/process_exist.sh " + 
serviceProcessKeyword();
+        String[] commands = {"/bin/bash", "-c", comm};
+
+        int exitVal = AgentConstants.TASK_ERROR_CODE_DEFAULT;
+        try {
+            log.info("execute command:{}", StringUtils.join(commands, " "));
+            Process proc = Runtime.getRuntime().exec(commands);
+            exitVal = proc.waitFor();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        log.info("execute exit code:{}", exitVal);
+        return exitVal == AgentConstants.COMMAND_EXECUTE_SUCCESS_CODE;
+    }
+
     public void load() {
         doLoad();
     }
diff --git 
a/manager/dm-server/src/main/java/org/apache/doris/stack/constants/Constants.java
 
b/manager/dm-server/src/main/java/org/apache/doris/stack/constants/Constants.java
index 81703d4..c98ebb9 100644
--- 
a/manager/dm-server/src/main/java/org/apache/doris/stack/constants/Constants.java
+++ 
b/manager/dm-server/src/main/java/org/apache/doris/stack/constants/Constants.java
@@ -31,6 +31,4 @@ public class Constants {
     public static final Integer DORIS_DEFAULT_FE_EDIT_LOG_PORT = 9010;
     public static final Integer DORIS_DEFAULT_BROKER_IPC_PORT = 8000;
 
-    public static final String BE_EXIST_MSG = "Same backend already exists";
-
 }
diff --git 
a/manager/dm-server/src/main/java/org/apache/doris/stack/controller/AgentController.java
 
b/manager/dm-server/src/main/java/org/apache/doris/stack/controller/AgentController.java
index 12727a0..b296b53 100644
--- 
a/manager/dm-server/src/main/java/org/apache/doris/stack/controller/AgentController.java
+++ 
b/manager/dm-server/src/main/java/org/apache/doris/stack/controller/AgentController.java
@@ -21,7 +21,7 @@ import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import org.apache.doris.manager.common.domain.AgentRoleRegister;
 import org.apache.doris.manager.common.domain.RResult;
-import org.apache.doris.stack.model.request.BeJoinReq;
+import org.apache.doris.stack.model.request.BuildClusterReq;
 import org.apache.doris.stack.model.request.DeployConfigReq;
 import org.apache.doris.stack.model.request.DorisExecReq;
 import org.apache.doris.stack.model.request.DorisInstallReq;
@@ -81,13 +81,13 @@ public class AgentController {
     }
 
     /**
-     * join be to cluster
+     * build cluster add backend,add fe,add broker
      */
-    @ApiOperation(value = "join be to cluster")
-    @RequestMapping(value = "/joinBe", method = RequestMethod.POST)
-    public RResult joinBe(HttpServletRequest request, HttpServletResponse 
response,
-                          @RequestBody BeJoinReq beJoinReq) throws Exception {
-        agentProcess.joinBe(request, response, beJoinReq);
+    @ApiOperation(value = "build cluster")
+    @RequestMapping(value = "/buildCluster", method = RequestMethod.POST)
+    public RResult buildCluster(HttpServletRequest request, 
HttpServletResponse response,
+                          @RequestBody BuildClusterReq buildClusterReq) throws 
Exception {
+        agentProcess.buildCluster(request, response, buildClusterReq);
         return RResult.success();
     }
 
diff --git 
a/manager/dm-server/src/main/java/org/apache/doris/stack/dao/TaskInstanceRepository.java
 
b/manager/dm-server/src/main/java/org/apache/doris/stack/dao/TaskInstanceRepository.java
index 9a5e153..63b4697 100644
--- 
a/manager/dm-server/src/main/java/org/apache/doris/stack/dao/TaskInstanceRepository.java
+++ 
b/manager/dm-server/src/main/java/org/apache/doris/stack/dao/TaskInstanceRepository.java
@@ -37,6 +37,6 @@ public interface TaskInstanceRepository extends 
JpaRepository<TaskInstanceEntity
     @Query("select f from TaskInstanceEntity f where f.processId = :processId 
and host = :host and processType = :processType and taskType = :taskType")
     TaskInstanceEntity queryTask(@Param("processId") int processId, 
@Param("host") String host, @Param("processType") ProcessTypeEnum processType, 
@Param("taskType") TaskTypeEnum taskType);
 
-    @Query("select f from TaskInstanceEntity f where f.processId = :processId 
and taskType = :taskType")
+    @Query("select f from TaskInstanceEntity f where f.processId = :processId 
and taskType = :taskType order by f.id asc")
     List<TaskInstanceEntity> queryTasks(@Param("processId") int processId, 
@Param("taskType") TaskTypeEnum taskType);
 }
diff --git 
a/manager/dm-server/src/main/java/org/apache/doris/stack/model/request/BeJoinReq.java
 
b/manager/dm-server/src/main/java/org/apache/doris/stack/model/request/BuildClusterReq.java
similarity index 91%
rename from 
manager/dm-server/src/main/java/org/apache/doris/stack/model/request/BeJoinReq.java
rename to 
manager/dm-server/src/main/java/org/apache/doris/stack/model/request/BuildClusterReq.java
index 299248d..8de0939 100644
--- 
a/manager/dm-server/src/main/java/org/apache/doris/stack/model/request/BeJoinReq.java
+++ 
b/manager/dm-server/src/main/java/org/apache/doris/stack/model/request/BuildClusterReq.java
@@ -29,9 +29,11 @@ import java.util.List;
 @Data
 @AllArgsConstructor
 @NoArgsConstructor
-public class BeJoinReq {
+public class BuildClusterReq {
 
     private int processId;
 
-    private List<String> hosts;
+    private List<String> feHosts;
+
+    private List<String> beHosts;
 }
diff --git 
a/manager/dm-server/src/main/java/org/apache/doris/stack/service/AgentProcess.java
 
b/manager/dm-server/src/main/java/org/apache/doris/stack/service/AgentProcess.java
index 0695337..4002883 100644
--- 
a/manager/dm-server/src/main/java/org/apache/doris/stack/service/AgentProcess.java
+++ 
b/manager/dm-server/src/main/java/org/apache/doris/stack/service/AgentProcess.java
@@ -19,7 +19,7 @@ package org.apache.doris.stack.service;
 
 import org.apache.doris.manager.common.domain.AgentRoleRegister;
 import org.apache.doris.manager.common.domain.HardwareInfo;
-import org.apache.doris.stack.model.request.BeJoinReq;
+import org.apache.doris.stack.model.request.BuildClusterReq;
 import org.apache.doris.stack.model.request.DeployConfigReq;
 import org.apache.doris.stack.model.request.DorisExecReq;
 import org.apache.doris.stack.model.request.DorisInstallReq;
@@ -49,7 +49,7 @@ public interface AgentProcess {
      */
     void startService(HttpServletRequest request, HttpServletResponse 
response, DorisStartReq dorisStart) throws Exception;
 
-    void joinBe(HttpServletRequest request, HttpServletResponse response, 
BeJoinReq beJoinReq) throws Exception;
+    void buildCluster(HttpServletRequest request, HttpServletResponse 
response, BuildClusterReq buildClusterReq) throws Exception;
 
     boolean register(AgentRoleRegister agentReg);
 
diff --git 
a/manager/dm-server/src/main/java/org/apache/doris/stack/service/impl/AgentProcessImpl.java
 
b/manager/dm-server/src/main/java/org/apache/doris/stack/service/impl/AgentProcessImpl.java
index bcd7997..3457ea4 100644
--- 
a/manager/dm-server/src/main/java/org/apache/doris/stack/service/impl/AgentProcessImpl.java
+++ 
b/manager/dm-server/src/main/java/org/apache/doris/stack/service/impl/AgentProcessImpl.java
@@ -58,7 +58,7 @@ import org.apache.doris.stack.entity.ProcessInstanceEntity;
 import org.apache.doris.stack.entity.TaskInstanceEntity;
 import org.apache.doris.stack.exceptions.JdbcException;
 import org.apache.doris.stack.exceptions.ServerException;
-import org.apache.doris.stack.model.request.BeJoinReq;
+import org.apache.doris.stack.model.request.BuildClusterReq;
 import org.apache.doris.stack.model.request.DeployConfigReq;
 import org.apache.doris.stack.model.request.DorisExecReq;
 import org.apache.doris.stack.model.request.DorisInstallReq;
@@ -366,8 +366,6 @@ public class AgentProcessImpl implements AgentProcess {
                         public void run() {
                             try {
                                 String leaderFeHost = 
waitMasterFeStart(process.getId());
-                                Integer leaderFeQueryPort = 
getFeQueryPort(leaderFeHost, agentCache.agentPort(leaderFeHost));
-                                joinFe(leaderFeHost, leaderFeQueryPort, 
start.getHost());
                                 CommandRequest creq = 
buildStartCmd(ServiceRole.FE, leaderFeHost);
                                 handleAgentTask(execTask, creq);
                             } catch (Exception e) {
@@ -383,9 +381,6 @@ public class AgentProcessImpl implements AgentProcess {
                     @Override
                     public void run() {
                         try {
-                            String aliveFeHost = 
waitMasterFeStart(process.getId());
-                            Integer aliveFePort = getFeQueryPort(aliveFeHost, 
agentCache.agentPort(aliveFeHost));
-                            joinBroker(aliveFeHost, aliveFePort, 
execTask.getHost());
                             CommandRequest creq = 
buildStartCmd(ServiceRole.BROKER, null);
                             handleAgentTask(execTask, creq);
                         } catch (Exception e) {
@@ -468,45 +463,6 @@ public class AgentProcessImpl implements AgentProcess {
     }
 
     /**
-     * add broker to cluster
-     */
-    private void joinBroker(String aliveFeHost, int aliveFePort, String 
addBrokerHost) {
-        Integer brokerIpcPort = getBrokerIpcPort(addBrokerHost, 
agentCache.agentPort(addBrokerHost));
-        Connection connection = null;
-        try {
-            connection = JdbcUtil.getConnection(aliveFeHost, aliveFePort);
-            String sql = "ALTER SYSTEM ADD BROKER broker_name \"%s:%s\"";
-            String joinBrokerSql = String.format(sql, addBrokerHost, 
brokerIpcPort);
-            JdbcUtil.execute(connection, joinBrokerSql);
-            log.info("execute {}", joinBrokerSql);
-        } catch (SQLException ex) {
-            log.error("add broker to cluster failed:{}", addBrokerHost, ex);
-        } finally {
-            JdbcUtil.closeConn(connection);
-        }
-    }
-
-    /**
-     * add fe to cluster
-     */
-    private void joinFe(String leaderHost, int leaderQueryPort, String 
addFeHost) {
-        AgentRoleEntity agentRole = 
agentRoleComponent.queryByHostRole(addFeHost, ServiceRole.FE.name());
-        Integer addFePort = getFeEditLogPort(addFeHost, 
agentCache.agentPort(addFeHost));
-        Connection connection = null;
-        try {
-            connection = JdbcUtil.getConnection(leaderHost, leaderQueryPort);
-            String sql = "ALTER SYSTEM ADD %s \"%s:%s\"";
-            String joinFeSql = String.format(sql, 
agentRole.getFeNodeType().toUpperCase(), addFeHost, addFePort);
-            JdbcUtil.execute(connection, joinFeSql);
-            log.info("execute {}", joinFeSql);
-        } catch (SQLException ex) {
-            log.error("add fe to cluster failed:{}", addFeHost, ex);
-        } finally {
-            JdbcUtil.closeConn(connection);
-        }
-    }
-
-    /**
      * get fe http port
      **/
     public Integer getFeQueryPort(String host, Integer port) {
@@ -549,19 +505,14 @@ public class AgentProcessImpl implements AgentProcess {
     }
 
     /**
-     * get alive agent
+     * get fe leader agent,the first start fe task is leader fe
      */
-    public AgentEntity getAliveAgent(int cluserId) {
-        List<AgentRoleEntity> agentRoleEntities = 
agentRoleComponent.queryAgentByRole(ServiceRole.FE.name(), cluserId);
-        AgentEntity aliveAgent = null;
-        for (AgentRoleEntity agentRole : agentRoleEntities) {
-            aliveAgent = agentCache.agentInfo(agentRole.getHost());
-            if (AgentStatus.RUNNING.equals(aliveAgent.getStatus())) {
-                break;
-            }
-        }
-        Preconditions.checkNotNull(aliveAgent, "no agent alive");
-        return aliveAgent;
+    public AgentEntity getFeLeaderAgent(ProcessInstanceEntity process) {
+        List<TaskInstanceEntity> taskEntities = 
taskInstanceRepository.queryTasks(process.getId(), TaskTypeEnum.START_FE);
+        Preconditions.checkArgument(ObjectUtils.isNotEmpty(taskEntities), 
"Failed find fe leader");
+        String feLeader = taskEntities.get(0).getHost();
+        AgentEntity feLeaderAgent = agentCache.agentInfo(feLeader);
+        return feLeaderAgent;
     }
 
     /**
@@ -575,28 +526,73 @@ public class AgentProcessImpl implements AgentProcess {
     }
 
     @Override
-    public void joinBe(HttpServletRequest request, HttpServletResponse 
response, BeJoinReq beJoinReq) throws Exception {
-        
Preconditions.checkArgument(ObjectUtils.isNotEmpty(beJoinReq.getHosts()), "not 
find backends");
+    public void buildCluster(HttpServletRequest request, HttpServletResponse 
response, BuildClusterReq buildClusterReq) throws Exception {
+        List<String> feHosts = buildClusterReq.getFeHosts();
+        List<String> beHosts = buildClusterReq.getBeHosts();
+        Preconditions.checkArgument(ObjectUtils.isNotEmpty(feHosts), "not find 
Frontend");
+        Preconditions.checkArgument(ObjectUtils.isNotEmpty(beHosts), "not find 
Backend");
+
         int userId = authenticationService.checkAllUserAuthWithCookie(request, 
response);
-        processInstanceComponent.checkProcessFinish(beJoinReq.getProcessId());
-        processInstanceComponent.checkHasUnfinishProcess(userId, 
beJoinReq.getProcessId());
-        boolean success = 
taskInstanceComponent.checkTaskSuccess(beJoinReq.getProcessId(), 
ProcessTypeEnum.START_SERVICE);
+        
processInstanceComponent.checkProcessFinish(buildClusterReq.getProcessId());
+        processInstanceComponent.checkHasUnfinishProcess(userId, 
buildClusterReq.getProcessId());
+        boolean success = 
taskInstanceComponent.checkTaskSuccess(buildClusterReq.getProcessId(), 
ProcessTypeEnum.START_SERVICE);
         Preconditions.checkArgument(success, "The service has not been started 
and completed, and the component cannot be clustered");
 
-        ProcessInstanceEntity process = 
processInstanceComponent.queryProcessById(beJoinReq.getProcessId());
-        //Query the alive agent that installed fe and get fe's query port
-        AgentEntity aliveAgent = getAliveAgent(process.getClusterId());
-        Integer queryPort = getFeQueryPort(aliveAgent.getHost(), 
aliveAgent.getPort());
+        ProcessInstanceEntity process = 
processInstanceComponent.queryProcessById(buildClusterReq.getProcessId());
+        //Query the fe leader agent that installed fe and get fe's query port
+        AgentEntity feLeaderAgent = getFeLeaderAgent(process);
+        Integer queryPort = getFeQueryPort(feLeaderAgent.getHost(), 
feLeaderAgent.getPort());
         Connection conn = null;
         try {
-            conn = JdbcUtil.getConnection(aliveAgent.getHost(), queryPort);
+            conn = JdbcUtil.getConnection(feLeaderAgent.getHost(), queryPort);
         } catch (SQLException e) {
-            log.error("get connection fail, host {}, port {} :", 
aliveAgent.getHost(), queryPort, e);
+            log.error("get connection fail, host {}, port {} :", 
feLeaderAgent.getHost(), queryPort, e);
             throw new JdbcException("Failed to get fe's jdbc connection");
         }
 
+        joinFe(conn, feHosts, feLeaderAgent.getHost());
+        joinBe(conn, beHosts);
+
+        //add broker
+        feHosts.addAll(beHosts);
+        List<String> brokers = 
feHosts.stream().distinct().collect(Collectors.toList());
+        joinBroker(conn, brokers);
+
+        JdbcUtil.closeConn(conn);
+    }
+
+    /**
+     * add fe to cluster
+     */
+    private void joinFe(Connection connection, List<String> feHosts, String 
leaderFe) {
+        for (String addFeHost : feHosts) {
+            if (leaderFe.equals(addFeHost)) {
+                continue;
+            }
+            AgentRoleEntity agentRole = 
agentRoleComponent.queryByHostRole(addFeHost, ServiceRole.FE.name());
+            Integer addFePort = getFeEditLogPort(addFeHost, 
agentCache.agentPort(addFeHost));
+            try {
+                String sql = "ALTER SYSTEM ADD %s \"%s:%s\"";
+                String joinFeSql = String.format(sql, 
agentRole.getFeNodeType().toUpperCase(), addFeHost, addFePort);
+                log.info("execute sql {}", joinFeSql);
+                JdbcUtil.execute(connection, joinFeSql);
+            } catch (SQLException ex) {
+                if (ex instanceof SQLSyntaxErrorException) {
+                    log.info("frontend already exist,response:{}", 
ex.getMessage());
+                    return;
+                }
+                log.error("Failed to add frontend:{}", addFeHost, ex);
+                throw new ServerException(ex.getMessage());
+            }
+        }
+    }
+
+    /**
+     * add be to cluster
+     */
+    private void joinBe(Connection conn, List<String> beHosts) {
         StringBuilder joinBeSb = new StringBuilder();
-        for (String be : beJoinReq.getHosts()) {
+        for (String be : beHosts) {
             Properties beConf = agentRest.roleConfig(be, 
agentCache.agentPort(be), ServiceRole.BE.name());
             String beHeatPort = 
beConf.getProperty(Constants.KEY_BE_HEARTBEAT_PORT);
             
joinBeSb.append("\"").append(be).append(":").append(beHeatPort).append("\"").append(",");
@@ -604,20 +600,42 @@ public class AgentProcessImpl implements AgentProcess {
         String joinBeStr = joinBeSb.deleteCharAt(joinBeSb.length() - 
1).toString();
         String addBe = String.format("ALTER SYSTEM ADD BACKEND %s", joinBeStr);
         try {
-            log.info("execute {}", addBe);
+            log.info("execute sql {}", addBe);
             JdbcUtil.execute(conn, addBe);
         } catch (SQLException e) {
-            if (e instanceof SQLSyntaxErrorException
-                    && StringUtils.isNotBlank(e.getMessage())
-                    && e.getMessage().contains(Constants.BE_EXIST_MSG)) {
+            if (e instanceof SQLSyntaxErrorException) {
                 log.info("backend already exist,response:{}", e.getMessage());
                 return;
             }
-            log.error("Failed to add backend:{}", joinBeStr, e);
+            log.error("Failed to add backends:{}", joinBeStr, e);
             throw new ServerException(e.getMessage());
         }
     }
 
+    /**
+     * add broker to cluster
+     */
+    private void joinBroker(Connection connection, List<String> addBrokerHost) 
{
+        StringBuilder joinBrokerSb = new StringBuilder();
+        for (String broker : addBrokerHost) {
+            Integer brokerIpcPort = getBrokerIpcPort(broker, 
agentCache.agentPort(broker));
+            
joinBrokerSb.append("\"").append(broker).append(":").append(brokerIpcPort).append("\"").append(",");
+        }
+        String joinBrokerStr = joinBrokerSb.deleteCharAt(joinBrokerSb.length() 
- 1).toString();
+        String addBroker = String.format("ALTER SYSTEM ADD BROKER broker_name 
%s", joinBrokerStr);
+        try {
+            JdbcUtil.execute(connection, addBroker);
+            log.info("execute sql {}", addBroker);
+        } catch (SQLException ex) {
+            if (ex instanceof SQLSyntaxErrorException) {
+                log.info("broker already exist,response:{}", ex.getMessage());
+                return;
+            }
+            log.error("Failed to add broker:{}", addBrokerHost, ex);
+            throw new ServerException(ex.getMessage());
+        }
+    }
+
     @Override
     public boolean register(AgentRoleRegister agentReg) {
         AgentRoleEntity agent = 
agentRoleComponent.queryByHostRole(agentReg.getHost(), agentReg.getRole());
diff --git 
a/manager/dm-server/src/main/java/org/apache/doris/stack/service/impl/ProcessTaskImpl.java
 
b/manager/dm-server/src/main/java/org/apache/doris/stack/service/impl/ProcessTaskImpl.java
index 5139ac0..a59d34e 100644
--- 
a/manager/dm-server/src/main/java/org/apache/doris/stack/service/impl/ProcessTaskImpl.java
+++ 
b/manager/dm-server/src/main/java/org/apache/doris/stack/service/impl/ProcessTaskImpl.java
@@ -128,6 +128,9 @@ public class ProcessTaskImpl implements ProcessTask {
                 continue;
             }
             TaskInstanceResp taskResp = task.transToModel();
+            if (taskResp.getStatus() == ExecutionStatus.SUBMITTED) {
+                taskResp.setStatus(ExecutionStatus.RUNNING);
+            }
             //set response
             if (task.getStatus().typeIsSuccess()) {
                 taskResp.setResponse(task.getTaskType().getName() + " 
success");
diff --git 
a/manager/dm-server/src/main/java/org/apache/doris/stack/util/JdbcUtil.java 
b/manager/dm-server/src/main/java/org/apache/doris/stack/util/JdbcUtil.java
index fe7fd0a..42c2857 100644
--- a/manager/dm-server/src/main/java/org/apache/doris/stack/util/JdbcUtil.java
+++ b/manager/dm-server/src/main/java/org/apache/doris/stack/util/JdbcUtil.java
@@ -57,7 +57,6 @@ public class JdbcUtil {
             stmt = conn.prepareStatement(sql);
             return stmt.execute();
         } finally {
-            closeConn(conn);
             closeStmt(stmt);
         }
     }
diff --git "a/manager/doc/Doris Manager Server 
\346\216\245\345\217\243\346\226\207\346\241\243.md" "b/manager/doc/Doris 
Manager Server \346\216\245\345\217\243\346\226\207\346\241\243.md"
index c8e62b3..feec4d2 100644
--- "a/manager/doc/Doris Manager Server 
\346\216\245\345\217\243\346\226\207\346\241\243.md"    
+++ "b/manager/doc/Doris Manager Server 
\346\216\245\345\217\243\346\226\207\346\241\243.md"    
@@ -315,11 +315,11 @@
 
 **接口功能**
 
-> 组件集群,将BE加入集群
+> 组件集群,执行add fe, add be ,add broker 操作
 
 **URL**
 
-> /api/agent/joinBe
+> /api/agent/buildCluster
 
 **支持格式**
 
@@ -334,7 +334,8 @@
 > |参数|必选|类型|说明|
 > |:-----  |:-------|:-----|-----                               |
 > |processId|true|int|当前安装的流程ID,接口1返回的结果|
-> |hosts    |true    |String   |加入机器列表|
+> |feHosts    |true    |List<String>   |FE列表|
+> |beHosts    |true    |List<String>   |BE列表|
 
 **返回字段**
 
@@ -345,13 +346,14 @@
 
 **接口示例**
 
-> 地址:http://localhost:9601/api/agent/joinBe
+> 地址:http://localhost:9601/api/agent/buildCluster
 
 > 请求参数:
 ``` json
 {
     "processId":1,
-    "hosts":["10.220.147.155"]
+    "feHosts":["10.220.147.155"],
+    "beHosts":["10.220.147.155"]
 }
 ```
 > 返回参数:
diff --git a/manager/manager-bin/agent/bin/process_exist.sh 
b/manager/manager-bin/agent/bin/process_exist.sh
new file mode 100755
index 0000000..8aa75f5
--- /dev/null
+++ b/manager/manager-bin/agent/bin/process_exist.sh
@@ -0,0 +1,28 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+if  [ ! -n "$1" ] ;then
+    exit 1
+fi
+
+processNum=`ps -ef | grep "${1}" |grep -v "${0}" | grep -v "grep" | wc -l`
+if [ $processNum -ge 1 ]; then
+    exit 0
+else
+    exit 1
+fi

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to