This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 03847b6a3a [Feature](Api) Support operate node(fe/be). (#14904)
03847b6a3a is described below

commit 03847b6a3a4dcc3a255e59e2942c139041f1e7cc
Author: Stalary <[email protected]>
AuthorDate: Wed Dec 14 23:18:56 2022 +0800

    [Feature](Api) Support operate node(fe/be). (#14904)
    
    Support operate node(fe/be) via http
---
 .../http-actions/fe/manager/node-action.md         | 188 ++++++++++++++-
 .../http-actions/fe/manager/node-action.md         | 186 +++++++++++++++
 .../doris/httpv2/rest/manager/NodeAction.java      | 263 ++++++++++++++-------
 3 files changed, 545 insertions(+), 92 deletions(-)

diff --git a/docs/en/docs/admin-manual/http-actions/fe/manager/node-action.md 
b/docs/en/docs/admin-manual/http-actions/fe/manager/node-action.md
index 217a28af08..e1189b7e4e 100644
--- a/docs/en/docs/admin-manual/http-actions/fe/manager/node-action.md
+++ b/docs/en/docs/admin-manual/http-actions/fe/manager/node-action.md
@@ -27,7 +27,7 @@ under the License.
 # Node Action
 
 ## Request
-s
+
 `GET /rest/v2/manager/node/frontends`
 
 `GET /rest/v2/manager/node/backends`
@@ -44,6 +44,14 @@ s
 
 `POST /rest/v2/manager/node/set_config/be`
 
+<version since="dev">
+
+`POST /rest/v2/manager/node/{action}/be`
+
+`POST /rest/v2/manager/node/{action}/fe`
+
+</version>
+
 ## Get information about fe, be, broker nodes
 
 `GET /rest/v2/manager/node/frontends`
@@ -432,4 +440,182 @@ failed Indicates a configuration message that failed to 
be modified.
     }
 
     gent_task_resend_wait_time_ms configuration value modified successfully, 
alter_table_timeout_second modification failed.
+    ```
+
+## Operate be node
+
+`POST /rest/v2/manager/node/{action}/be`
+
+### Description
+
+Used to add, drop, or decommission (take offline) be nodes
+
+action:ADD/DROP/DECOMMISSION
+
+### Request body
+```
+{
+    "hostPorts": ["127.0.0.1:9050"],
+    "properties": {
+        "tag.location": "test"
+    }
+}
+
+hostPorts A set of be node addresses to be operated, ip:heartbeat_port
+properties Configuration passed in when adding a node; currently used only to set the tag. If omitted, the default tag is used
+```
+
+### Response
+```
+{
+    "msg": "Error",
+    "code": 1,
+    "data": "errCode = 2, detailMessage = Same backend already 
exists[127.0.0.1:9050]",
+    "count": 0
+}
+
+msg Success/Error
+code 0/1
+data ""/Error message
+```
+
+### Examples
+
+1. add be node
+
+   post /rest/v2/manager/node/ADD/be
+   Request body
+    ```
+    {
+        "hostPorts": ["127.0.0.1:9050"]
+    }
+    ```
+
+   Response
+    ```
+    {
+        "msg": "success",
+        "code": 0,
+        "data": null,
+        "count": 0
+    }
+    ```
+
+2. drop be node
+
+   post /rest/v2/manager/node/DROP/be
+   Request body
+    ```
+    {
+        "hostPorts": ["127.0.0.1:9050"]
+    }
+    ```
+
+   Response
+    ```
+    {
+        "msg": "success",
+        "code": 0,
+        "data": null,
+        "count": 0
+    }
+    ```
+
+3. offline be node
+
+   post /rest/v2/manager/node/DECOMMISSION/be
+   Request body
+    ```
+    {
+        "hostPorts": ["127.0.0.1:9050"]
+    }
+    ```
+
+   Response
+    ```
+    {
+        "msg": "success",
+        "code": 0,
+        "data": null,
+        "count": 0
+    }
+    ```
+
+## Operate fe node
+
+`POST /rest/v2/manager/node/{action}/fe`
+
+### Description
+
+Used to add/drop fe node
+
+action:ADD/DROP
+
+### Request body
+```
+{
+    "role": "FOLLOWER",
+    "hostPort": "127.0.0.1:9030"
+}
+
+role FOLLOWER/OBSERVER
+hostPort The address of the fe node to be operated, ip:edit_log_port
+```
+
+### Response
+```
+{
+    "msg": "Error",
+    "code": 1,
+    "data": "errCode = 2, detailMessage = frontend already exists name: 
127.0.0.1:9030_1670495889415, role: FOLLOWER, 127.0.0.1:9030",
+    "count": 0
+}
+
+msg Success/Error
+code 0/1
+data ""/Error message
+```
+
+### Examples
+
+1. add FOLLOWER node
+
+   post /rest/v2/manager/node/ADD/fe
+   Request body
+    ```
+    {
+        "role": "FOLLOWER",
+        "hostPort": "127.0.0.1:9030"
+    }
+    ```
+
+   Response
+    ```
+    {
+        "msg": "success",
+        "code": 0,
+        "data": null,
+        "count": 0
+    }
+    ```
+
+2. drop FOLLOWER node
+
+   post /rest/v2/manager/node/DROP/fe
+   Request body
+    ```
+    {
+        "role": "FOLLOWER",
+        "hostPort": "127.0.0.1:9030"
+    }
+    ```
+
+   Response
+    ```
+    {
+        "msg": "success",
+        "code": 0,
+        "data": null,
+        "count": 0
+    }
     ```
\ No newline at end of file
diff --git 
a/docs/zh-CN/docs/admin-manual/http-actions/fe/manager/node-action.md 
b/docs/zh-CN/docs/admin-manual/http-actions/fe/manager/node-action.md
index 2eb2b7d41d..9960ad1551 100644
--- a/docs/zh-CN/docs/admin-manual/http-actions/fe/manager/node-action.md
+++ b/docs/zh-CN/docs/admin-manual/http-actions/fe/manager/node-action.md
@@ -44,6 +44,14 @@ under the License.
 
 `POST /rest/v2/manager/node/set_config/be`
 
+<version since="dev">
+
+`POST /rest/v2/manager/node/{action}/be`
+
+`POST /rest/v2/manager/node/{action}/fe`
+
+</version>
+
 ## 获取fe, be, broker节点信息
 
 `GET /rest/v2/manager/node/frontends`
@@ -432,4 +440,182 @@ failed 表示修改失败的配置信息。
     }
 
     agent_task_resend_wait_time_ms 配置值修改成功,alter_table_timeout_second 修改失败。
+    ```
+   
+## 操作 be 节点
+
+`POST /rest/v2/manager/node/{action}/be`
+
+### Description
+
+用于添加/删除/下线 be 节点
+
+action:ADD/DROP/DECOMMISSION
+
+### Request body
+```
+{
+    "hostPorts": ["127.0.0.1:9050"],
+    "properties": {
+        "tag.location": "test"
+    }
+}
+
+hostPorts 需要操作的一组 be 节点地址 ip:heartbeat_port
+properties 添加节点时传入的配置,目前只用于配置 tag, 不传使用默认 tag
+```
+
+### Response
+```
+{
+    "msg": "Error",
+    "code": 1,
+    "data": "errCode = 2, detailMessage = Same backend already 
exists[127.0.0.1:9050]",
+    "count": 0
+}
+
+msg Success/Error
+code 0/1
+data ""/报错信息
+```
+
+### Examples
+
+1. 添加 be 节点
+
+   post /rest/v2/manager/node/ADD/be
+   Request body
+    ```
+    {
+        "hostPorts": ["127.0.0.1:9050"]
+    }
+    ```
+
+   Response
+    ```
+    {
+        "msg": "success",
+        "code": 0,
+        "data": null,
+        "count": 0
+    }
+    ```
+
+2. 删除 be 节点
+
+   post /rest/v2/manager/node/DROP/be
+   Request body
+    ```
+    {
+        "hostPorts": ["127.0.0.1:9050"]
+    }
+    ```
+
+   Response
+    ```
+    {
+        "msg": "success",
+        "code": 0,
+        "data": null,
+        "count": 0
+    }
+    ```
+
+3. 下线 be 节点
+
+   post /rest/v2/manager/node/DECOMMISSION/be
+   Request body
+    ```
+    {
+        "hostPorts": ["127.0.0.1:9050"]
+    }
+    ```
+
+   Response
+    ```
+    {
+        "msg": "success",
+        "code": 0,
+        "data": null,
+        "count": 0
+    }
+    ```
+
+## 操作 fe 节点
+
+`POST /rest/v2/manager/node/{action}/fe`
+
+### Description
+
+用于添加/删除 fe 节点
+
+action:ADD/DROP
+
+### Request body
+```
+{
+    "role": "FOLLOWER",
+    "hostPort": "127.0.0.1:9030"
+}
+
+role FOLLOWER/OBSERVER
+hostPort 需要操作的 fe 节点地址 ip:edit_log_port
+```
+
+### Response
+```
+{
+    "msg": "Error",
+    "code": 1,
+    "data": "errCode = 2, detailMessage = frontend already exists name: 
127.0.0.1:9030_1670495889415, role: FOLLOWER, 127.0.0.1:9030",
+    "count": 0
+}
+
+msg Success/Error
+code 0/1
+data ""/报错信息
+```
+
+### Examples
+
+1. 添加 FOLLOWER 节点
+
+    post /rest/v2/manager/node/ADD/fe
+    Request body
+    ```
+    {
+        "role": "FOLLOWER",
+        "hostPort": "127.0.0.1:9030"
+    }
+    ```
+   
+    Response
+    ```
+    {
+        "msg": "success",
+        "code": 0,
+        "data": null,
+        "count": 0
+    }
+    ```
+
+2. 删除 FOLLOWER 节点
+
+   post /rest/v2/manager/node/DROP/fe
+   Request body
+    ```
+    {
+        "role": "FOLLOWER",
+        "hostPort": "127.0.0.1:9030"
+    }
+    ```
+
+   Response
+    ```
+    {
+        "msg": "success",
+        "code": 0,
+        "data": null,
+        "count": 0
+    }
     ```
\ No newline at end of file
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java
index f294e7a050..b4c968f834 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java
@@ -21,20 +21,26 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.ConfigBase;
+import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MarkedCountDownLatch;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.ThreadPoolManager;
+import org.apache.doris.common.UserException;
 import org.apache.doris.common.proc.ProcResult;
 import org.apache.doris.common.proc.ProcService;
+import org.apache.doris.common.util.PropertyAnalyzer;
+import org.apache.doris.ha.FrontendNodeType;
 import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
 import org.apache.doris.httpv2.rest.RestBaseController;
 import org.apache.doris.httpv2.rest.SetConfigAction;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.resource.Tag;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.Frontend;
 import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.system.SystemInfoService.HostInfo;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
@@ -44,17 +50,22 @@ import com.google.common.collect.Maps;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 import com.google.gson.reflect.TypeToken;
+import lombok.Data;
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestMethod;
 import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
 
+import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -85,15 +96,12 @@ public class NodeAction extends RestBaseController {
     public static final String CONFIG_VALUE = "配置值";
     public static final String IS_MUTABLE = "可修改";
 
-    public static final ImmutableList<String> FE_CONFIG_TITLE_NAMES = new 
ImmutableList.Builder<String>()
-            .add(CONFIG).add(NODE_IP_PORT).add(NODE_TYPE).add(CONFIG_TYPE)
-            .add(MASTER_ONLY).add(CONFIG_VALUE).add(IS_MUTABLE)
+    public static final ImmutableList<String> FE_CONFIG_TITLE_NAMES = new 
ImmutableList.Builder<String>().add(CONFIG)
+            
.add(NODE_IP_PORT).add(NODE_TYPE).add(CONFIG_TYPE).add(MASTER_ONLY).add(CONFIG_VALUE).add(IS_MUTABLE)
             .build();
 
-    public static final ImmutableList<String> BE_CONFIG_TITLE_NAMES = new 
ImmutableList.Builder<String>()
-            .add(CONFIG).add(NODE_IP_PORT).add(NODE_TYPE).add(CONFIG_TYPE)
-            .add(CONFIG_VALUE).add(IS_MUTABLE)
-            .build();
+    public static final ImmutableList<String> BE_CONFIG_TITLE_NAMES = new 
ImmutableList.Builder<String>().add(CONFIG)
+            
.add(NODE_IP_PORT).add(NODE_TYPE).add(CONFIG_TYPE).add(CONFIG_VALUE).add(IS_MUTABLE).build();
 
     private Object httpExecutorLock = new Object();
     private static volatile ExecutorService httpExecutor = null;
@@ -187,9 +195,8 @@ public class NodeAction extends RestBaseController {
                 Backend be = 
Env.getCurrentSystemInfo().getBackend(beIds.get(0));
                 String url = "http://" + be.getHost() + ":" + be.getHttpPort() 
+ "/api/show_config";
                 String questResult = HttpUtils.doGet(url, null);
-                List<List<String>> configs = 
GsonUtils.GSON.fromJson(questResult,
-                        new TypeToken<List<List<String>>>() {
-                        }.getType());
+                List<List<String>> configs = 
GsonUtils.GSON.fromJson(questResult, new TypeToken<List<List<String>>>() {
+                }.getType());
                 for (List<String> config : configs) {
                     beConfigNames.add(config.get(0));
                 }
@@ -223,19 +230,15 @@ public class NodeAction extends RestBaseController {
     }
 
     private static List<String> getFeList() {
-        return Env.getCurrentEnv().getFrontends(null)
-                .stream()
-                .map(fe -> fe.getHost() + ":" + Config.http_port)
+        return Env.getCurrentEnv().getFrontends(null).stream().map(fe -> 
fe.getHost() + ":" + Config.http_port)
                 .collect(Collectors.toList());
     }
 
     private static List<String> getBeList() {
-        return Env.getCurrentSystemInfo().getBackendIds(false)
-                .stream().map(beId -> {
-                    Backend be = Env.getCurrentSystemInfo().getBackend(beId);
-                    return be.getHost() + ":" + be.getHttpPort();
-                })
-                .collect(Collectors.toList());
+        return 
Env.getCurrentSystemInfo().getBackendIds(false).stream().map(beId -> {
+            Backend be = Env.getCurrentSystemInfo().getBackend(beId);
+            return be.getHost() + ":" + be.getHttpPort();
+        }).collect(Collectors.toList());
     }
 
     /*
@@ -302,8 +305,8 @@ public class NodeAction extends RestBaseController {
     // }
     @RequestMapping(path = "/configuration_info", method = RequestMethod.POST)
     public Object configurationInfo(HttpServletRequest request, 
HttpServletResponse response,
-                                    @RequestParam(value = "type") String type,
-                                    @RequestBody(required = false) 
ConfigInfoRequestBody requestBody) {
+            @RequestParam(value = "type") String type,
+            @RequestBody(required = false) ConfigInfoRequestBody requestBody) {
         executeCheckPassword(request, response);
         checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), 
PrivPredicate.ADMIN);
 
@@ -342,15 +345,13 @@ public class NodeAction extends RestBaseController {
             }
             return ResponseEntityBuilder.ok(new 
NodeInfo(BE_CONFIG_TITLE_NAMES, data));
         }
-        return ResponseEntityBuilder.badRequest("Unsupported type: " + type + 
". Only types of fe or be are "
-                + "supported");
+        return ResponseEntityBuilder.badRequest(
+                "Unsupported type: " + type + ". Only types of fe or be are " 
+ "supported");
     }
 
     // Use thread pool to concurrently fetch configuration information from 
specified fe or be nodes.
-    private List<List<String>> handleConfigurationInfo(List<Pair<String, 
Integer>> hostPorts,
-                                                       String authorization, 
String questPath,
-                                                       String nodeType, 
List<String> confNames,
-                                                       List<Map.Entry<String, 
Integer>> errNodes) {
+    private List<List<String>> handleConfigurationInfo(List<Pair<String, 
Integer>> hostPorts, String authorization,
+            String questPath, String nodeType, List<String> confNames, 
List<Map.Entry<String, Integer>> errNodes) {
         // The configuration information returned by each node is a 
List<List<String>> type,
         // configInfoTotal is used to store the configuration information of 
all nodes.
         List<List<List<String>>> configInfoTotal = Lists.newArrayList();
@@ -361,8 +362,9 @@ public class NodeAction extends RestBaseController {
             Pair<String, Integer> hostPort = hostPorts.get(i);
             configRequestDoneSignal.addMark(hostPort.first + ":" + 
hostPort.second, -1);
             String url = "http://" + hostPort.first + ":" + hostPort.second + 
questPath;
-            httpExecutor.submit(new HttpConfigInfoTask(url, hostPort, 
authorization, nodeType, confNames,
-                    configRequestDoneSignal, configInfoTotal.get(i)));
+            httpExecutor.submit(
+                    new HttpConfigInfoTask(url, hostPort, authorization, 
nodeType, confNames, configRequestDoneSignal,
+                            configInfoTotal.get(i)));
         }
         List<List<String>> resultConfigs = Lists.newArrayList();
         try {
@@ -381,8 +383,7 @@ public class NodeAction extends RestBaseController {
         if (httpExecutor == null) {
             synchronized (httpExecutorLock) {
                 if (httpExecutor == null) {
-                    httpExecutor = 
ThreadPoolManager.newDaemonFixedThreadPool(5, 100,
-                            "node-config-update-pool", true);
+                    httpExecutor = 
ThreadPoolManager.newDaemonFixedThreadPool(5, 100, "node-config-update-pool", 
true);
                 }
             }
         }
@@ -411,9 +412,8 @@ public class NodeAction extends RestBaseController {
         private List<List<String>> config;
 
         public HttpConfigInfoTask(String url, Pair<String, Integer> hostPort, 
String authorization, String nodeType,
-                                  List<String> confNames,
-                                  MarkedCountDownLatch<String, Integer> 
configRequestDoneSignal,
-                                  List<List<String>> config) {
+                List<String> confNames, MarkedCountDownLatch<String, Integer> 
configRequestDoneSignal,
+                List<List<String>> config) {
             this.url = url;
             this.hostPort = hostPort;
             this.authorization = authorization;
@@ -427,11 +427,10 @@ public class NodeAction extends RestBaseController {
         public void run() {
             String configInfo;
             try {
-                configInfo = HttpUtils.doGet(url, ImmutableMap.<String, 
String>builder().put(AUTHORIZATION,
-                        authorization).build());
-                List<List<String>> configs = 
GsonUtils.GSON.fromJson(configInfo,
-                        new TypeToken<List<List<String>>>() {
-                        }.getType());
+                configInfo = HttpUtils.doGet(url,
+                        ImmutableMap.<String, 
String>builder().put(AUTHORIZATION, authorization).build());
+                List<List<String>> configs = 
GsonUtils.GSON.fromJson(configInfo, new TypeToken<List<List<String>>>() {
+                }.getType());
                 for (List<String> conf : configs) {
                     if (confNames == null || confNames.isEmpty() || 
confNames.contains(conf.get(0))) {
                         addConfig(conf);
@@ -477,15 +476,14 @@ public class NodeAction extends RestBaseController {
     //  }
     @RequestMapping(path = "/set_config/fe", method = RequestMethod.POST)
     public Object setConfigFe(HttpServletRequest request, HttpServletResponse 
response,
-                              @RequestBody Map<String, SetConfigRequestBody> 
requestBody) {
+            @RequestBody Map<String, SetConfigRequestBody> requestBody) {
         executeCheckPassword(request, response);
         checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), 
PrivPredicate.ADMIN);
 
         List<Map<String, String>> failedTotal = Lists.newArrayList();
         List<NodeConfigs> nodeConfigList = parseSetConfigNodes(requestBody, 
failedTotal);
-        List<Pair<String, Integer>> aliveFe = 
Env.getCurrentEnv().getFrontends(null)
-                .stream().filter(Frontend::isAlive).map(fe -> 
Pair.of(fe.getHost(), Config.http_port))
-                .collect(Collectors.toList());
+        List<Pair<String, Integer>> aliveFe = 
Env.getCurrentEnv().getFrontends(null).stream().filter(Frontend::isAlive)
+                .map(fe -> Pair.of(fe.getHost(), 
Config.http_port)).collect(Collectors.toList());
         checkNodeIsAlive(nodeConfigList, aliveFe, failedTotal);
 
         Map<String, String> header = Maps.newHashMap();
@@ -498,8 +496,8 @@ public class NodeAction extends RestBaseController {
                     String responsePersist = HttpUtils.doGet(url, header);
                     parseFeSetConfigResponse(responsePersist, 
nodeConfigs.getHostPort(), failedTotal);
                 } catch (Exception e) {
-                    addSetConfigErrNode(nodeConfigs.getConfigs(true), 
nodeConfigs.getHostPort(),
-                            e.getMessage(), failedTotal);
+                    addSetConfigErrNode(nodeConfigs.getConfigs(true), 
nodeConfigs.getHostPort(), e.getMessage(),
+                            failedTotal);
                 }
             }
             if (!nodeConfigs.getConfigs(false).isEmpty()) {
@@ -508,8 +506,8 @@ public class NodeAction extends RestBaseController {
                     String responseTemp = HttpUtils.doGet(url, header);
                     parseFeSetConfigResponse(responseTemp, 
nodeConfigs.getHostPort(), failedTotal);
                 } catch (Exception e) {
-                    addSetConfigErrNode(nodeConfigs.getConfigs(false), 
nodeConfigs.getHostPort(),
-                            e.getMessage(), failedTotal);
+                    addSetConfigErrNode(nodeConfigs.getConfigs(false), 
nodeConfigs.getHostPort(), e.getMessage(),
+                            failedTotal);
                 }
             }
 
@@ -520,17 +518,16 @@ public class NodeAction extends RestBaseController {
     }
 
     private void addSetConfigErrNode(Map<String, String> configs, Pair<String, 
Integer> hostPort, String err,
-                                     List<Map<String, String>> failedTotal) {
+            List<Map<String, String>> failedTotal) {
         for (Map.Entry<String, String> entry : configs.entrySet()) {
             Map<String, String> failed = Maps.newHashMap();
-            addFailedConfig(entry.getKey(), entry.getValue(), hostPort.first + 
":"
-                    + hostPort.second, err, failed);
+            addFailedConfig(entry.getKey(), entry.getValue(), hostPort.first + 
":" + hostPort.second, err, failed);
             failedTotal.add(failed);
         }
     }
 
     private void parseFeSetConfigResponse(String response, Pair<String, 
Integer> hostPort,
-                                          List<Map<String, String>> 
failedTotal) throws Exception {
+            List<Map<String, String>> failedTotal) throws Exception {
         JsonObject jsonObject = 
JsonParser.parseString(response).getAsJsonObject();
         if (jsonObject.get("code").getAsInt() != 
HttpUtils.REQUEST_SUCCESS_CODE) {
             throw new Exception(jsonObject.get("msg").getAsString());
@@ -546,7 +543,7 @@ public class NodeAction extends RestBaseController {
     }
 
     private static void addFailedConfig(String configName, String value, 
String node, String errInfo,
-                                        Map<String, String> failed) {
+            Map<String, String> failed) {
         failed.put("config_name", configName);
         failed.put("value", value);
         failed.put("node", node);
@@ -578,18 +575,16 @@ public class NodeAction extends RestBaseController {
     // The request body and return data are in the same format as fe
     @RequestMapping(path = "/set_config/be", method = RequestMethod.POST)
     public Object setConfigBe(HttpServletRequest request, HttpServletResponse 
response,
-                              @RequestBody Map<String, SetConfigRequestBody> 
requestBody) {
+            @RequestBody Map<String, SetConfigRequestBody> requestBody) {
         executeCheckPassword(request, response);
         checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), 
PrivPredicate.ADMIN);
 
         List<Map<String, String>> failedTotal = Lists.newArrayList();
         List<NodeConfigs> nodeConfigList = parseSetConfigNodes(requestBody, 
failedTotal);
-        List<Pair<String, Integer>> aliveBe = 
Env.getCurrentSystemInfo().getBackendIds(true)
-                .stream().map(beId -> {
-                    Backend be = Env.getCurrentSystemInfo().getBackend(beId);
-                    return Pair.of(be.getHost(), be.getHttpPort());
-                })
-                .collect(Collectors.toList());
+        List<Pair<String, Integer>> aliveBe = 
Env.getCurrentSystemInfo().getBackendIds(true).stream().map(beId -> {
+            Backend be = Env.getCurrentSystemInfo().getBackend(beId);
+            return Pair.of(be.getHost(), be.getHttpPort());
+        }).collect(Collectors.toList());
         checkNodeIsAlive(nodeConfigList, aliveBe, failedTotal);
 
         handleBeSetConfig(nodeConfigList, request.getHeader(AUTHORIZATION), 
failedTotal);
@@ -600,9 +595,97 @@ public class NodeAction extends RestBaseController {
         return ResponseEntityBuilder.ok(data);
     }
 
+    /**
+     * Adds, drops or decommissions backend (BE) nodes.
+     *
+     * <p>Mapped to {@code POST .../{action}/be}. {@code action} must be exactly {@code ADD}, {@code DROP} or
+     * {@code DECOMMISSION} (case-sensitive). A request received by a non-master FE is redirected to the master.
+     *
+     * @param request  the HTTP request, used for the password check and for the redirect
+     * @param response the HTTP response
+     * @param action   the operation to perform
+     * @param reqInfo  the target {@code host:heartbeat_port} list and, for ADD, optional tag properties
+     * @return a success entity; a bad-request entity for an unsupported action or an empty host list; or a
+     *         common-error entity carrying the message of the {@link UserException} raised by the operation
+     */
+    @PostMapping("/{action}/be")
+    public Object operateBackend(HttpServletRequest request, HttpServletResponse response, @PathVariable String action,
+            @RequestBody BackendReqInfo reqInfo) {
+        // Changing cluster membership is an administrative operation: authenticate the caller and require
+        // ADMIN, exactly as the configuration endpoints of this controller do.
+        executeCheckPassword(request, response);
+        checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN);
+
+        if (!Env.getCurrentEnv().isMaster()) {
+            return redirectToMaster(request, response);
+        }
+        // Reject unknown actions up front instead of reporting success for a no-op.
+        if (!"ADD".equals(action) && !"DROP".equals(action) && !"DECOMMISSION".equals(action)) {
+            return ResponseEntityBuilder.badRequest(
+                    "Unsupported action: " + action + ". Only ADD, DROP or DECOMMISSION are supported");
+        }
+        List<String> hostPorts = reqInfo.getHostPorts();
+        if (hostPorts == null || hostPorts.isEmpty()) {
+            return ResponseEntityBuilder.badRequest("hostPorts must contain at least one host:port");
+        }
+        try {
+            List<HostInfo> hostInfos = new ArrayList<>();
+            for (String hostPort : hostPorts) {
+                hostInfos.add(SystemInfoService.getIpHostAndPort(hostPort, true));
+            }
+            SystemInfoService currentSystemInfo = Env.getCurrentSystemInfo();
+            if ("ADD".equals(action)) {
+                // Missing properties are treated as an empty map before tag analysis.
+                Map<String, String> properties = reqInfo.getProperties();
+                if (properties == null) {
+                    properties = new HashMap<>();
+                }
+                Map<String, String> tagMap = PropertyAnalyzer.analyzeBackendTagsProperties(properties,
+                        Tag.DEFAULT_BACKEND_TAG);
+                currentSystemInfo.addBackends(hostInfos, false, "", tagMap);
+            } else if ("DROP".equals(action)) {
+                currentSystemInfo.dropBackends(hostInfos);
+            } else {
+                // DECOMMISSION: flag every backend of the default cluster that matches a requested host and
+                // heartbeat port, and write the state change to the edit log.
+                // NOTE(review): requested hosts that match no backend are skipped silently, and matching
+                // compares getHostName() on both sides - confirm this also covers backends given by IP.
+                ImmutableMap<Long, Backend> backendsInCluster = currentSystemInfo.getBackendsInCluster(
+                        SystemInfoService.DEFAULT_CLUSTER);
+                for (Backend backend : backendsInCluster.values()) {
+                    for (HostInfo hostInfo : hostInfos) {
+                        if (backend.getHostName().equals(hostInfo.getHostName())
+                                && backend.getHeartbeatPort() == hostInfo.getPort()) {
+                            backend.setDecommissioned(true);
+                            Env.getCurrentEnv().getEditLog().logBackendStateChange(backend);
+                            break;
+                        }
+                    }
+                }
+            }
+        } catch (UserException userException) {
+            return ResponseEntityBuilder.okWithCommonError(userException.getMessage());
+        }
+        return ResponseEntityBuilder.ok();
+    }
+
+    /**
+     * Adds or drops a frontend (FE) node.
+     *
+     * <p>Mapped to {@code POST .../{action}/fe}. {@code action} must be exactly {@code ADD} or {@code DROP}
+     * (case-sensitive). A request received by a non-master FE is redirected to the master.
+     *
+     * @param request  the HTTP request, used for the password check and for the redirect
+     * @param response the HTTP response
+     * @param action   the operation to perform
+     * @param reqInfo  the node role ({@code FOLLOWER} or {@code OBSERVER}; an absent role is treated as
+     *                 {@code OBSERVER}) and the node address as {@code host:port}
+     * @return a success entity; a bad-request entity for an unsupported action, an unknown role or a
+     *         malformed address; or a common-error entity carrying the {@link DdlException} message
+     */
+    @PostMapping("/{action}/fe")
+    public Object operateFrontends(HttpServletRequest request, HttpServletResponse response,
+            @PathVariable String action, @RequestBody FrontendReqInfo reqInfo) {
+        // Changing cluster membership is an administrative operation: authenticate the caller and require
+        // ADMIN, exactly as the configuration endpoints of this controller do.
+        executeCheckPassword(request, response);
+        checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN);
+
+        if (!Env.getCurrentEnv().isMaster()) {
+            return redirectToMaster(request, response);
+        }
+        // Reject unknown actions up front instead of reporting success for a no-op.
+        if (!"ADD".equals(action) && !"DROP".equals(action)) {
+            return ResponseEntityBuilder.badRequest(
+                    "Unsupported action: " + action + ". Only ADD or DROP are supported");
+        }
+
+        // An absent role keeps the previous OBSERVER default; any other unknown value (e.g. a typo) is
+        // rejected rather than silently operating on an OBSERVER.
+        String role = reqInfo.getRole();
+        FrontendNodeType frontendNodeType;
+        if (FrontendNodeType.FOLLOWER.name().equals(role)) {
+            frontendNodeType = FrontendNodeType.FOLLOWER;
+        } else if (role == null || FrontendNodeType.OBSERVER.name().equals(role)) {
+            frontendNodeType = FrontendNodeType.OBSERVER;
+        } else {
+            return ResponseEntityBuilder.badRequest(
+                    "Unsupported role: " + role + ". Only FOLLOWER or OBSERVER are supported");
+        }
+
+        // Validate "host:port" here so malformed input yields a 400 instead of an unhandled exception.
+        String hostPort = reqInfo.getHostPort();
+        String[] split = hostPort == null ? new String[0] : hostPort.split(":");
+        if (split.length != 2 || split[0].isEmpty()) {
+            return ResponseEntityBuilder.badRequest("Invalid hostPort: " + hostPort + ". Expected host:port");
+        }
+        String host = split[0];
+        int port;
+        try {
+            port = Integer.parseInt(split[1]);
+        } catch (NumberFormatException e) {
+            return ResponseEntityBuilder.badRequest("Invalid port in hostPort: " + hostPort);
+        }
+
+        try {
+            Env currentEnv = Env.getCurrentEnv();
+            if ("ADD".equals(action)) {
+                currentEnv.addFrontend(frontendNodeType, host, port);
+            } else {
+                currentEnv.dropFrontend(frontendNodeType, host, port);
+            }
+        } catch (DdlException ddlException) {
+            return ResponseEntityBuilder.okWithCommonError(ddlException.getMessage());
+        }
+        return ResponseEntityBuilder.ok();
+    }
+
+    @Data // Lombok generates the getters/setters used by operateBackend and by request-body binding
+    private static class BackendReqInfo { // request body of POST /{action}/be
+
+        private List<String> hostPorts; // backends to operate on; each entry is parsed by getIpHostAndPort
+
+        private Map<String, String> properties; // optional; read only for ADD, where it feeds tag analysis
+    }
+
+    @Data // Lombok generates the getters/setters used by operateFrontends and by request-body binding
+    private static class FrontendReqInfo { // request body of POST /{action}/fe
+
+        private String role; // compared against FrontendNodeType.FOLLOWER.name() in operateFrontends
+
+        private String hostPort; // frontend address as "host:port"; split on ':' in operateFrontends
+    }
+
     // Parsing request body into List<NodeConfigs>
     private List<NodeConfigs> parseSetConfigNodes(Map<String, 
SetConfigRequestBody> requestBody,
-                                                  List<Map<String, String>> 
errNodes) {
+            List<Map<String, String>> errNodes) {
         List<NodeConfigs> nodeConfigsList = Lists.newArrayList();
         for (String configName : requestBody.keySet()) {
             SetConfigRequestBody configPara = requestBody.get(configName);
@@ -641,34 +724,34 @@ public class NodeAction extends RestBaseController {
     }
 
     private void checkNodeIsAlive(List<NodeConfigs> nodeConfigsList, 
List<Pair<String, Integer>> aliveNodes,
-                                  List<Map<String, String>> failedNodes) {
+            List<Map<String, String>> failedNodes) {
         Iterator<NodeConfigs> it = nodeConfigsList.iterator();
         while (it.hasNext()) {
             NodeConfigs node = it.next();
             boolean isExist = false;
             for (Pair<String, Integer> aliveHostPort : aliveNodes) {
-                if (aliveHostPort.first.equals(node.getHostPort().first)
-                        && 
aliveHostPort.second.equals(node.getHostPort().second)) {
+                if (aliveHostPort.first.equals(node.getHostPort().first) && 
aliveHostPort.second.equals(
+                        node.getHostPort().second)) {
                     isExist = true;
                     break;
                 }
             }
             if (!isExist) {
-                addSetConfigErrNode(node.getConfigs(true), node.getHostPort(),
-                        "Node does not exist or is not alive", failedNodes);
-                addSetConfigErrNode(node.getConfigs(false), node.getHostPort(),
-                        "Node does not exist or is not alive", failedNodes);
+                addSetConfigErrNode(node.getConfigs(true), node.getHostPort(), 
"Node does not exist or is not alive",
+                        failedNodes);
+                addSetConfigErrNode(node.getConfigs(false), 
node.getHostPort(), "Node does not exist or is not alive",
+                        failedNodes);
                 it.remove();
             }
         }
     }
 
     private List<Map<String, String>> handleBeSetConfig(List<NodeConfigs> 
nodeConfigList, String authorization,
-                                                        List<Map<String, 
String>> failedTotal) {
+            List<Map<String, String>> failedTotal) {
         initHttpExecutor();
 
-        int configNum = nodeConfigList.stream()
-                .mapToInt(e -> e.getConfigs(true).size() + 
e.getConfigs(false).size()).sum();
+        int configNum = nodeConfigList.stream().mapToInt(e -> 
e.getConfigs(true).size() + e.getConfigs(false).size())
+                .sum();
         MarkedCountDownLatch<String, Integer> beSetConfigCountDownSignal = new 
MarkedCountDownLatch<>(configNum);
         for (NodeConfigs nodeConfigs : nodeConfigList) {
             submitBeSetConfigTask(nodeConfigs, true, authorization, 
beSetConfigCountDownSignal, failedTotal);
@@ -692,30 +775,28 @@ public class NodeAction extends RestBaseController {
     }
 
     private void submitBeSetConfigTask(NodeConfigs nodeConfigs, boolean 
isPersist, String authorization,
-                                       MarkedCountDownLatch<String, Integer> 
beSetConfigCountDownSignal,
-                                       List<Map<String, String>> failedTotal) {
+            MarkedCountDownLatch<String, Integer> beSetConfigCountDownSignal, 
List<Map<String, String>> failedTotal) {
         if (!nodeConfigs.getConfigs(isPersist).isEmpty()) {
             for (Map.Entry<String, String> entry : 
nodeConfigs.getConfigs(isPersist).entrySet()) {
                 failedTotal.add(Maps.newHashMap());
                 Pair<String, Integer> hostPort = nodeConfigs.getHostPort();
-                
beSetConfigCountDownSignal.addMark(concatNodeConfig(hostPort.first, 
hostPort.second,
-                        entry.getKey(), entry.getValue()), -1);
-
-                String url = concatBeSetConfigUrl(hostPort.first, 
hostPort.second, entry.getKey(),
-                        entry.getValue(), isPersist);
-                httpExecutor.submit(new HttpSetConfigTask(url, hostPort, 
authorization, entry.getKey(),
-                        entry.getValue(), beSetConfigCountDownSignal,
-                        failedTotal.get(failedTotal.size() - 1)));
+                beSetConfigCountDownSignal.addMark(
+                        concatNodeConfig(hostPort.first, hostPort.second, 
entry.getKey(), entry.getValue()), -1);
+
+                String url = concatBeSetConfigUrl(hostPort.first, 
hostPort.second, entry.getKey(), entry.getValue(),
+                        isPersist);
+                httpExecutor.submit(
+                        new HttpSetConfigTask(url, hostPort, authorization, 
entry.getKey(), entry.getValue(),
+                                beSetConfigCountDownSignal, 
failedTotal.get(failedTotal.size() - 1)));
             }
         }
     }
 
-    private String concatBeSetConfigUrl(String host, Integer port, String 
configName,
-                                        String configValue, boolean isPersist) 
{
+    private String concatBeSetConfigUrl(String host, Integer port, String 
configName, String configValue,
+            boolean isPersist) {
         StringBuilder stringBuffer = new StringBuilder();
-        stringBuffer.append("http://").append(host).append(":").append(port)
-                .append("/api/update_config")
-                
.append("?").append(configName).append("=").append(configValue);
+        
stringBuffer.append("http://").append(host).append(":").append(port).append("/api/update_config").append("?")
+                .append(configName).append("=").append(configValue);
         if (isPersist) {
             stringBuffer.append("&persist=true");
         }
@@ -745,8 +826,8 @@ public class NodeAction extends RestBaseController {
         private Map<String, String> failed;
 
         public HttpSetConfigTask(String url, Pair<String, Integer> hostPort, 
String authorization, String configName,
-                                 String configValue, 
MarkedCountDownLatch<String, Integer> beSetConfigDoneSignal,
-                                 Map<String, String> failed) {
+                String configValue, MarkedCountDownLatch<String, Integer> 
beSetConfigDoneSignal,
+                Map<String, String> failed) {
             this.url = url;
             this.hostPort = hostPort;
             this.authorization = authorization;
@@ -759,16 +840,16 @@ public class NodeAction extends RestBaseController {
         @Override
         public void run() {
             try {
-                String response = HttpUtils.doPost(url, ImmutableMap.<String, 
String>builder().put(AUTHORIZATION,
-                        authorization).build(), null);
+                String response = HttpUtils.doPost(url,
+                        ImmutableMap.<String, 
String>builder().put(AUTHORIZATION, authorization).build(), null);
                 JsonObject jsonObject = 
JsonParser.parseString(response).getAsJsonObject();
                 String status = jsonObject.get("status").getAsString();
                 if (!status.equals("OK")) {
                     addFailedConfig(configName, configValue, hostPort.first + 
":" + hostPort.second,
                             jsonObject.get("msg").getAsString(), failed);
                 }
-                
beSetConfigDoneSignal.markedCountDown(concatNodeConfig(hostPort.first, 
hostPort.second,
-                        configName, configValue), -1);
+                beSetConfigDoneSignal.markedCountDown(
+                        concatNodeConfig(hostPort.first, hostPort.second, 
configName, configValue), -1);
             } catch (Exception e) {
                 LOG.warn("set be:{} config:{} failed.", hostPort.first + ":" + 
hostPort.second,
                         configName + "=" + configValue, e);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to