This is an automated email from the ASF dual-hosted git repository. jiafengzheng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-manager.git
The following commit(s) were added to refs/heads/master by this push: new 254cef5 [feature] instance state report (#53) 254cef5 is described below commit 254cef55861545ff1d1ad853540a607ad7ed1348 Author: LiRui <1176867...@qq.com> AuthorDate: Fri Apr 22 16:46:54 2022 +0800 [feature] instance state report (#53) instance state report --- .../exceptions/InstanceNotInstallException.java | 32 ++++ .../exceptions/InstanceNotRunningException.java | 32 ++++ .../agent/exceptions/InstanceServiceException.java | 29 ++++ .../manager/agent/service/HeartBeatService.java | 170 ++++++++++++++++++--- .../service/heartbeat/DorisInstanceOperator.java | 118 ++++++++++++-- .../service/heartbeat/InstanceEventHandler.java | 12 +- .../apache/doris/manager/agent/util/Request.java | 43 +++--- .../src/main/resources/application.properties | 3 + manager/dm-common/pom.xml | 6 + .../manager/common/heartbeat/HeartBeatContext.java | 32 ++++ .../manager/common/heartbeat/HeartBeatResult.java | 32 ++++ .../manager/common/heartbeat/InstanceInfo.java | 40 +++++ .../common/heartbeat/InstanceStateResult.java | 53 +++++++ .../doris/manager/common/util/ConfigDefault.java | 1 + .../common/util/ServerAndAgentConstant.java | 2 + .../control/manager/DorisClusterModuleManager.java | 8 +- .../control/manager/ResourceClusterManager.java | 4 +- .../manager/ResourceNodeAndAgentManager.java | 32 +++- .../control/ResourceClusterNodeController.java | 31 ++-- .../control/ResourceClusterNodeService.java | 165 ++++++++++++++++++++ 20 files changed, 775 insertions(+), 70 deletions(-) diff --git a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/exceptions/InstanceNotInstallException.java b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/exceptions/InstanceNotInstallException.java new file mode 100644 index 0000000..cc3eee8 --- /dev/null +++ b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/exceptions/InstanceNotInstallException.java @@ -0,0 +1,32 @@ +// Licensed to the Apache 
Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.manager.agent.exceptions; + +public class InstanceNotInstallException extends Exception { + + private final String moduleName; + private final String installDir; + + public InstanceNotInstallException(String moduleName, String installDir) { + super(String.format("instance %s is not installed at %s", moduleName, installDir)); + + this.moduleName = moduleName; + this.installDir = installDir; + } +} + diff --git a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/exceptions/InstanceNotRunningException.java b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/exceptions/InstanceNotRunningException.java new file mode 100644 index 0000000..7561fe6 --- /dev/null +++ b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/exceptions/InstanceNotRunningException.java @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.manager.agent.exceptions; + +public class InstanceNotRunningException extends Exception { + + private final String moduleName; + private final String installDir; + + public InstanceNotRunningException(String moduleName, String installDir) { + super(String.format("instance %s is not running at %s", moduleName, installDir)); + + this.moduleName = moduleName; + this.installDir = installDir; + } +} + diff --git a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/exceptions/InstanceServiceException.java b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/exceptions/InstanceServiceException.java new file mode 100644 index 0000000..35e5cc8 --- /dev/null +++ b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/exceptions/InstanceServiceException.java @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.manager.agent.exceptions; + +public class InstanceServiceException extends Exception { + + private final String moduleName; + + public InstanceServiceException(String moduleName) { + super(String.format("instance %s service exception", moduleName)); + this.moduleName = moduleName; + } +} + diff --git a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/HeartBeatService.java b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/HeartBeatService.java index f590ea1..e869a8e 100644 --- a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/HeartBeatService.java +++ b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/HeartBeatService.java @@ -18,17 +18,29 @@ package org.apache.doris.manager.agent.service; import lombok.extern.slf4j.Slf4j; +import org.apache.doris.manager.agent.exceptions.InstanceNotInstallException; +import org.apache.doris.manager.agent.exceptions.InstanceNotRunningException; +import org.apache.doris.manager.agent.exceptions.InstanceServiceException; +import org.apache.doris.manager.agent.service.heartbeat.DorisInstanceOperator; import org.apache.doris.manager.agent.service.heartbeat.HeartbeatEventHandler; import org.apache.doris.manager.agent.util.Request; +import org.apache.doris.manager.common.heartbeat.HeartBeatContext; import org.apache.doris.manager.common.heartbeat.HeartBeatEventInfo; import org.apache.doris.manager.common.heartbeat.HeartBeatEventResult; +import org.apache.doris.manager.common.heartbeat.HeartBeatEventResultType; 
+import org.apache.doris.manager.common.heartbeat.HeartBeatResult; +import org.apache.doris.manager.common.heartbeat.InstanceInfo; +import org.apache.doris.manager.common.heartbeat.InstanceStateResult; +import org.apache.doris.stack.control.ModelControlState; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.env.Environment; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; +import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; @Service @Slf4j @@ -36,9 +48,19 @@ public class HeartBeatService { @Autowired private HeartbeatEventHandler heartbeatEventHandler; + @Autowired + private DorisInstanceOperator instanceOpera; + @Autowired private Environment environment; + private ConcurrentHashMap<Long, HeartBeatEventInfo> events = new ConcurrentHashMap<>(); // event id + private ConcurrentHashMap<Long, InstanceInfo> instanceInfos = new ConcurrentHashMap<>(); // instance id + + // cache event result in case of http post result failure + // some event is not reentrant + private ConcurrentHashMap<Long, HeartBeatEventResult> cacheResults = new ConcurrentHashMap<>(); + // When the agent starts, it needs to complete the registration before it can handle other heartbeats private String agentNodeId = ""; @@ -46,38 +68,148 @@ public class HeartBeatService { private String heartBeatUrl = ""; - // TODO:Reserved for subsequent active reporting of instance status -// private Set<InstanceInfo> instances = new HashSet<>(); - - // TODO: To be improved - // TODO: Currently, the heartbeat implemented here is only responsible for obtaining the events to be executed - // from the server, and does not report the instance status controlled by the current agent - // TODO:Execute once when the agent process starts? 
- // Send heartbeat every 5 seconds, get heartbeat event list -// @PostConstruct - @Scheduled(cron = "0/5 * * * * ?") - public void heartBeat() { + @Scheduled(cron = "0/${agent.heartbeat.interval:5} * * * * ?") + public void handleHeartBeatContextLoop() { if (agentNodeId.isEmpty() || serverEndpoint.isEmpty()) { agentNodeId = environment.getProperty("agent.node.id"); serverEndpoint = environment.getProperty("manager.server.endpoint"); - heartBeatUrl = "http://" + serverEndpoint + "/api/control/node/" + agentNodeId + "/agent/heartbeat"; + heartBeatUrl = "http://" + serverEndpoint + "/api/control/node/" + agentNodeId + "/agent/context"; } - // TODO :Process according to the returned heartbeat event results - // TODO:If there is an event and it is processed, the result is sent + log.info("agent node is " + agentNodeId); log.info("heartBeatUrl is " + heartBeatUrl); - List<HeartBeatEventInfo> eventInfos = Request.getHeartBeatEventInfo(heartBeatUrl); + HeartBeatContext ctx = Request.getHeartBeatContext(heartBeatUrl); + + // duplicate task + HeartBeatContext newCtx = addAndFilterContextTask(ctx); + + Thread contextTask = new Thread(() -> { + HeartBeatResult res = executeContextTask(newCtx); + + cacheResults.clear(); + try { + String dealRes = Request.sendHeartBeatContextResult(heartBeatUrl, res); + log.info("server return context deal result: {}", dealRes); + } catch (IOException e) { + log.warn("send heartbeat context result error: {}", e.getMessage()); + res.getEventResults().forEach((eventRes) -> { + if (eventRes.getResultType() != HeartBeatEventResultType.FAIL) { + log.info("cache event {} result, event type {}, stage {}", eventRes.getEventId(), + eventRes.getEventType(), eventRes.getEventStage()); + cacheResults.put(eventRes.getEventId(), eventRes); + } + }); + } + + // clear completed tasks + newCtx.getEvents().forEach((e) -> { + log.info("remove finished [event {}] task", e.getEventId()); + events.remove(e.getEventId()); + }); + 
newCtx.getInstanceInfos().forEach((ins) -> { + instanceInfos.remove(ins.getInstanceId()); + }); + }); + + contextTask.start(); + } + + private HeartBeatResult executeContextTask(HeartBeatContext ctx) { + List<HeartBeatEventResult> eventResults = new ArrayList<>(); + List<InstanceStateResult> insStateResults = new ArrayList<>(); + + //TODO find from cache before + + for (HeartBeatEventInfo eventInfo : ctx.getEvents()) { + log.info("handle event {}: resource:{} type:{} stage:{}", eventInfo.getEventId(), + eventInfo.getResourceType(), eventInfo.getEventStage(), eventInfo.getEventStage()); + + // get result from cache if it has been executed + long eventId = eventInfo.getEventId(); + if (cacheResults.containsKey(eventId)) { + HeartBeatEventResult cr = cacheResults.get(eventId); + log.info("result is in cache, event {}, stage {}, result type {}", cr.getEventId(), + cr.getEventStage(), cr.getResultType()); + if (cr.getResultType() == HeartBeatEventResultType.PROCESSING + && eventInfo.getEventStage() < cr.getEventStage()) { + log.info("return result from result cache"); + eventResults.add(cr); + continue; + } else if (cr.getResultType() == HeartBeatEventResultType.SUCCESS + && cr.getEventStage() == cr.getEventStage()) { + log.info("return result form result cache"); + eventResults.add(cr); + continue; + } + } - List<HeartBeatEventResult> results = new ArrayList<>(); - for (HeartBeatEventInfo eventInfo : eventInfos) { HeartBeatEventResult result = heartbeatEventHandler.handHeartBeatEvent(eventInfo); if (result != null) { - results.add(result); + eventResults.add(result); + } + } + + for (InstanceInfo instanceInfo : ctx.getInstanceInfos()) { + log.info("check module {} instance {} state", instanceInfo.getModuleName(), instanceInfo.getInstanceId()); + InstanceStateResult stateResult = new InstanceStateResult(instanceInfo); + try { + instanceOpera.checkInstanceProcessState(instanceInfo.getModuleName(), instanceInfo.getInstallDir(), + instanceInfo.getHttpPort()); + + 
stateResult.setState(ModelControlState.RUNNING); + } catch (InstanceNotInstallException e) { + log.error("{} instance check exception {}", instanceInfo.getModuleName(), e.getMessage()); + // maybe instance has noe be installed + stateResult.setState(ModelControlState.INIT); + stateResult.setErrMsg(e.getMessage()); + } catch (InstanceNotRunningException | InstanceServiceException e) { + log.error("{} instance check exception {}", instanceInfo.getModuleName(), e.getMessage()); + stateResult.setState(ModelControlState.STOPPED); + stateResult.setErrMsg(e.getMessage()); } + + insStateResults.add(stateResult); } - Request.sendHeartBeatEventResult(heartBeatUrl, results); + HeartBeatResult res = new HeartBeatResult(); + res.setEventResults(eventResults); + res.setStateResults(insStateResults); + + return res; } + private HeartBeatContext addAndFilterContextTask(HeartBeatContext ctx) { + HeartBeatContext filterCtx = new HeartBeatContext(); + List<HeartBeatEventInfo> newEvents = new ArrayList<>(); + List<InstanceInfo> newInsInfos = new ArrayList<>(); + + if (ctx.getEvents() != null) { + for (HeartBeatEventInfo eventInfo : ctx.getEvents()) { + if (events.containsKey(eventInfo.getEventId())) { + log.warn("heartbeat event {} is running", eventInfo.getEventId()); + continue; + } + log.info("add event {}", eventInfo.getEventId()); + events.put(eventInfo.getEventId(), eventInfo); + newEvents.add(eventInfo); + } + } + + if (ctx.getInstanceInfos() != null) { + for (InstanceInfo ins : ctx.getInstanceInfos()) { + if (instanceInfos.containsKey(ins.getInstanceId())) { + log.warn("module {} instance {} check task is running", ins.getModuleName(), ins.getInstanceId()); + continue; + } + log.info("add {} instance {} check task", ins.getModuleName(), ins.getInstanceId()); + instanceInfos.put(ins.getInstanceId(), ins); + newInsInfos.add(ins); + } + } + + filterCtx.setEvents(newEvents); + filterCtx.setInstanceInfos(newInsInfos); + return filterCtx; + } } diff --git 
a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/heartbeat/DorisInstanceOperator.java b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/heartbeat/DorisInstanceOperator.java index 621049e..ecfb40c 100644 --- a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/heartbeat/DorisInstanceOperator.java +++ b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/heartbeat/DorisInstanceOperator.java @@ -17,10 +17,15 @@ package org.apache.doris.manager.agent.service.heartbeat; +import com.alibaba.fastjson.JSONObject; import com.google.common.base.Strings; import com.google.common.collect.Maps; import com.google.common.io.Files; import lombok.extern.slf4j.Slf4j; +import org.apache.doris.manager.agent.exceptions.InstanceNotInstallException; +import org.apache.doris.manager.agent.exceptions.InstanceNotRunningException; +import org.apache.doris.manager.agent.exceptions.InstanceServiceException; +import org.apache.doris.manager.agent.util.Request; import org.apache.doris.manager.agent.util.ShellUtil; import org.apache.doris.manager.common.util.ConfigDefault; import org.apache.doris.manager.common.util.ServerAndAgentConstant; @@ -30,8 +35,10 @@ import java.io.BufferedReader; import java.io.File; import java.io.IOException; import java.io.InputStreamReader; +import java.net.URISyntaxException; import java.nio.charset.Charset; import java.nio.file.Paths; +import java.util.HashMap; import java.util.Map; @Slf4j @@ -143,7 +150,7 @@ public class DorisInstanceOperator { executePkgShellScriptWithBash(startScript, installInfo, moudleName, Maps.newHashMap()); } else { - log.info("{} instance is running", moudleName); + log.info("instance {} is running", moudleName); } log.info("start {} instance success", moudleName); return true; @@ -192,21 +199,108 @@ public class DorisInstanceOperator { } // Check whether the instance has been installed and started - public boolean checkInstanceDeploy(String moudleName, 
String installInfo) { + public void checkInstanceProcessState(String moduleName, String installDir, int httpPort) + throws InstanceNotInstallException, InstanceNotRunningException, InstanceServiceException { + + if (moduleName.equals(ServerAndAgentConstant.BROKER_NAME)) { + moduleName = getBrokerInstallationPath(installDir); + } + + // install dir check + log.info("to check module {} instance process state in {}", moduleName, installDir); + File moduleRoot = Paths.get(installDir, moduleName).toFile(); + if (!moduleRoot.exists()) { + log.error("instance {} not installed, can not find root path: {}", moduleName, moduleRoot); + throw new InstanceNotInstallException(moduleName, installDir); + } + + // process state check + // dont consider multiple be is deployed at one a machine node try { - if (moudleName.equals(ServerAndAgentConstant.BROKER_NAME)) { - moudleName = getBrokerInstallationPath(installInfo); - } - int bePid = processIsRunning(moudleName, installInfo); - if (bePid < 0) { - return false; - } else { - return true; + int insPid = processIsRunning(moduleName, installDir); + if (insPid < 0) { + log.error("instance {} is not running", moduleName); + throw new InstanceNotRunningException(moduleName, moduleRoot.getAbsolutePath()); } } catch (Exception e) { - log.error("Check " + moudleName + " instance running error {}.", e); - return false; + log.error("check instance {} process state error: {}", moduleName, e.getMessage()); + throw new InstanceNotRunningException(moduleName, moduleRoot.getAbsolutePath()); + } + + if (httpPort <= 0) { + log.warn("invalid http port {}, skip http service check", httpPort); + return; } + + // fe/be http service check + String statusURL; + if (moduleName.equals(ServerAndAgentConstant.FE_NAME)) { + statusURL = "http://localhost:" + httpPort + "/api/bootstrap"; + } else if (moduleName.equals(ServerAndAgentConstant.BE_NAME)) { + statusURL = "http://localhost:" + httpPort + "/api/health"; + } else { + // can not check other module by 
http + log.error("unknown module name {}", moduleName); + return; + } + + /* + * Before Palo 3.10, the FE api/bootstrap API return like this: + * { + * "replayedJournalId": 0, + * "queryPort": 0, + * "rpcPort": 0, + * "status": "OK", + * "msg": "Success" + * } + * + * From 3.10, the FE api/bootstrap API return like this: + * { + * "msg": "success", + * "code": 0, + * "data": {"replayedJournalId": 0, "queryPort": 0, "rpcPort": 0}, + * "count": 0 + * } + * + * FE api/health return like this + * { + * "msg": "success", + * "code": 0, + * "data": {"online_backend_num": 2, "total_backend_num": 2}, + * "count": 0 + * } + * + * BE api/health return lik this + * {"status": "OK","msg": "To Be Added"} + * + */ + + String stateRes; + try { + stateRes = Request.sendGetRequest(statusURL, new HashMap<>()); + } catch (URISyntaxException e) { + log.error("{} syntax error {}", statusURL, e.getMessage()); + return; + } catch (IOException e) { + throw new InstanceServiceException(moduleName); + } + log.info("http health return: {}", stateRes); + // or status == ok + JSONObject stateJson = JSONObject.parseObject(stateRes); + String status = stateJson.getString("status"); + if (status != null && status.equals("OK")) { + log.info("module {} instance service is normal, return status OK", moduleName); + return; + } + + // or code == 0 + Integer code = stateJson.getInteger("code"); + if (code != null && code == 0) { + log.info("modele {} instance service is normal return code 0", moduleName); + return; + } + + throw new InstanceServiceException(moduleName); } /* diff --git a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/heartbeat/InstanceEventHandler.java b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/heartbeat/InstanceEventHandler.java index a7829c4..e86bc15 100644 --- a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/heartbeat/InstanceEventHandler.java +++ 
b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/heartbeat/InstanceEventHandler.java @@ -59,7 +59,17 @@ public class InstanceEventHandler { InstanceDeployCheckEventConfigInfo configInfo = JSON.parseObject(jsonConfigStr, InstanceDeployCheckEventConfigInfo.class); - boolean isDeploy = instanceOperator.checkInstanceDeploy(configInfo.getModuleName(), configInfo.getInstallInfo()); + boolean isDeploy = true; + try { + // here we do not check http service is ready or not + // because it is ready after a few seconds of the process starts + // which may cause `Doris Cluster Creation Request` failed and blocked + instanceOperator.checkInstanceProcessState(configInfo.getModuleName(), + configInfo.getInstallInfo(), 0); + } catch (Exception e) { + log.error("check instance {} deploy error: {}", configInfo.getModuleName(), e.getMessage()); + isDeploy = false; + } HeartBeatEventResult result = new HeartBeatEventResult(eventInfo); // There is only one step diff --git a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/util/Request.java b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/util/Request.java index c5d6ccc..9f6c95a 100644 --- a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/util/Request.java +++ b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/util/Request.java @@ -19,8 +19,8 @@ package org.apache.doris.manager.agent.util; import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; -import org.apache.doris.manager.common.heartbeat.HeartBeatEventInfo; -import org.apache.doris.manager.common.heartbeat.HeartBeatEventResult; +import org.apache.doris.manager.common.heartbeat.HeartBeatContext; +import org.apache.doris.manager.common.heartbeat.HeartBeatResult; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; @@ -35,29 +35,34 @@ import org.apache.http.util.EntityUtils; import 
java.io.IOException; import java.net.URI; import java.net.URISyntaxException; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; @Slf4j public class Request { + public static HeartBeatContext getHeartBeatContext(String requestUrl) { + String ctx = null; - public static List<HeartBeatEventInfo> getHeartBeatEventInfo(String requestUrl) { - String getHeartBeatEventResults = sendGetRequest(requestUrl, new HashMap<>()); - log.info("getHeartBeatEventResults:" + getHeartBeatEventResults); -// ManagerServerResponse response = JSON.parseObject(getHeartBeatEventResults, ManagerServerResponse.class); - if (getHeartBeatEventResults == null) { - return new ArrayList<>(); + try { + ctx = sendGetRequest(requestUrl, new HashMap<>()); + log.info("getHeartBeatContextResults:" + ctx); + } catch (Exception e) { + log.error("get heartbeat context error {}", e.getMessage()); + } + + if (ctx == null) { + log.warn("no context return"); + return new HeartBeatContext(); } - return JSON.parseArray(getHeartBeatEventResults, HeartBeatEventInfo.class); + return JSON.parseObject(ctx, HeartBeatContext.class); } - public static void sendHeartBeatEventResult(String requestUrl, List<HeartBeatEventResult> results) { - sendPostRequest(requestUrl, JSON.toJSONString(results)); + public static String sendHeartBeatContextResult(String requestUrl, HeartBeatResult res) throws IOException { + log.info("send heart beat context result {} to {}", JSON.toJSONString(res), requestUrl); + return sendPostRequest(requestUrl, JSON.toJSONString(res)); } - public static String sendPostRequest(String requestUrl, String bodyJson) { + public static String sendPostRequest(String requestUrl, String bodyJson) throws IOException { HttpPost httpPost = new HttpPost(requestUrl); httpPost.setConfig(timeout()); @@ -68,7 +73,7 @@ public class Request { return request(httpPost); } catch (IOException e) { log.error("request url error:{},param:{}", requestUrl, bodyJson, e); - throw new 
RuntimeException(e); + throw e; } } @@ -87,7 +92,8 @@ public class Request { } } - public static String sendGetRequest(String requestUrl, Map<String, Object> params) { + public static String sendGetRequest(String requestUrl, Map<String, Object> params) + throws URISyntaxException, IOException { URI url = null; try { URIBuilder uriBuilder = null; @@ -97,7 +103,8 @@ public class Request { } url = uriBuilder.build(); } catch (URISyntaxException e) { - e.printStackTrace(); + log.error("{} syntax error {}", requestUrl, e.getMessage()); + throw e; } HttpGet httpGet = new HttpGet(url); @@ -107,7 +114,7 @@ public class Request { return request(httpGet); } catch (IOException e) { log.error("request url error:{},param:{}", requestUrl, params, e); - throw new RuntimeException(e); + throw e; } } diff --git a/manager/dm-agent/src/main/resources/application.properties b/manager/dm-agent/src/main/resources/application.properties index 4a73966..0327076 100644 --- a/manager/dm-agent/src/main/resources/application.properties +++ b/manager/dm-agent/src/main/resources/application.properties @@ -21,3 +21,6 @@ server.port=8001 manager.server.endpoint= # manager agent unique identification agent.node.id= + +# heart beat interval(s) +agent.heartbeat.interval=5 diff --git a/manager/dm-common/pom.xml b/manager/dm-common/pom.xml index 44cf13e..06c7265 100644 --- a/manager/dm-common/pom.xml +++ b/manager/dm-common/pom.xml @@ -27,6 +27,12 @@ under the License. 
<modelVersion>4.0.0</modelVersion> <artifactId>dm-common</artifactId> + <dependencies> + <dependency> + <groupId>org.apache.doris</groupId> + <artifactId>resource-common</artifactId> + </dependency> + </dependencies> </project> \ No newline at end of file diff --git a/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/HeartBeatContext.java b/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/HeartBeatContext.java new file mode 100644 index 0000000..3556f34 --- /dev/null +++ b/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/HeartBeatContext.java @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.manager.common.heartbeat; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class HeartBeatContext { + List<HeartBeatEventInfo> events; + List<InstanceInfo> instanceInfos; +} diff --git a/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/HeartBeatResult.java b/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/HeartBeatResult.java new file mode 100644 index 0000000..d70a12a --- /dev/null +++ b/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/HeartBeatResult.java @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.manager.common.heartbeat; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class HeartBeatResult { + List<HeartBeatEventResult> eventResults; + List<InstanceStateResult> stateResults; +} diff --git a/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/InstanceInfo.java b/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/InstanceInfo.java new file mode 100644 index 0000000..96b16d0 --- /dev/null +++ b/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/InstanceInfo.java @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.manager.common.heartbeat; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class InstanceInfo { + private long agentNodeId; + + private long moduleId; + + private long instanceId; + + private String moduleName; + + private String installDir; + + //only available when moduleName = fe/be + private int httpPort; +} diff --git a/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/InstanceStateResult.java b/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/InstanceStateResult.java new file mode 100644 index 0000000..31a4e47 --- /dev/null +++ b/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/InstanceStateResult.java @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.manager.common.heartbeat; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.doris.stack.control.ModelControlState; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class InstanceStateResult { /* State of a single instance as carried in HeartBeatResult.stateResults; the server stores state.getValue() on the matching instance entity. Field order is the @AllArgsConstructor parameter order. */ + private long agentNodeId; + + private long moduleId; + + private long instanceId; + + private String moduleName; + + private String installDir; + + ModelControlState state; + + /* NOTE(review): an error message would normally accompany a state other than RUNNING - confirm the condition stated in the following comment against the agent code */ // is valid only when state is RUNNING + String errMsg; + + /* Copies the identifying fields from the given InstanceInfo and starts with state UNKNOWN; httpPort is not carried over and errMsg is left unset. */ public InstanceStateResult(InstanceInfo info) { + this.agentNodeId = info.getAgentNodeId(); + this.moduleId = info.getModuleId(); + this.instanceId = info.getInstanceId(); + this.moduleName = info.getModuleName(); + this.installDir = info.getInstallDir(); + + this.state = ModelControlState.UNKNOWN; + } +} diff --git a/manager/dm-common/src/main/java/org/apache/doris/manager/common/util/ConfigDefault.java b/manager/dm-common/src/main/java/org/apache/doris/manager/common/util/ConfigDefault.java index 619bc0c..db6e7fa 100644 --- a/manager/dm-common/src/main/java/org/apache/doris/manager/common/util/ConfigDefault.java +++ b/manager/dm-common/src/main/java/org/apache/doris/manager/common/util/ConfigDefault.java @@ -37,6 +37,7 @@ public class ConfigDefault { public static final String FE_EDIT_LOG_PORT = "edit_log_port"; public static final String BE_HEARTBEAT_PORT_CONFIG_NAME = "heartbeat_service_port"; + public static final String BE_WEBSERVER_PORT_NAME = "webserver_port"; public static final String BROKER_PORT_CONFIG_NAME = "broker_ipc_port"; diff --git a/manager/dm-common/src/main/java/org/apache/doris/manager/common/util/ServerAndAgentConstant.java b/manager/dm-common/src/main/java/org/apache/doris/manager/common/util/ServerAndAgentConstant.java index dbc813f..eacfd4e 100644 --- a/manager/dm-common/src/main/java/org/apache/doris/manager/common/util/ServerAndAgentConstant.java +++ 
b/manager/dm-common/src/main/java/org/apache/doris/manager/common/util/ServerAndAgentConstant.java @@ -73,6 +73,8 @@ public class ServerAndAgentConstant { public static final String FE_EDIT_SERVICE = "fe_edit"; public static final String BE_HEARTBEAT_SERVICE = "be_heartbeat"; + public static final String BE_HTTP_SERVICE = "be_http"; + public static final String BROKER_PRC_SERVICE = "broker_rpc"; public static final Map<String, String> BAIDU_BROKER_CONFIG_DEDAULT; diff --git a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterModuleManager.java b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterModuleManager.java index 492e19d..5ac39a5 100644 --- a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterModuleManager.java +++ b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterModuleManager.java @@ -136,7 +136,13 @@ public class DorisClusterModuleManager { // for be service, heartbeat for (DeployConfigItem configItem : deployConfig.getConfigs()) { if (configItem.getKey().equals(ConfigDefault.BE_HEARTBEAT_PORT_CONFIG_NAME)) { - serviceNamePorts.put(ServerAndAgentConstant.BE_HEARTBEAT_SERVICE, Integer.valueOf(configItem.getValue())); + serviceNamePorts.put(ServerAndAgentConstant.BE_HEARTBEAT_SERVICE, + Integer.valueOf(configItem.getValue())); + } + + if (configItem.getKey().equals(ConfigDefault.BE_WEBSERVER_PORT_NAME)) { + serviceNamePorts.put(ServerAndAgentConstant.BE_HTTP_SERVICE, + Integer.valueOf(configItem.getValue())); } } serviceCreateOperation(moduleEntity, serviceNamePorts, accessInfo); diff --git a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceClusterManager.java b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceClusterManager.java index d7e13c7..d55f34c 100644 --- a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceClusterManager.java +++ 
b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceClusterManager.java @@ -228,7 +228,7 @@ public class ResourceClusterManager { List<ResourceNodeEntity> agentInstalledNodes = new ArrayList<>(); for (ResourceNodeEntity nodeEntity : nodeEntities) { - if (!nodeAndAgentManager.checkAgentOperation(nodeEntity)) { + if (!nodeAndAgentManager.isAgentInstalled(nodeEntity)) { log.warn("the agent has not been installed on {} node {}", nodeEntity.getId(), nodeEntity.getHost()); } else { agentInstalledNodes.add(nodeEntity); @@ -269,7 +269,7 @@ public class ResourceClusterManager { } // async delete agent - for (ResourceNodeEntity nodeEntity : nodeEntities) { + for (ResourceNodeEntity nodeEntity : agentInstalledNodes) { AgentUnInstallEventConfigInfo uninstallConfig = new AgentUnInstallEventConfigInfo( accessInfo.getSshUser(), accessInfo.getSshPort(), accessInfo.getSshKey(), nodeEntity.getHost(), nodeEntity.getAgentInstallDir(), diff --git a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java index 45a5253..41aced1 100644 --- a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java +++ b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java @@ -81,7 +81,7 @@ public class ResourceNodeAndAgentManager { throws Exception { // check agent has been installed or not - if (!checkAgentOperation(node)) { + if (!isAgentInstalled(node)) { log.warn("node[{}]:{} does not install agent, no need to uninstall", node.getId(), node.getHost()); return; } @@ -182,6 +182,34 @@ public class ResourceNodeAndAgentManager { } } + /** Tells whether an agent is installed on the node, judged from the node's current heartbeat event: any event type other than AGENT_INSTALL means the agent is already there, and an AGENT_INSTALL event counts once it completed with SUCCESS or reached the AGENT_START stage. Returns false when the node has no current event (id below 1) or when the referenced event record cannot be found. */ + public boolean isAgentInstalled(ResourceNodeEntity node) { + long eventId = node.getCurrentEventId(); + + if (eventId < 1L) { + log.warn("The node no have agent"); + return false; + } else { + 
HeartBeatEventEntity eventEntity = heartBeatEventRepository.findById(eventId).orElse(null); + if (eventEntity == null) { + /* the referenced event row is missing: answer "not installed" instead of failing with NoSuchElementException */ + log.warn("current event {} of node {} does not exist, treat agent as not installed", eventId, node.getId()); + return false; + } + + // handling other event, agent has been installed + if (!eventEntity.getType().equals(HeartBeatEventType.AGENT_INSTALL.name())) { + return true; + } + + // AGENT_INSTALL event + if (eventEntity.isCompleted() && eventEntity.getStatus().equals(HeartBeatEventResultType.SUCCESS.name())) { + return true; + } + + if (eventEntity.getStage() >= AgentInstallEventStage.AGENT_START.getStage()) { + return true; + } + + log.warn("Agent has not been installed successfully"); + return false; + } + } + public boolean isAvailableAgentPort(ResourceNodeEntity node, AgentInstallEventConfigInfo configInfo) throws Exception { // agent port check, eg: Spring Boot Param server.port=8008 @@ -369,7 +397,7 @@ public class ResourceNodeAndAgentManager { } private void uninstallEventProcess(ResourceNodeEntity node, AgentUnInstallEventConfigInfo configInfo, - HeartBeatEventEntity agentUninstallAgentEntity) { + HeartBeatEventEntity agentUninstallAgentEntity) { if (!agentUninstallAgentEntity.getType().equals(HeartBeatEventType.AGENT_STOP.name())) { log.warn("agent no need to stop on {} node {}", node.getId(), node.getHost()); return; diff --git a/manager/dm-server/src/main/java/org/apache/doris/stack/controller/control/ResourceClusterNodeController.java b/manager/dm-server/src/main/java/org/apache/doris/stack/controller/control/ResourceClusterNodeController.java index 9392d99..1f07142 100644 --- a/manager/dm-server/src/main/java/org/apache/doris/stack/controller/control/ResourceClusterNodeController.java +++ b/manager/dm-server/src/main/java/org/apache/doris/stack/controller/control/ResourceClusterNodeController.java @@ -20,8 +20,8 @@ package org.apache.doris.stack.controller.control; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import lombok.extern.slf4j.Slf4j; -import org.apache.doris.manager.common.heartbeat.HeartBeatEventInfo; -import 
org.apache.doris.manager.common.heartbeat.HeartBeatEventResult; +import org.apache.doris.manager.common.heartbeat.HeartBeatContext; +import org.apache.doris.manager.common.heartbeat.HeartBeatResult; import org.apache.doris.stack.entity.CoreUserEntity; import org.apache.doris.stack.rest.ResponseEntityBuilder; import org.apache.doris.stack.service.control.ResourceClusterNodeService; @@ -38,7 +38,6 @@ import org.springframework.web.bind.annotation.RestController; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import java.util.List; @Api(tags = "Resource Cluster Node Agent API") @RestController @@ -51,20 +50,22 @@ public class ResourceClusterNodeController { @Autowired private ResourceClusterNodeService nodeService; - @ApiOperation(value = "get node agent heartbeat") - @GetMapping(value = "{agentNodeId}/agent/heartbeat", produces = MediaType.APPLICATION_JSON_VALUE) - public List<HeartBeatEventInfo> getHeartbeat(HttpServletRequest request, - HttpServletResponse response, - @PathVariable(value = "agentNodeId") long agentNodeId) { - return nodeService.getHeartbeat(agentNodeId); + /* GET {agentNodeId}/agent/context: returns the HeartBeatContext the service builds for the given agent node; request and response are not used in the body. */ @ApiOperation(value = "get heart beat context(node agent heartbeat and instance info)") + @GetMapping(value = "{agentNodeId}/agent/context", produces = MediaType.APPLICATION_JSON_VALUE) + public HeartBeatContext getHeartbeatContext(HttpServletRequest request, + HttpServletResponse response, + @PathVariable(value = "agentNodeId") long agentNodeId) { + return nodeService.getHeartBeatContext(agentNodeId); } - @ApiOperation(value = "deal node agent heartbeat event result") - @PostMapping(value = "{agentNodeId}/agent/heartbeat", produces = MediaType.APPLICATION_JSON_VALUE) - public Object postHeartbeat(HttpServletRequest request, - HttpServletResponse response, - @RequestBody List<HeartBeatEventResult> results) throws Exception { - nodeService.dealHeartbeatResult(results); + /* POST {agentNodeId}/agent/context: hands the posted HeartBeatResult to the service and answers "SUCCESS". The agentNodeId path variable is only logged here; it is not checked against the ids carried inside the body. */ @ApiOperation(value = "deal heart beat context)") + @PostMapping(value = 
"{agentNodeId}/agent/context", produces = MediaType.APPLICATION_JSON_VALUE) + public Object postHeartbeatContext(HttpServletRequest request, + HttpServletResponse response, + @PathVariable(value = "agentNodeId") long agentNodeId, + @RequestBody HeartBeatResult ctx) { + log.info("agent {} post heartbeat context", agentNodeId); + nodeService.dealHeartbeatContext(ctx); return "SUCCESS"; } diff --git a/manager/dm-server/src/main/java/org/apache/doris/stack/service/control/ResourceClusterNodeService.java b/manager/dm-server/src/main/java/org/apache/doris/stack/service/control/ResourceClusterNodeService.java index f151e74..b10f192 100644 --- a/manager/dm-server/src/main/java/org/apache/doris/stack/service/control/ResourceClusterNodeService.java +++ b/manager/dm-server/src/main/java/org/apache/doris/stack/service/control/ResourceClusterNodeService.java @@ -19,27 +19,41 @@ package org.apache.doris.stack.service.control; import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; +import org.apache.doris.manager.common.heartbeat.HeartBeatContext; import org.apache.doris.manager.common.heartbeat.HeartBeatEventInfo; import org.apache.doris.manager.common.heartbeat.HeartBeatEventResourceType; import org.apache.doris.manager.common.heartbeat.HeartBeatEventResult; import org.apache.doris.manager.common.heartbeat.HeartBeatEventType; +import org.apache.doris.manager.common.heartbeat.HeartBeatResult; +import org.apache.doris.manager.common.heartbeat.InstanceInfo; +import org.apache.doris.manager.common.heartbeat.InstanceStateResult; import org.apache.doris.manager.common.heartbeat.config.AgentInstallEventConfigInfo; +import org.apache.doris.manager.common.util.ServerAndAgentConstant; +import org.apache.doris.stack.control.ModelControlState; import org.apache.doris.stack.control.manager.ResourceNodeAndAgentManager; import org.apache.doris.stack.dao.ClusterInstanceRepository; +import org.apache.doris.stack.dao.ClusterModuleRepository; +import 
org.apache.doris.stack.dao.ClusterModuleServiceRepository; import org.apache.doris.stack.dao.HeartBeatEventRepository; import org.apache.doris.stack.dao.ResourceClusterRepository; import org.apache.doris.stack.dao.ResourceNodeRepository; import org.apache.doris.stack.entity.ClusterInstanceEntity; +import org.apache.doris.stack.entity.ClusterModuleEntity; +import org.apache.doris.stack.entity.ClusterModuleServiceEntity; import org.apache.doris.stack.entity.HeartBeatEventEntity; import org.apache.doris.stack.entity.ResourceClusterEntity; import org.apache.doris.stack.entity.ResourceNodeEntity; import org.apache.doris.stack.model.request.control.PMResourceClusterAccessInfo; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import java.sql.Timestamp; +import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.List; +import java.util.Optional; @Service @Slf4j @@ -57,9 +71,15 @@ public class ResourceClusterNodeService { @Autowired private HeartBeatEventRepository heartBeatEventRepository; + @Autowired + private ClusterModuleRepository clusterModuleRepository; + @Autowired private ResourceNodeAndAgentManager nodeAndAgentManager; + @Autowired + private ClusterModuleServiceRepository serviceRepository; + // Send uncompleted heartbeat events that need to be handled by agent public List<HeartBeatEventInfo> getHeartbeat(long agentNodeId) { log.info("Get agent {} uncompleted heartbeat events", agentNodeId); @@ -84,6 +104,60 @@ public class ResourceClusterNodeService { return eventInfos; } + /** Builds the description of every instance deployed on the given agent node. Instances whose module record is missing are logged and skipped. httpPort is taken from the cluster's FE or BE HTTP service whose address list contains the instance address, and stays 0 for other modules or when no service matches. */ + public List<InstanceInfo> getInstanceInfo(long agentNodeId) { + log.info("get node {} instance info", agentNodeId); + List<ClusterInstanceEntity> instanceEntities = instanceRepository.getByNodeId(agentNodeId); + + List<InstanceInfo> instanceInfos = new ArrayList<>(); + + for 
(ClusterInstanceEntity ins : instanceEntities) { + Optional<ClusterModuleEntity> moduleEntityOpt = clusterModuleRepository.findById(ins.getModuleId()); + if (!moduleEntityOpt.isPresent()) { + log.error("this instance module {} is not find, ignore it", ins.getModuleId()); + continue; + } + + ClusterModuleEntity moduleEntity = moduleEntityOpt.get(); + + log.info("to get module {} instance {} info", moduleEntity.getModuleName(), ins.getId()); + + int httpPort = 0; + String httpServerName = ""; + if (moduleEntity.getModuleName().equals(ServerAndAgentConstant.FE_NAME)) { + httpServerName = ServerAndAgentConstant.FE_HTTP_SERVICE; + } else if (moduleEntity.getModuleName().equals(ServerAndAgentConstant.BE_NAME)) { + httpServerName = ServerAndAgentConstant.BE_HTTP_SERVICE; + } + + /* only FE and BE get an HTTP service name; for any other module skip the lookup and keep httpPort at 0 */ + if (!httpServerName.isEmpty()) { + List<ClusterModuleServiceEntity> httpServices = serviceRepository.getByClusterIdAndName( + moduleEntity.getClusterId(), httpServerName); + + for (ClusterModuleServiceEntity service : httpServices) { + List<String> addrList = JSON.parseArray(service.getAddressInfo(), String.class); + /* parseArray yields null for a null address string, so guard before contains */ + if (addrList != null && addrList.contains(ins.getAddress())) { + httpPort = service.getPort(); + } + } + } + + log.info("module {} instance {} http port is {}", moduleEntity.getModuleName(), ins.getId(), httpPort); + + InstanceInfo instanceInfo = new InstanceInfo(ins.getNodeId(), ins.getModuleId(), ins.getId(), + moduleEntity.getModuleName(), ins.getInstallInfo(), httpPort); + log.info("get instance {} info: {}", ins.getId(), instanceInfo); + instanceInfos.add(instanceInfo); + } + + return instanceInfos; + } + + /** Assembles what the agent fetches on a heartbeat: the uncompleted events from getHeartbeat plus the instance descriptions from getInstanceInfo. */ + public HeartBeatContext getHeartBeatContext(long agentNodeId) { + log.info("start to get heartbeat context"); + HeartBeatContext ctx = new HeartBeatContext(); + ctx.setEvents(getHeartbeat(agentNodeId)); + ctx.setInstanceInfos(getInstanceInfo(agentNodeId)); + return ctx; + } + // Handle the result of the heartbeat event of the agent public void dealHeartbeatResult(List<HeartBeatEventResult> eventResults) { for (HeartBeatEventResult 
eventResult : eventResults) { @@ -101,6 +175,38 @@ public class ResourceClusterNodeService { } } + public void dealHeartbeatContext(HeartBeatResult ctx) { + log.info("deal heart beat context {}", ctx); + + if (ctx.getEventResults() == null) { + log.warn("no events to deal"); + } else { + dealHeartbeatResult(ctx.getEventResults()); + } + + if (ctx.getStateResults() == null) { + log.warn("no instance state result to deal"); + } else { + for (InstanceStateResult stateResult : ctx.getStateResults()) { + dealInstanceState(stateResult); + } + } + } + + public void dealInstanceState(InstanceStateResult stateResult) { + log.info("update module {} instance {} state is {}", stateResult.getModuleName(), stateResult.getInstanceId(), + stateResult.getState()); + + Optional<ClusterInstanceEntity> instanceEntityOpt = instanceRepository.findById(stateResult.getInstanceId()); + if (!instanceEntityOpt.isPresent()) { + log.error("instance {} does not exists", stateResult.getInstanceId()); + } + + ClusterInstanceEntity instanceEntity = instanceEntityOpt.get(); + instanceEntity.setCurrentState(stateResult.getState().getValue()); + instanceRepository.save(instanceEntity); + } + public void operateAgent(long nodeId, String operateType) throws Exception { HeartBeatEventType eventType = HeartBeatEventType.valueOf(operateType); @@ -149,4 +255,63 @@ public class ResourceClusterNodeService { return; } } + + @Transactional(rollbackFor = Exception.class) + public void updateInstancesState(ResourceNodeEntity node, int state) { + log.info("update instances of node {} {} to state {}", node.getId(), node.getHost(), state); + List<ClusterInstanceEntity> instanceEntities = instanceRepository.getByNodeId(node.getId()); + if (instanceEntities.isEmpty()) { + log.warn("node {} does hava any instances", node.getId()); + return; + } + + for (ClusterInstanceEntity ins : instanceEntities) { + log.info("update instance {} of module {} to {}", ins.getId(), ins.getModuleId(), state); + 
ins.setCurrentState(state); + instanceRepository.save(ins); + } + } + + /** Scheduled check (cron "0/60 * * * * ?") that derives each node's state from its last heartbeat time: no heartbeat yet gives INIT, a heartbeat older than 60 seconds gives UNKNOWN (and all instances of the node are set to UNKNOWN too), anything newer gives RUNNING. A node is logged as updated and written back only when its state actually changes. */ + @Scheduled(cron = "0/60 * * * * ?") + public void agentNodeStateCheck() { + log.info("start to check agent nodes state"); + List<ResourceNodeEntity> nodes = nodeRepository.findAll(); + if (nodes.isEmpty()) { + log.info("no any agent nodes"); + return; + } + + for (ResourceNodeEntity node : nodes) { + Timestamp lastTime = node.getLastHeartBeatTimestamp(); + if (lastTime == null) { + log.warn("not receive heartbeat yet form node {} {}", node.getId(), node.getHost()); + if (node.getCurrentState() != ModelControlState.INIT.getValue()) { + node.setCurrentState(ModelControlState.INIT.getValue()); + nodeRepository.save(node); + } + continue; + } + + log.info("node {} {} last heartbeat time {}", node.getId(), node.getHost(), + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(lastTime)); + if (System.currentTimeMillis() - lastTime.getTime() > 60 * 1000) { + log.warn("node {} heartbeat timeout", node.getId()); + if (node.getCurrentState() != ModelControlState.UNKNOWN.getValue()) { + log.warn("update node {} form {} to {}", node.getId(), node.getCurrentState(), + ModelControlState.UNKNOWN.getValue()); + node.setCurrentState(ModelControlState.UNKNOWN.getValue()); + log.info("update all instance of node {} to UNKNOWN", node.getId()); + /* NOTE(review): this is a call on this, so the proxy-based @Transactional on updateInstancesState is presumably not applied here - confirm */ + updateInstancesState(node, ModelControlState.UNKNOWN.getValue()); + nodeRepository.save(node); + } + } else { + log.info("node {} heartbeat state normal", node.getId()); + if (node.getCurrentState() != ModelControlState.RUNNING.getValue()) { + /* a node coming back is not a warning, and an unchanged node is neither logged as updated nor rewritten */ + log.info("update node {} form {} to {}", node.getId(), node.getCurrentState(), + ModelControlState.RUNNING.getValue()); + node.setCurrentState(ModelControlState.RUNNING.getValue()); + nodeRepository.save(node); + } + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org