This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch doris-manager in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/doris-manager by this push: new 4a22750 [improvement](manager) Modify the agent startup service and build cluster logic (#7705) 4a22750 is described below commit 4a22750360ab72d55d5506596ab5e0977d2e11c9 Author: wudi <676366...@qq.com> AuthorDate: Sun Jan 16 10:39:58 2022 +0800 [improvement](manager) Modify the agent startup service and build cluster logic (#7705) 1. The way the agent judges that the fe/be/broker has started successfully is modified: it now checks whether the process still exists 10 seconds after starting. 2. The submitted state is uniformly reported as running to the front end. 3. The [build cluster] logic is unified as add fe, add be, add broker. --- manager/README.md | 13 +- .../agent/command/CommandResultService.java | 33 ++-- .../doris/manager/agent/common/AgentConstants.java | 8 +- .../doris/manager/agent/service/BeService.java | 5 + .../doris/manager/agent/service/BrokerService.java | 5 + .../doris/manager/agent/service/FeService.java | 5 + .../doris/manager/agent/service/Service.java | 25 ++++ .../apache/doris/stack/constants/Constants.java | 2 - .../doris/stack/controller/AgentController.java | 14 +- .../doris/stack/dao/TaskInstanceRepository.java | 2 +- .../{BeJoinReq.java => BuildClusterReq.java} | 6 +- .../apache/doris/stack/service/AgentProcess.java | 4 +- .../doris/stack/service/impl/AgentProcessImpl.java | 166 ++++++++++++--------- .../doris/stack/service/impl/ProcessTaskImpl.java | 3 + .../java/org/apache/doris/stack/util/JdbcUtil.java | 1 - ...216\245\345\217\243\346\226\207\346\241\243.md" | 12 +- manager/manager-bin/agent/bin/process_exist.sh | 28 ++++ 17 files changed, 207 insertions(+), 125 deletions(-) diff --git a/manager/README.md b/manager/README.md index a0b742a..713fee6 100644 --- a/manager/README.md +++ b/manager/README.md @@ -36,6 +36,8 @@ agent/, doris manger agent agent_stop.sh, doris manger agent stop script install_be.sh, doris be install script install_fe.sh, 
doris fe install script + install_broker.sh, doris broker install script + process_exist.sh, doris process detection script lib/ dm-agent.jar, executable package of doris manger agent server/, doris manger server @@ -98,13 +100,4 @@ Browser access ${serverIp}:8080, Manger server has a preset super administrator user name: Admin password: Admin@123 (Case sensitive) -``` - -### Step3: install agent service -After decompressing the tar package in the second step, there will be a directory called agent. copy the agent directory to the -machine where the agent service is installed, -``` -$ sh bin/agent_start.sh --agent ${agentIp} --server ${serverIp}:8080 -``` -${agentIp} is the IP of the machine where the agent is located, ${serverIp} is the IP of the machine where the manager server is located - +``` \ No newline at end of file diff --git a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/command/CommandResultService.java b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/command/CommandResultService.java index 12d00f2..5c175dc 100644 --- a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/command/CommandResultService.java +++ b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/command/CommandResultService.java @@ -80,33 +80,33 @@ public class CommandResultService { return new CommandResult(tmpTaskResult); } + if (Objects.isNull(task.getTaskResult().getStartTime()) + || task.getTaskResult().getStartTime().getTime() + AgentConstants.COMMAND_EXECUTE_DETECT_DURATION_MILSECOND > System.currentTimeMillis()) { + tmpTaskResult.setTaskState(TaskState.RUNNING); + tmpTaskResult.setRetCode(AgentConstants.COMMAND_EXECUTE_UNHEALTH_CODE); + return new CommandResult(tmpTaskResult); + } + int retCode = AgentConstants.COMMAND_EXECUTE_UNHEALTH_CODE; Boolean health = false; - TaskState taskState = TaskState.RUNNING; if (CommandType.START_FE == commandType) { - health = ServiceContext.getServiceMap().get(ServiceRole.FE).isHealth(); + 
health = ServiceContext.getServiceMap().get(ServiceRole.FE).processExist(); retCode = health ? AgentConstants.COMMAND_EXECUTE_SUCCESS_CODE : AgentConstants.COMMAND_EXECUTE_UNHEALTH_CODE; - taskState = health ? TaskState.FINISHED : TaskState.RUNNING; } else if (CommandType.STOP_FE == commandType) { - health = ServiceContext.getServiceMap().get(ServiceRole.FE).isHealth(); + health = ServiceContext.getServiceMap().get(ServiceRole.FE).processExist(); retCode = !health ? AgentConstants.COMMAND_EXECUTE_SUCCESS_CODE : AgentConstants.COMMAND_EXECUTE_UNHEALTH_CODE; - taskState = !health ? TaskState.FINISHED : TaskState.RUNNING; } else if (CommandType.START_BE == commandType) { - health = ServiceContext.getServiceMap().get(ServiceRole.BE).isHealth(); + health = ServiceContext.getServiceMap().get(ServiceRole.BE).processExist(); retCode = health ? AgentConstants.COMMAND_EXECUTE_SUCCESS_CODE : AgentConstants.COMMAND_EXECUTE_UNHEALTH_CODE; - taskState = health ? TaskState.FINISHED : TaskState.RUNNING; } else if (CommandType.STOP_BE == commandType) { - health = ServiceContext.getServiceMap().get(ServiceRole.BE).isHealth(); + health = ServiceContext.getServiceMap().get(ServiceRole.BE).processExist(); retCode = !health ? AgentConstants.COMMAND_EXECUTE_SUCCESS_CODE : AgentConstants.COMMAND_EXECUTE_UNHEALTH_CODE; - taskState = !health ? TaskState.FINISHED : TaskState.RUNNING; } else if (CommandType.START_BROKER == commandType) { - health = ServiceContext.getServiceMap().get(ServiceRole.BROKER).isHealth(); + health = ServiceContext.getServiceMap().get(ServiceRole.BROKER).processExist(); retCode = health ? AgentConstants.COMMAND_EXECUTE_SUCCESS_CODE : AgentConstants.COMMAND_EXECUTE_UNHEALTH_CODE; - taskState = health ? 
TaskState.FINISHED : TaskState.RUNNING; } else if (CommandType.STOP_BROKER == commandType) { - health = ServiceContext.getServiceMap().get(ServiceRole.BROKER).isHealth(); + health = ServiceContext.getServiceMap().get(ServiceRole.BROKER).processExist(); retCode = !health ? AgentConstants.COMMAND_EXECUTE_SUCCESS_CODE : AgentConstants.COMMAND_EXECUTE_UNHEALTH_CODE; - taskState = !health ? TaskState.FINISHED : TaskState.RUNNING; } if (health && (CommandType.START_FE == commandType || CommandType.START_BE == commandType || CommandType.START_BROKER == commandType)) { @@ -116,13 +116,8 @@ public class CommandResultService { taskFinalStatus.put(task.getTaskId(), AgentConstants.COMMAND_EXECUTE_SUCCESS_CODE); } - if (task.getTaskResult().getSubmitTime().getTime() + AgentConstants.COMMAND_EXECUTE_TIMEOUT_MILSECOND < System.currentTimeMillis()) { - taskFinalStatus.put(task.getTaskId(), AgentConstants.COMMAND_EXECUTE_TIMEOUT_CODE); - tmpTaskResult.setTaskState(TaskState.FINISHED); - } - tmpTaskResult.setRetCode(retCode); - tmpTaskResult.setTaskState(taskState); + tmpTaskResult.setTaskState(TaskState.FINISHED); return new CommandResult(tmpTaskResult); } } diff --git a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/common/AgentConstants.java b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/common/AgentConstants.java index 7e549a7..e4756a9 100644 --- a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/common/AgentConstants.java +++ b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/common/AgentConstants.java @@ -20,8 +20,7 @@ package org.apache.doris.manager.agent.common; public class AgentConstants { public static final int COMMAND_EXECUTE_SUCCESS_CODE = 0; public static final int COMMAND_EXECUTE_UNHEALTH_CODE = -10; - public static final int COMMAND_EXECUTE_TIMEOUT_CODE = -11; - public static final int COMMAND_EXECUTE_TIMEOUT_MILSECOND = 180000; + public static final int COMMAND_EXECUTE_DETECT_DURATION_MILSECOND = 10000; 
public static final int COMMAND_HISTORY_SAVE_MAX_COUNT = 100; @@ -44,4 +43,9 @@ public class AgentConstants { public static final int TASK_ERROR_CODE_EXCEPTION = -502; public static final String LOG_TYPE_TASK = "TASK_LOG"; + + public static final String PROCESS_KEYWORD_FE = "org.apache.doris.PaloFe"; + public static final String PROCESS_KEYWORD_BE = "palo_be"; + public static final String PROCESS_KEYWORD_BROKER = "org.apache.doris.broker.hdfs.BrokerBootstrap"; + } diff --git a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/BeService.java b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/BeService.java index 3c749d4..b4efa43 100644 --- a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/BeService.java +++ b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/BeService.java @@ -138,4 +138,9 @@ public class BeService extends Service { } } } + + @Override + public String serviceProcessKeyword() { + return AgentConstants.PROCESS_KEYWORD_BE; + } } diff --git a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/BrokerService.java b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/BrokerService.java index f9a80ff..cf1b356 100644 --- a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/BrokerService.java +++ b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/BrokerService.java @@ -62,4 +62,9 @@ public class BrokerService extends Service { } return true; } + + @Override + public String serviceProcessKeyword() { + return AgentConstants.PROCESS_KEYWORD_BROKER; + } } diff --git a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/FeService.java b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/FeService.java index f4449e6..0a03f5a 100644 --- a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/FeService.java +++ 
b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/FeService.java @@ -115,4 +115,9 @@ public class FeService extends Service { } } } + + @Override + public String serviceProcessKeyword() { + return AgentConstants.PROCESS_KEYWORD_FE; + } } diff --git a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/Service.java b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/Service.java index 5cba549..e8e9e87 100644 --- a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/Service.java +++ b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/Service.java @@ -17,7 +17,11 @@ package org.apache.doris.manager.agent.service; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.doris.manager.agent.common.AgentConstants; import org.apache.doris.manager.agent.exception.AgentException; +import org.apache.doris.manager.agent.register.AgentContext; import org.apache.doris.manager.common.domain.ServiceRole; import java.io.File; @@ -25,6 +29,7 @@ import java.io.FileReader; import java.io.IOException; import java.util.Properties; +@Slf4j public abstract class Service { protected ServiceRole serviceRole = null; protected String installDir = null; @@ -64,6 +69,26 @@ public abstract class Service { public abstract boolean isHealth(); + public abstract String serviceProcessKeyword(); + + public boolean processExist() { + String comm = AgentConstants.BASH_BIN + AgentContext.getAgentInstallDir() + "/bin/process_exist.sh " + serviceProcessKeyword(); + String[] commands = {"/bin/bash", "-c", comm}; + + int exitVal = AgentConstants.TASK_ERROR_CODE_DEFAULT; + try { + log.info("execute command:{}", StringUtils.join(commands, " ")); + Process proc = Runtime.getRuntime().exec(commands); + exitVal = proc.waitFor(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + log.info("execute exit 
code:{}", exitVal); + return exitVal == AgentConstants.COMMAND_EXECUTE_SUCCESS_CODE; + } + public void load() { doLoad(); } diff --git a/manager/dm-server/src/main/java/org/apache/doris/stack/constants/Constants.java b/manager/dm-server/src/main/java/org/apache/doris/stack/constants/Constants.java index 81703d4..c98ebb9 100644 --- a/manager/dm-server/src/main/java/org/apache/doris/stack/constants/Constants.java +++ b/manager/dm-server/src/main/java/org/apache/doris/stack/constants/Constants.java @@ -31,6 +31,4 @@ public class Constants { public static final Integer DORIS_DEFAULT_FE_EDIT_LOG_PORT = 9010; public static final Integer DORIS_DEFAULT_BROKER_IPC_PORT = 8000; - public static final String BE_EXIST_MSG = "Same backend already exists"; - } diff --git a/manager/dm-server/src/main/java/org/apache/doris/stack/controller/AgentController.java b/manager/dm-server/src/main/java/org/apache/doris/stack/controller/AgentController.java index 12727a0..b296b53 100644 --- a/manager/dm-server/src/main/java/org/apache/doris/stack/controller/AgentController.java +++ b/manager/dm-server/src/main/java/org/apache/doris/stack/controller/AgentController.java @@ -21,7 +21,7 @@ import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import org.apache.doris.manager.common.domain.AgentRoleRegister; import org.apache.doris.manager.common.domain.RResult; -import org.apache.doris.stack.model.request.BeJoinReq; +import org.apache.doris.stack.model.request.BuildClusterReq; import org.apache.doris.stack.model.request.DeployConfigReq; import org.apache.doris.stack.model.request.DorisExecReq; import org.apache.doris.stack.model.request.DorisInstallReq; @@ -81,13 +81,13 @@ public class AgentController { } /** - * join be to cluster + * build cluster add backend,add fe,add broker */ - @ApiOperation(value = "join be to cluster") - @RequestMapping(value = "/joinBe", method = RequestMethod.POST) - public RResult joinBe(HttpServletRequest request, HttpServletResponse 
response, - @RequestBody BeJoinReq beJoinReq) throws Exception { - agentProcess.joinBe(request, response, beJoinReq); + @ApiOperation(value = "build cluster") + @RequestMapping(value = "/buildCluster", method = RequestMethod.POST) + public RResult buildCluster(HttpServletRequest request, HttpServletResponse response, + @RequestBody BuildClusterReq buildClusterReq) throws Exception { + agentProcess.buildCluster(request, response, buildClusterReq); return RResult.success(); } diff --git a/manager/dm-server/src/main/java/org/apache/doris/stack/dao/TaskInstanceRepository.java b/manager/dm-server/src/main/java/org/apache/doris/stack/dao/TaskInstanceRepository.java index 9a5e153..63b4697 100644 --- a/manager/dm-server/src/main/java/org/apache/doris/stack/dao/TaskInstanceRepository.java +++ b/manager/dm-server/src/main/java/org/apache/doris/stack/dao/TaskInstanceRepository.java @@ -37,6 +37,6 @@ public interface TaskInstanceRepository extends JpaRepository<TaskInstanceEntity @Query("select f from TaskInstanceEntity f where f.processId = :processId and host = :host and processType = :processType and taskType = :taskType") TaskInstanceEntity queryTask(@Param("processId") int processId, @Param("host") String host, @Param("processType") ProcessTypeEnum processType, @Param("taskType") TaskTypeEnum taskType); - @Query("select f from TaskInstanceEntity f where f.processId = :processId and taskType = :taskType") + @Query("select f from TaskInstanceEntity f where f.processId = :processId and taskType = :taskType order by f.id asc") List<TaskInstanceEntity> queryTasks(@Param("processId") int processId, @Param("taskType") TaskTypeEnum taskType); } diff --git a/manager/dm-server/src/main/java/org/apache/doris/stack/model/request/BeJoinReq.java b/manager/dm-server/src/main/java/org/apache/doris/stack/model/request/BuildClusterReq.java similarity index 91% rename from manager/dm-server/src/main/java/org/apache/doris/stack/model/request/BeJoinReq.java rename to 
manager/dm-server/src/main/java/org/apache/doris/stack/model/request/BuildClusterReq.java index 299248d..8de0939 100644 --- a/manager/dm-server/src/main/java/org/apache/doris/stack/model/request/BeJoinReq.java +++ b/manager/dm-server/src/main/java/org/apache/doris/stack/model/request/BuildClusterReq.java @@ -29,9 +29,11 @@ import java.util.List; @Data @AllArgsConstructor @NoArgsConstructor -public class BeJoinReq { +public class BuildClusterReq { private int processId; - private List<String> hosts; + private List<String> feHosts; + + private List<String> beHosts; } diff --git a/manager/dm-server/src/main/java/org/apache/doris/stack/service/AgentProcess.java b/manager/dm-server/src/main/java/org/apache/doris/stack/service/AgentProcess.java index 0695337..4002883 100644 --- a/manager/dm-server/src/main/java/org/apache/doris/stack/service/AgentProcess.java +++ b/manager/dm-server/src/main/java/org/apache/doris/stack/service/AgentProcess.java @@ -19,7 +19,7 @@ package org.apache.doris.stack.service; import org.apache.doris.manager.common.domain.AgentRoleRegister; import org.apache.doris.manager.common.domain.HardwareInfo; -import org.apache.doris.stack.model.request.BeJoinReq; +import org.apache.doris.stack.model.request.BuildClusterReq; import org.apache.doris.stack.model.request.DeployConfigReq; import org.apache.doris.stack.model.request.DorisExecReq; import org.apache.doris.stack.model.request.DorisInstallReq; @@ -49,7 +49,7 @@ public interface AgentProcess { */ void startService(HttpServletRequest request, HttpServletResponse response, DorisStartReq dorisStart) throws Exception; - void joinBe(HttpServletRequest request, HttpServletResponse response, BeJoinReq beJoinReq) throws Exception; + void buildCluster(HttpServletRequest request, HttpServletResponse response, BuildClusterReq buildClusterReq) throws Exception; boolean register(AgentRoleRegister agentReg); diff --git a/manager/dm-server/src/main/java/org/apache/doris/stack/service/impl/AgentProcessImpl.java 
b/manager/dm-server/src/main/java/org/apache/doris/stack/service/impl/AgentProcessImpl.java index bcd7997..3457ea4 100644 --- a/manager/dm-server/src/main/java/org/apache/doris/stack/service/impl/AgentProcessImpl.java +++ b/manager/dm-server/src/main/java/org/apache/doris/stack/service/impl/AgentProcessImpl.java @@ -58,7 +58,7 @@ import org.apache.doris.stack.entity.ProcessInstanceEntity; import org.apache.doris.stack.entity.TaskInstanceEntity; import org.apache.doris.stack.exceptions.JdbcException; import org.apache.doris.stack.exceptions.ServerException; -import org.apache.doris.stack.model.request.BeJoinReq; +import org.apache.doris.stack.model.request.BuildClusterReq; import org.apache.doris.stack.model.request.DeployConfigReq; import org.apache.doris.stack.model.request.DorisExecReq; import org.apache.doris.stack.model.request.DorisInstallReq; @@ -366,8 +366,6 @@ public class AgentProcessImpl implements AgentProcess { public void run() { try { String leaderFeHost = waitMasterFeStart(process.getId()); - Integer leaderFeQueryPort = getFeQueryPort(leaderFeHost, agentCache.agentPort(leaderFeHost)); - joinFe(leaderFeHost, leaderFeQueryPort, start.getHost()); CommandRequest creq = buildStartCmd(ServiceRole.FE, leaderFeHost); handleAgentTask(execTask, creq); } catch (Exception e) { @@ -383,9 +381,6 @@ public class AgentProcessImpl implements AgentProcess { @Override public void run() { try { - String aliveFeHost = waitMasterFeStart(process.getId()); - Integer aliveFePort = getFeQueryPort(aliveFeHost, agentCache.agentPort(aliveFeHost)); - joinBroker(aliveFeHost, aliveFePort, execTask.getHost()); CommandRequest creq = buildStartCmd(ServiceRole.BROKER, null); handleAgentTask(execTask, creq); } catch (Exception e) { @@ -468,45 +463,6 @@ public class AgentProcessImpl implements AgentProcess { } /** - * add broker to cluster - */ - private void joinBroker(String aliveFeHost, int aliveFePort, String addBrokerHost) { - Integer brokerIpcPort = getBrokerIpcPort(addBrokerHost, 
agentCache.agentPort(addBrokerHost)); - Connection connection = null; - try { - connection = JdbcUtil.getConnection(aliveFeHost, aliveFePort); - String sql = "ALTER SYSTEM ADD BROKER broker_name \"%s:%s\""; - String joinBrokerSql = String.format(sql, addBrokerHost, brokerIpcPort); - JdbcUtil.execute(connection, joinBrokerSql); - log.info("execute {}", joinBrokerSql); - } catch (SQLException ex) { - log.error("add broker to cluster failed:{}", addBrokerHost, ex); - } finally { - JdbcUtil.closeConn(connection); - } - } - - /** - * add fe to cluster - */ - private void joinFe(String leaderHost, int leaderQueryPort, String addFeHost) { - AgentRoleEntity agentRole = agentRoleComponent.queryByHostRole(addFeHost, ServiceRole.FE.name()); - Integer addFePort = getFeEditLogPort(addFeHost, agentCache.agentPort(addFeHost)); - Connection connection = null; - try { - connection = JdbcUtil.getConnection(leaderHost, leaderQueryPort); - String sql = "ALTER SYSTEM ADD %s \"%s:%s\""; - String joinFeSql = String.format(sql, agentRole.getFeNodeType().toUpperCase(), addFeHost, addFePort); - JdbcUtil.execute(connection, joinFeSql); - log.info("execute {}", joinFeSql); - } catch (SQLException ex) { - log.error("add fe to cluster failed:{}", addFeHost, ex); - } finally { - JdbcUtil.closeConn(connection); - } - } - - /** * get fe http port **/ public Integer getFeQueryPort(String host, Integer port) { @@ -549,19 +505,14 @@ public class AgentProcessImpl implements AgentProcess { } /** - * get alive agent + * get fe leader agent,the first start fe task is leader fe */ - public AgentEntity getAliveAgent(int cluserId) { - List<AgentRoleEntity> agentRoleEntities = agentRoleComponent.queryAgentByRole(ServiceRole.FE.name(), cluserId); - AgentEntity aliveAgent = null; - for (AgentRoleEntity agentRole : agentRoleEntities) { - aliveAgent = agentCache.agentInfo(agentRole.getHost()); - if (AgentStatus.RUNNING.equals(aliveAgent.getStatus())) { - break; - } - } - Preconditions.checkNotNull(aliveAgent, 
"no agent alive"); - return aliveAgent; + public AgentEntity getFeLeaderAgent(ProcessInstanceEntity process) { + List<TaskInstanceEntity> taskEntities = taskInstanceRepository.queryTasks(process.getId(), TaskTypeEnum.START_FE); + Preconditions.checkArgument(ObjectUtils.isNotEmpty(taskEntities), "Failed find fe leader"); + String feLeader = taskEntities.get(0).getHost(); + AgentEntity feLeaderAgent = agentCache.agentInfo(feLeader); + return feLeaderAgent; } /** @@ -575,28 +526,73 @@ public class AgentProcessImpl implements AgentProcess { } @Override - public void joinBe(HttpServletRequest request, HttpServletResponse response, BeJoinReq beJoinReq) throws Exception { - Preconditions.checkArgument(ObjectUtils.isNotEmpty(beJoinReq.getHosts()), "not find backends"); + public void buildCluster(HttpServletRequest request, HttpServletResponse response, BuildClusterReq buildClusterReq) throws Exception { + List<String> feHosts = buildClusterReq.getFeHosts(); + List<String> beHosts = buildClusterReq.getBeHosts(); + Preconditions.checkArgument(ObjectUtils.isNotEmpty(feHosts), "not find Frontend"); + Preconditions.checkArgument(ObjectUtils.isNotEmpty(beHosts), "not find Backend"); + int userId = authenticationService.checkAllUserAuthWithCookie(request, response); - processInstanceComponent.checkProcessFinish(beJoinReq.getProcessId()); - processInstanceComponent.checkHasUnfinishProcess(userId, beJoinReq.getProcessId()); - boolean success = taskInstanceComponent.checkTaskSuccess(beJoinReq.getProcessId(), ProcessTypeEnum.START_SERVICE); + processInstanceComponent.checkProcessFinish(buildClusterReq.getProcessId()); + processInstanceComponent.checkHasUnfinishProcess(userId, buildClusterReq.getProcessId()); + boolean success = taskInstanceComponent.checkTaskSuccess(buildClusterReq.getProcessId(), ProcessTypeEnum.START_SERVICE); Preconditions.checkArgument(success, "The service has not been started and completed, and the component cannot be clustered"); - ProcessInstanceEntity 
process = processInstanceComponent.queryProcessById(beJoinReq.getProcessId()); - //Query the alive agent that installed fe and get fe's query port - AgentEntity aliveAgent = getAliveAgent(process.getClusterId()); - Integer queryPort = getFeQueryPort(aliveAgent.getHost(), aliveAgent.getPort()); + ProcessInstanceEntity process = processInstanceComponent.queryProcessById(buildClusterReq.getProcessId()); + //Query the fe leader agent that installed fe and get fe's query port + AgentEntity feLeaderAgent = getFeLeaderAgent(process); + Integer queryPort = getFeQueryPort(feLeaderAgent.getHost(), feLeaderAgent.getPort()); Connection conn = null; try { - conn = JdbcUtil.getConnection(aliveAgent.getHost(), queryPort); + conn = JdbcUtil.getConnection(feLeaderAgent.getHost(), queryPort); } catch (SQLException e) { - log.error("get connection fail, host {}, port {} :", aliveAgent.getHost(), queryPort, e); + log.error("get connection fail, host {}, port {} :", feLeaderAgent.getHost(), queryPort, e); throw new JdbcException("Failed to get fe's jdbc connection"); } + joinFe(conn, feHosts, feLeaderAgent.getHost()); + joinBe(conn, beHosts); + + //add broker + feHosts.addAll(beHosts); + List<String> brokers = feHosts.stream().distinct().collect(Collectors.toList()); + joinBroker(conn, brokers); + + JdbcUtil.closeConn(conn); + } + + /** + * add fe to cluster + */ + private void joinFe(Connection connection, List<String> feHosts, String leaderFe) { + for (String addFeHost : feHosts) { + if (leaderFe.equals(addFeHost)) { + continue; + } + AgentRoleEntity agentRole = agentRoleComponent.queryByHostRole(addFeHost, ServiceRole.FE.name()); + Integer addFePort = getFeEditLogPort(addFeHost, agentCache.agentPort(addFeHost)); + try { + String sql = "ALTER SYSTEM ADD %s \"%s:%s\""; + String joinFeSql = String.format(sql, agentRole.getFeNodeType().toUpperCase(), addFeHost, addFePort); + log.info("execute sql {}", joinFeSql); + JdbcUtil.execute(connection, joinFeSql); + } catch (SQLException ex) { + 
if (ex instanceof SQLSyntaxErrorException) { + log.info("frontend already exist,response:{}", ex.getMessage()); + return; + } + log.error("Failed to add frontend:{}", addFeHost, ex); + throw new ServerException(ex.getMessage()); + } + } + } + + /** + * add be to cluster + */ + private void joinBe(Connection conn, List<String> beHosts) { StringBuilder joinBeSb = new StringBuilder(); - for (String be : beJoinReq.getHosts()) { + for (String be : beHosts) { Properties beConf = agentRest.roleConfig(be, agentCache.agentPort(be), ServiceRole.BE.name()); String beHeatPort = beConf.getProperty(Constants.KEY_BE_HEARTBEAT_PORT); joinBeSb.append("\"").append(be).append(":").append(beHeatPort).append("\"").append(","); @@ -604,20 +600,42 @@ public class AgentProcessImpl implements AgentProcess { String joinBeStr = joinBeSb.deleteCharAt(joinBeSb.length() - 1).toString(); String addBe = String.format("ALTER SYSTEM ADD BACKEND %s", joinBeStr); try { - log.info("execute {}", addBe); + log.info("execute sql {}", addBe); JdbcUtil.execute(conn, addBe); } catch (SQLException e) { - if (e instanceof SQLSyntaxErrorException - && StringUtils.isNotBlank(e.getMessage()) - && e.getMessage().contains(Constants.BE_EXIST_MSG)) { + if (e instanceof SQLSyntaxErrorException) { log.info("backend already exist,response:{}", e.getMessage()); return; } - log.error("Failed to add backend:{}", joinBeStr, e); + log.error("Failed to add backends:{}", joinBeStr, e); throw new ServerException(e.getMessage()); } } + /** + * add broker to cluster + */ + private void joinBroker(Connection connection, List<String> addBrokerHost) { + StringBuilder joinBrokerSb = new StringBuilder(); + for (String broker : addBrokerHost) { + Integer brokerIpcPort = getBrokerIpcPort(broker, agentCache.agentPort(broker)); + joinBrokerSb.append("\"").append(broker).append(":").append(brokerIpcPort).append("\"").append(","); + } + String joinBrokerStr = joinBrokerSb.deleteCharAt(joinBrokerSb.length() - 1).toString(); + String 
addBroker = String.format("ALTER SYSTEM ADD BROKER broker_name %s", joinBrokerStr); + try { + JdbcUtil.execute(connection, addBroker); + log.info("execute sql {}", addBroker); + } catch (SQLException ex) { + if (ex instanceof SQLSyntaxErrorException) { + log.info("broker already exist,response:{}", ex.getMessage()); + return; + } + log.error("Failed to add broker:{}", addBrokerHost, ex); + throw new ServerException(ex.getMessage()); + } + } + @Override public boolean register(AgentRoleRegister agentReg) { AgentRoleEntity agent = agentRoleComponent.queryByHostRole(agentReg.getHost(), agentReg.getRole()); diff --git a/manager/dm-server/src/main/java/org/apache/doris/stack/service/impl/ProcessTaskImpl.java b/manager/dm-server/src/main/java/org/apache/doris/stack/service/impl/ProcessTaskImpl.java index 5139ac0..a59d34e 100644 --- a/manager/dm-server/src/main/java/org/apache/doris/stack/service/impl/ProcessTaskImpl.java +++ b/manager/dm-server/src/main/java/org/apache/doris/stack/service/impl/ProcessTaskImpl.java @@ -128,6 +128,9 @@ public class ProcessTaskImpl implements ProcessTask { continue; } TaskInstanceResp taskResp = task.transToModel(); + if (taskResp.getStatus() == ExecutionStatus.SUBMITTED) { + taskResp.setStatus(ExecutionStatus.RUNNING); + } //set response if (task.getStatus().typeIsSuccess()) { taskResp.setResponse(task.getTaskType().getName() + " success"); diff --git a/manager/dm-server/src/main/java/org/apache/doris/stack/util/JdbcUtil.java b/manager/dm-server/src/main/java/org/apache/doris/stack/util/JdbcUtil.java index fe7fd0a..42c2857 100644 --- a/manager/dm-server/src/main/java/org/apache/doris/stack/util/JdbcUtil.java +++ b/manager/dm-server/src/main/java/org/apache/doris/stack/util/JdbcUtil.java @@ -57,7 +57,6 @@ public class JdbcUtil { stmt = conn.prepareStatement(sql); return stmt.execute(); } finally { - closeConn(conn); closeStmt(stmt); } } diff --git "a/manager/doc/Doris Manager Server \346\216\245\345\217\243\346\226\207\346\241\243.md" 
"b/manager/doc/Doris Manager Server \346\216\245\345\217\243\346\226\207\346\241\243.md" index c8e62b3..feec4d2 100644 --- "a/manager/doc/Doris Manager Server \346\216\245\345\217\243\346\226\207\346\241\243.md" +++ "b/manager/doc/Doris Manager Server \346\216\245\345\217\243\346\226\207\346\241\243.md" @@ -315,11 +315,11 @@ **接口功能** -> 组件集群,将BE加入集群 +> 组件集群,执行add fe, add be ,add broker 操作 **URL** -> /api/agent/joinBe +> /api/agent/buildCluster **支持格式** @@ -334,7 +334,8 @@ > |参数|必选|类型|说明| > |:----- |:-------|:-----|----- | > |processId|true|int|当前安装的流程ID,接口1返回的结果| -> |hosts |true |String |加入机器列表| +> |feHosts |true |List<String> |FE列表| +> |beHosts |true |List<String> |BE列表| **返回字段** @@ -345,13 +346,14 @@ **接口示例** -> 地址:http://localhost:9601/api/agent/joinBe +> 地址:http://localhost:9601/api/agent/buildCluster > 请求参数: ``` json { "processId":1, - "hosts":["10.220.147.155"] + "feHosts":["10.220.147.155"], + "beHosts":["10.220.147.155"] } ``` > 返回参数: diff --git a/manager/manager-bin/agent/bin/process_exist.sh b/manager/manager-bin/agent/bin/process_exist.sh new file mode 100755 index 0000000..8aa75f5 --- /dev/null +++ b/manager/manager-bin/agent/bin/process_exist.sh @@ -0,0 +1,28 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +if [ ! -n "$1" ] ;then + exit 1 +fi + +processNum=`ps -ef | grep "${1}" |grep -v "${0}" | grep -v "grep" | wc -l` +if [ $processNum -ge 1 ]; then + exit 0 +else + exit 1 +fi --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org