This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 03847b6a3a [Feature](Api) Support operate node(fe/be). (#14904)
03847b6a3a is described below
commit 03847b6a3a4dcc3a255e59e2942c139041f1e7cc
Author: Stalary <[email protected]>
AuthorDate: Wed Dec 14 23:18:56 2022 +0800
[Feature](Api) Support operate node(fe/be). (#14904)
Support operate node(fe/be) via http
---
.../http-actions/fe/manager/node-action.md | 188 ++++++++++++++-
.../http-actions/fe/manager/node-action.md | 186 +++++++++++++++
.../doris/httpv2/rest/manager/NodeAction.java | 263 ++++++++++++++-------
3 files changed, 545 insertions(+), 92 deletions(-)
diff --git a/docs/en/docs/admin-manual/http-actions/fe/manager/node-action.md
b/docs/en/docs/admin-manual/http-actions/fe/manager/node-action.md
index 217a28af08..e1189b7e4e 100644
--- a/docs/en/docs/admin-manual/http-actions/fe/manager/node-action.md
+++ b/docs/en/docs/admin-manual/http-actions/fe/manager/node-action.md
@@ -27,7 +27,7 @@ under the License.
# Node Action
## Request
-s
+
`GET /rest/v2/manager/node/frontends`
`GET /rest/v2/manager/node/backends`
@@ -44,6 +44,14 @@ s
`POST /rest/v2/manager/node/set_config/be`
+<version since="dev">
+
+`POST /rest/v2/manager/node/{action}/be`
+
+`POST /rest/v2/manager/node/{action}/fe`
+
+</version>
+
## Get information about fe, be, broker nodes
`GET /rest/v2/manager/node/frontends`
@@ -432,4 +440,182 @@ failed Indicates a configuration message that failed to
be modified.
}
gent_task_resend_wait_time_ms configuration value modified successfully,
alter_table_timeout_second modification failed.
+ ```
+
+## Operate be node
+
+`POST /rest/v2/manager/node/{action}/be`
+
+### Description
+
+Used to add/drop/offline be node
+
+action:ADD/DROP/DECOMMISSION
+
+### Request body
+```
+{
+ "hostPorts": ["127.0.0.1:9050"],
+ "properties": {
+ "tag.location": "test"
+ }
+}
+
+hostPorts A set of be node addresses to be operated, ip:heartbeat_port
+properties The configuration passed in when adding a node is only used to
configure the tag. If not, the default tag is used
+```
+
+### Response
+```
+{
+ "msg": "Error",
+ "code": 1,
+ "data": "errCode = 2, detailMessage = Same backend already
exists[127.0.0.1:9050]",
+ "count": 0
+}
+
+msg Success/Error
+code 0/1
+data ""/Error message
+```
+
+### Examples
+
+1. add be node
+
+ post /rest/v2/manager/node/ADD/be
+ Request body
+ ```
+ {
+ "hostPorts": ["127.0.0.1:9050"]
+ }
+ ```
+
+ Response
+ ```
+ {
+ "msg": "success",
+ "code": 0,
+ "data": null,
+ "count": 0
+ }
+ ```
+
+2. drop be node
+
+ post /rest/v2/manager/node/DROP/be
+ Request body
+ ```
+ {
+ "hostPorts": ["127.0.0.1:9050"]
+ }
+ ```
+
+ Response
+ ```
+ {
+ "msg": "success",
+ "code": 0,
+ "data": null,
+ "count": 0
+ }
+ ```
+
+3. offline be node
+
+ post /rest/v2/manager/node/DECOMMISSION/be
+ Request body
+ ```
+ {
+ "hostPorts": ["127.0.0.1:9050"]
+ }
+ ```
+
+ Response
+ ```
+ {
+ "msg": "success",
+ "code": 0,
+ "data": null,
+ "count": 0
+ }
+ ```
+
+## Operate fe node
+
+`POST /rest/v2/manager/node/{action}/fe`
+
+### Description
+
+Used to add/drop fe node
+
+action:ADD/DROP
+
+### Request body
+```
+{
+ "role": "FOLLOWER",
+ "hostPort": "127.0.0.1:9030"
+}
+
+role FOLLOWER/OBSERVER
+hostPort The address of the fe node to be operated, ip:edit_log_port
+```
+
+### Response
+```
+{
+ "msg": "Error",
+ "code": 1,
+ "data": "errCode = 2, detailMessage = frontend already exists name:
127.0.0.1:9030_1670495889415, role: FOLLOWER, 127.0.0.1:9030",
+ "count": 0
+}
+
+msg Success/Error
+code 0/1
+data ""/Error message
+```
+
+### Examples
+
+1. add FOLLOWER node
+
+ post /rest/v2/manager/node/ADD/fe
+ Request body
+ ```
+ {
+ "role": "FOLLOWER",
+ "hostPort": "127.0.0.1:9030"
+ }
+ ```
+
+ Response
+ ```
+ {
+ "msg": "success",
+ "code": 0,
+ "data": null,
+ "count": 0
+ }
+ ```
+
+2. drop FOLLOWER node
+
+ post /rest/v2/manager/node/DROP/fe
+ Request body
+ ```
+ {
+ "role": "FOLLOWER",
+ "hostPort": "127.0.0.1:9030"
+ }
+ ```
+
+ Response
+ ```
+ {
+ "msg": "success",
+ "code": 0,
+ "data": null,
+ "count": 0
+ }
```
\ No newline at end of file
diff --git
a/docs/zh-CN/docs/admin-manual/http-actions/fe/manager/node-action.md
b/docs/zh-CN/docs/admin-manual/http-actions/fe/manager/node-action.md
index 2eb2b7d41d..9960ad1551 100644
--- a/docs/zh-CN/docs/admin-manual/http-actions/fe/manager/node-action.md
+++ b/docs/zh-CN/docs/admin-manual/http-actions/fe/manager/node-action.md
@@ -44,6 +44,14 @@ under the License.
`POST /rest/v2/manager/node/set_config/be`
+<version since="dev">
+
+`POST /rest/v2/manager/node/{action}/be`
+
+`POST /rest/v2/manager/node/{action}/fe`
+
+</version>
+
## 获取fe, be, broker节点信息
`GET /rest/v2/manager/node/frontends`
@@ -432,4 +440,182 @@ failed 表示修改失败的配置信息。
}
agent_task_resend_wait_time_ms 配置值修改成功,alter_table_timeout_second 修改失败。
+ ```
+
+## 操作 be 节点
+
+`POST /rest/v2/manager/node/{action}/be`
+
+### Description
+
+用于添加/删除/下线 be 节点
+
+action:ADD/DROP/DECOMMISSION
+
+### Request body
+```
+{
+ "hostPorts": ["127.0.0.1:9050"],
+ "properties": {
+ "tag.location": "test"
+ }
+}
+
+hostPorts 需要操作的一组 be 节点地址 ip:heartbeat_port
+properties 添加节点时传入的配置,目前只用于配置 tag, 不传使用默认 tag
+```
+
+### Response
+```
+{
+ "msg": "Error",
+ "code": 1,
+ "data": "errCode = 2, detailMessage = Same backend already
exists[127.0.0.1:9050]",
+ "count": 0
+}
+
+msg Success/Error
+code 0/1
+data ""/报错信息
+```
+
+### Examples
+
+1. 添加 be 节点
+
+ post /rest/v2/manager/node/ADD/be
+ Request body
+ ```
+ {
+ "hostPorts": ["127.0.0.1:9050"]
+ }
+ ```
+
+ Response
+ ```
+ {
+ "msg": "success",
+ "code": 0,
+ "data": null,
+ "count": 0
+ }
+ ```
+
+2. 删除 be 节点
+
+ post /rest/v2/manager/node/DROP/be
+ Request body
+ ```
+ {
+ "hostPorts": ["127.0.0.1:9050"]
+ }
+ ```
+
+ Response
+ ```
+ {
+ "msg": "success",
+ "code": 0,
+ "data": null,
+ "count": 0
+ }
+ ```
+
+3. 下线 be 节点
+
+ post /rest/v2/manager/node/DECOMMISSION/be
+ Request body
+ ```
+ {
+ "hostPorts": ["127.0.0.1:9050"]
+ }
+ ```
+
+ Response
+ ```
+ {
+ "msg": "success",
+ "code": 0,
+ "data": null,
+ "count": 0
+ }
+ ```
+
+## 操作 fe 节点
+
+`POST /rest/v2/manager/node/{action}/fe`
+
+### Description
+
+用于添加/删除 fe 节点
+
+action:ADD/DROP
+
+### Request body
+```
+{
+ "role": "FOLLOWER",
+ "hostPort": "127.0.0.1:9030"
+}
+
+role FOLLOWER/OBSERVER
+hostPort 需要操作的 fe 节点地址 ip:edit_log_port
+```
+
+### Response
+```
+{
+ "msg": "Error",
+ "code": 1,
+ "data": "errCode = 2, detailMessage = frontend already exists name:
127.0.0.1:9030_1670495889415, role: FOLLOWER, 127.0.0.1:9030",
+ "count": 0
+}
+
+msg Success/Error
+code 0/1
+data ""/报错信息
+```
+
+### Examples
+
+1. 添加 FOLLOWER 节点
+
+ post /rest/v2/manager/node/ADD/fe
+ Request body
+ ```
+ {
+ "role": "FOLLOWER",
+ "hostPort": "127.0.0.1:9030"
+ }
+ ```
+
+ Response
+ ```
+ {
+ "msg": "success",
+ "code": 0,
+ "data": null,
+ "count": 0
+ }
+ ```
+
+2. 删除 FOLLOWER 节点
+
+ post /rest/v2/manager/node/DROP/fe
+ Request body
+ ```
+ {
+ "role": "FOLLOWER",
+ "hostPort": "127.0.0.1:9030"
+ }
+ ```
+
+ Response
+ ```
+ {
+ "msg": "success",
+ "code": 0,
+ "data": null,
+ "count": 0
+ }
```
\ No newline at end of file
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java
index f294e7a050..b4c968f834 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java
@@ -21,20 +21,26 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.ConfigBase;
+import org.apache.doris.common.DdlException;
import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.Pair;
import org.apache.doris.common.ThreadPoolManager;
+import org.apache.doris.common.UserException;
import org.apache.doris.common.proc.ProcResult;
import org.apache.doris.common.proc.ProcService;
+import org.apache.doris.common.util.PropertyAnalyzer;
+import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import org.apache.doris.httpv2.rest.RestBaseController;
import org.apache.doris.httpv2.rest.SetConfigAction;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.resource.Tag;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.system.SystemInfoService.HostInfo;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
@@ -44,17 +50,22 @@ import com.google.common.collect.Maps;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.reflect.TypeToken;
+import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
+import java.util.ArrayList;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -85,15 +96,12 @@ public class NodeAction extends RestBaseController {
public static final String CONFIG_VALUE = "配置值";
public static final String IS_MUTABLE = "可修改";
- public static final ImmutableList<String> FE_CONFIG_TITLE_NAMES = new
ImmutableList.Builder<String>()
- .add(CONFIG).add(NODE_IP_PORT).add(NODE_TYPE).add(CONFIG_TYPE)
- .add(MASTER_ONLY).add(CONFIG_VALUE).add(IS_MUTABLE)
+ public static final ImmutableList<String> FE_CONFIG_TITLE_NAMES = new
ImmutableList.Builder<String>().add(CONFIG)
+
.add(NODE_IP_PORT).add(NODE_TYPE).add(CONFIG_TYPE).add(MASTER_ONLY).add(CONFIG_VALUE).add(IS_MUTABLE)
.build();
- public static final ImmutableList<String> BE_CONFIG_TITLE_NAMES = new
ImmutableList.Builder<String>()
- .add(CONFIG).add(NODE_IP_PORT).add(NODE_TYPE).add(CONFIG_TYPE)
- .add(CONFIG_VALUE).add(IS_MUTABLE)
- .build();
+ public static final ImmutableList<String> BE_CONFIG_TITLE_NAMES = new
ImmutableList.Builder<String>().add(CONFIG)
+
.add(NODE_IP_PORT).add(NODE_TYPE).add(CONFIG_TYPE).add(CONFIG_VALUE).add(IS_MUTABLE).build();
private Object httpExecutorLock = new Object();
private static volatile ExecutorService httpExecutor = null;
@@ -187,9 +195,8 @@ public class NodeAction extends RestBaseController {
Backend be =
Env.getCurrentSystemInfo().getBackend(beIds.get(0));
String url = "http://" + be.getHost() + ":" + be.getHttpPort()
+ "/api/show_config";
String questResult = HttpUtils.doGet(url, null);
- List<List<String>> configs =
GsonUtils.GSON.fromJson(questResult,
- new TypeToken<List<List<String>>>() {
- }.getType());
+ List<List<String>> configs =
GsonUtils.GSON.fromJson(questResult, new TypeToken<List<List<String>>>() {
+ }.getType());
for (List<String> config : configs) {
beConfigNames.add(config.get(0));
}
@@ -223,19 +230,15 @@ public class NodeAction extends RestBaseController {
}
private static List<String> getFeList() {
- return Env.getCurrentEnv().getFrontends(null)
- .stream()
- .map(fe -> fe.getHost() + ":" + Config.http_port)
+ return Env.getCurrentEnv().getFrontends(null).stream().map(fe ->
fe.getHost() + ":" + Config.http_port)
.collect(Collectors.toList());
}
private static List<String> getBeList() {
- return Env.getCurrentSystemInfo().getBackendIds(false)
- .stream().map(beId -> {
- Backend be = Env.getCurrentSystemInfo().getBackend(beId);
- return be.getHost() + ":" + be.getHttpPort();
- })
- .collect(Collectors.toList());
+ return
Env.getCurrentSystemInfo().getBackendIds(false).stream().map(beId -> {
+ Backend be = Env.getCurrentSystemInfo().getBackend(beId);
+ return be.getHost() + ":" + be.getHttpPort();
+ }).collect(Collectors.toList());
}
/*
@@ -302,8 +305,8 @@ public class NodeAction extends RestBaseController {
// }
@RequestMapping(path = "/configuration_info", method = RequestMethod.POST)
public Object configurationInfo(HttpServletRequest request,
HttpServletResponse response,
- @RequestParam(value = "type") String type,
- @RequestBody(required = false)
ConfigInfoRequestBody requestBody) {
+ @RequestParam(value = "type") String type,
+ @RequestBody(required = false) ConfigInfoRequestBody requestBody) {
executeCheckPassword(request, response);
checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(),
PrivPredicate.ADMIN);
@@ -342,15 +345,13 @@ public class NodeAction extends RestBaseController {
}
return ResponseEntityBuilder.ok(new
NodeInfo(BE_CONFIG_TITLE_NAMES, data));
}
- return ResponseEntityBuilder.badRequest("Unsupported type: " + type +
". Only types of fe or be are "
- + "supported");
+ return ResponseEntityBuilder.badRequest(
+ "Unsupported type: " + type + ". Only types of fe or be are "
+ "supported");
}
// Use thread pool to concurrently fetch configuration information from
specified fe or be nodes.
- private List<List<String>> handleConfigurationInfo(List<Pair<String,
Integer>> hostPorts,
- String authorization,
String questPath,
- String nodeType,
List<String> confNames,
- List<Map.Entry<String,
Integer>> errNodes) {
+ private List<List<String>> handleConfigurationInfo(List<Pair<String,
Integer>> hostPorts, String authorization,
+ String questPath, String nodeType, List<String> confNames,
List<Map.Entry<String, Integer>> errNodes) {
// The configuration information returned by each node is a
List<List<String>> type,
// configInfoTotal is used to store the configuration information of
all nodes.
List<List<List<String>>> configInfoTotal = Lists.newArrayList();
@@ -361,8 +362,9 @@ public class NodeAction extends RestBaseController {
Pair<String, Integer> hostPort = hostPorts.get(i);
configRequestDoneSignal.addMark(hostPort.first + ":" +
hostPort.second, -1);
String url = "http://" + hostPort.first + ":" + hostPort.second +
questPath;
- httpExecutor.submit(new HttpConfigInfoTask(url, hostPort,
authorization, nodeType, confNames,
- configRequestDoneSignal, configInfoTotal.get(i)));
+ httpExecutor.submit(
+ new HttpConfigInfoTask(url, hostPort, authorization,
nodeType, confNames, configRequestDoneSignal,
+ configInfoTotal.get(i)));
}
List<List<String>> resultConfigs = Lists.newArrayList();
try {
@@ -381,8 +383,7 @@ public class NodeAction extends RestBaseController {
if (httpExecutor == null) {
synchronized (httpExecutorLock) {
if (httpExecutor == null) {
- httpExecutor =
ThreadPoolManager.newDaemonFixedThreadPool(5, 100,
- "node-config-update-pool", true);
+ httpExecutor =
ThreadPoolManager.newDaemonFixedThreadPool(5, 100, "node-config-update-pool",
true);
}
}
}
@@ -411,9 +412,8 @@ public class NodeAction extends RestBaseController {
private List<List<String>> config;
public HttpConfigInfoTask(String url, Pair<String, Integer> hostPort,
String authorization, String nodeType,
- List<String> confNames,
- MarkedCountDownLatch<String, Integer>
configRequestDoneSignal,
- List<List<String>> config) {
+ List<String> confNames, MarkedCountDownLatch<String, Integer>
configRequestDoneSignal,
+ List<List<String>> config) {
this.url = url;
this.hostPort = hostPort;
this.authorization = authorization;
@@ -427,11 +427,10 @@ public class NodeAction extends RestBaseController {
public void run() {
String configInfo;
try {
- configInfo = HttpUtils.doGet(url, ImmutableMap.<String,
String>builder().put(AUTHORIZATION,
- authorization).build());
- List<List<String>> configs =
GsonUtils.GSON.fromJson(configInfo,
- new TypeToken<List<List<String>>>() {
- }.getType());
+ configInfo = HttpUtils.doGet(url,
+ ImmutableMap.<String,
String>builder().put(AUTHORIZATION, authorization).build());
+ List<List<String>> configs =
GsonUtils.GSON.fromJson(configInfo, new TypeToken<List<List<String>>>() {
+ }.getType());
for (List<String> conf : configs) {
if (confNames == null || confNames.isEmpty() ||
confNames.contains(conf.get(0))) {
addConfig(conf);
@@ -477,15 +476,14 @@ public class NodeAction extends RestBaseController {
// }
@RequestMapping(path = "/set_config/fe", method = RequestMethod.POST)
public Object setConfigFe(HttpServletRequest request, HttpServletResponse
response,
- @RequestBody Map<String, SetConfigRequestBody>
requestBody) {
+ @RequestBody Map<String, SetConfigRequestBody> requestBody) {
executeCheckPassword(request, response);
checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(),
PrivPredicate.ADMIN);
List<Map<String, String>> failedTotal = Lists.newArrayList();
List<NodeConfigs> nodeConfigList = parseSetConfigNodes(requestBody,
failedTotal);
- List<Pair<String, Integer>> aliveFe =
Env.getCurrentEnv().getFrontends(null)
- .stream().filter(Frontend::isAlive).map(fe ->
Pair.of(fe.getHost(), Config.http_port))
- .collect(Collectors.toList());
+ List<Pair<String, Integer>> aliveFe =
Env.getCurrentEnv().getFrontends(null).stream().filter(Frontend::isAlive)
+ .map(fe -> Pair.of(fe.getHost(),
Config.http_port)).collect(Collectors.toList());
checkNodeIsAlive(nodeConfigList, aliveFe, failedTotal);
Map<String, String> header = Maps.newHashMap();
@@ -498,8 +496,8 @@ public class NodeAction extends RestBaseController {
String responsePersist = HttpUtils.doGet(url, header);
parseFeSetConfigResponse(responsePersist,
nodeConfigs.getHostPort(), failedTotal);
} catch (Exception e) {
- addSetConfigErrNode(nodeConfigs.getConfigs(true),
nodeConfigs.getHostPort(),
- e.getMessage(), failedTotal);
+ addSetConfigErrNode(nodeConfigs.getConfigs(true),
nodeConfigs.getHostPort(), e.getMessage(),
+ failedTotal);
}
}
if (!nodeConfigs.getConfigs(false).isEmpty()) {
@@ -508,8 +506,8 @@ public class NodeAction extends RestBaseController {
String responseTemp = HttpUtils.doGet(url, header);
parseFeSetConfigResponse(responseTemp,
nodeConfigs.getHostPort(), failedTotal);
} catch (Exception e) {
- addSetConfigErrNode(nodeConfigs.getConfigs(false),
nodeConfigs.getHostPort(),
- e.getMessage(), failedTotal);
+ addSetConfigErrNode(nodeConfigs.getConfigs(false),
nodeConfigs.getHostPort(), e.getMessage(),
+ failedTotal);
}
}
@@ -520,17 +518,16 @@ public class NodeAction extends RestBaseController {
}
private void addSetConfigErrNode(Map<String, String> configs, Pair<String,
Integer> hostPort, String err,
- List<Map<String, String>> failedTotal) {
+ List<Map<String, String>> failedTotal) {
for (Map.Entry<String, String> entry : configs.entrySet()) {
Map<String, String> failed = Maps.newHashMap();
- addFailedConfig(entry.getKey(), entry.getValue(), hostPort.first +
":"
- + hostPort.second, err, failed);
+ addFailedConfig(entry.getKey(), entry.getValue(), hostPort.first +
":" + hostPort.second, err, failed);
failedTotal.add(failed);
}
}
private void parseFeSetConfigResponse(String response, Pair<String,
Integer> hostPort,
- List<Map<String, String>>
failedTotal) throws Exception {
+ List<Map<String, String>> failedTotal) throws Exception {
JsonObject jsonObject =
JsonParser.parseString(response).getAsJsonObject();
if (jsonObject.get("code").getAsInt() !=
HttpUtils.REQUEST_SUCCESS_CODE) {
throw new Exception(jsonObject.get("msg").getAsString());
@@ -546,7 +543,7 @@ public class NodeAction extends RestBaseController {
}
private static void addFailedConfig(String configName, String value,
String node, String errInfo,
- Map<String, String> failed) {
+ Map<String, String> failed) {
failed.put("config_name", configName);
failed.put("value", value);
failed.put("node", node);
@@ -578,18 +575,16 @@ public class NodeAction extends RestBaseController {
// The request body and return data are in the same format as fe
@RequestMapping(path = "/set_config/be", method = RequestMethod.POST)
public Object setConfigBe(HttpServletRequest request, HttpServletResponse
response,
- @RequestBody Map<String, SetConfigRequestBody>
requestBody) {
+ @RequestBody Map<String, SetConfigRequestBody> requestBody) {
executeCheckPassword(request, response);
checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(),
PrivPredicate.ADMIN);
List<Map<String, String>> failedTotal = Lists.newArrayList();
List<NodeConfigs> nodeConfigList = parseSetConfigNodes(requestBody,
failedTotal);
- List<Pair<String, Integer>> aliveBe =
Env.getCurrentSystemInfo().getBackendIds(true)
- .stream().map(beId -> {
- Backend be = Env.getCurrentSystemInfo().getBackend(beId);
- return Pair.of(be.getHost(), be.getHttpPort());
- })
- .collect(Collectors.toList());
+ List<Pair<String, Integer>> aliveBe =
Env.getCurrentSystemInfo().getBackendIds(true).stream().map(beId -> {
+ Backend be = Env.getCurrentSystemInfo().getBackend(beId);
+ return Pair.of(be.getHost(), be.getHttpPort());
+ }).collect(Collectors.toList());
checkNodeIsAlive(nodeConfigList, aliveBe, failedTotal);
handleBeSetConfig(nodeConfigList, request.getHeader(AUTHORIZATION),
failedTotal);
@@ -600,9 +595,97 @@ public class NodeAction extends RestBaseController {
return ResponseEntityBuilder.ok(data);
}
+ @PostMapping("/{action}/be")
+ public Object operateBackend(HttpServletRequest request,
HttpServletResponse response, @PathVariable String action,
+ @RequestBody BackendReqInfo reqInfo) {
+ if (!Env.getCurrentEnv().isMaster()) {
+ return redirectToMaster(request, response);
+ }
+ try {
+ List<String> hostPorts = reqInfo.getHostPorts();
+ List<HostInfo> hostInfos = new ArrayList<>();
+ for (String hostPort : hostPorts) {
+ hostInfos.add(SystemInfoService.getIpHostAndPort(hostPort,
true));
+ }
+ SystemInfoService currentSystemInfo = Env.getCurrentSystemInfo();
+ if ("ADD".equals(action)) {
+ Map<String, String> properties;
+ if (reqInfo.getProperties() == null) {
+ properties = new HashMap<>();
+ } else {
+ properties = reqInfo.getProperties();
+ }
+ Map<String, String> tagMap =
PropertyAnalyzer.analyzeBackendTagsProperties(properties,
+ Tag.DEFAULT_BACKEND_TAG);
+ currentSystemInfo.addBackends(hostInfos, false, "", tagMap);
+ } else if ("DROP".equals(action)) {
+ currentSystemInfo.dropBackends(hostInfos);
+ } else if ("DECOMMISSION".equals(action)) {
+ ImmutableMap<Long, Backend> backendsInCluster =
currentSystemInfo.getBackendsInCluster(
+ SystemInfoService.DEFAULT_CLUSTER);
+ backendsInCluster.forEach((k, v) -> {
+ hostInfos.stream()
+ .filter(h ->
v.getHostName().equals(h.getHostName()) && v.getHeartbeatPort() == h.getPort())
+ .findFirst().ifPresent(h -> {
+ v.setDecommissioned(true);
+
Env.getCurrentEnv().getEditLog().logBackendStateChange(v);
+ });
+ });
+ }
+ } catch (UserException userException) {
+ return
ResponseEntityBuilder.okWithCommonError(userException.getMessage());
+ }
+ return ResponseEntityBuilder.ok();
+ }
+
+ @PostMapping("/{action}/fe")
+ public Object operateFrontends(HttpServletRequest request,
HttpServletResponse response,
+ @PathVariable String action, @RequestBody FrontendReqInfo reqInfo)
{
+ if (!Env.getCurrentEnv().isMaster()) {
+ return redirectToMaster(request, response);
+ }
+ try {
+ String role = reqInfo.getRole();
+ String[] split = reqInfo.getHostPort().split(":");
+ String host = split[0];
+ int port = Integer.parseInt(split[1]);
+ Env currentEnv = Env.getCurrentEnv();
+ FrontendNodeType frontendNodeType;
+ if (FrontendNodeType.FOLLOWER.name().equals(role)) {
+ frontendNodeType = FrontendNodeType.FOLLOWER;
+ } else {
+ frontendNodeType = FrontendNodeType.OBSERVER;
+ }
+ if ("ADD".equals(action)) {
+ currentEnv.addFrontend(frontendNodeType, host, port);
+ } else if ("DROP".equals(action)) {
+ currentEnv.dropFrontend(frontendNodeType, host, port);
+ }
+ } catch (DdlException ddlException) {
+ return
ResponseEntityBuilder.okWithCommonError(ddlException.getMessage());
+ }
+ return ResponseEntityBuilder.ok();
+ }
+
+ @Data
+ private static class BackendReqInfo {
+
+ private List<String> hostPorts;
+
+ private Map<String, String> properties;
+ }
+
+ @Data
+ private static class FrontendReqInfo {
+
+ private String role;
+
+ private String hostPort;
+ }
+
// Parsing request body into List<NodeConfigs>
private List<NodeConfigs> parseSetConfigNodes(Map<String,
SetConfigRequestBody> requestBody,
- List<Map<String, String>>
errNodes) {
+ List<Map<String, String>> errNodes) {
List<NodeConfigs> nodeConfigsList = Lists.newArrayList();
for (String configName : requestBody.keySet()) {
SetConfigRequestBody configPara = requestBody.get(configName);
@@ -641,34 +724,34 @@ public class NodeAction extends RestBaseController {
}
private void checkNodeIsAlive(List<NodeConfigs> nodeConfigsList,
List<Pair<String, Integer>> aliveNodes,
- List<Map<String, String>> failedNodes) {
+ List<Map<String, String>> failedNodes) {
Iterator<NodeConfigs> it = nodeConfigsList.iterator();
while (it.hasNext()) {
NodeConfigs node = it.next();
boolean isExist = false;
for (Pair<String, Integer> aliveHostPort : aliveNodes) {
- if (aliveHostPort.first.equals(node.getHostPort().first)
- &&
aliveHostPort.second.equals(node.getHostPort().second)) {
+ if (aliveHostPort.first.equals(node.getHostPort().first) &&
aliveHostPort.second.equals(
+ node.getHostPort().second)) {
isExist = true;
break;
}
}
if (!isExist) {
- addSetConfigErrNode(node.getConfigs(true), node.getHostPort(),
- "Node does not exist or is not alive", failedNodes);
- addSetConfigErrNode(node.getConfigs(false), node.getHostPort(),
- "Node does not exist or is not alive", failedNodes);
+ addSetConfigErrNode(node.getConfigs(true), node.getHostPort(),
"Node does not exist or is not alive",
+ failedNodes);
+ addSetConfigErrNode(node.getConfigs(false),
node.getHostPort(), "Node does not exist or is not alive",
+ failedNodes);
it.remove();
}
}
}
private List<Map<String, String>> handleBeSetConfig(List<NodeConfigs>
nodeConfigList, String authorization,
- List<Map<String,
String>> failedTotal) {
+ List<Map<String, String>> failedTotal) {
initHttpExecutor();
- int configNum = nodeConfigList.stream()
- .mapToInt(e -> e.getConfigs(true).size() +
e.getConfigs(false).size()).sum();
+ int configNum = nodeConfigList.stream().mapToInt(e ->
e.getConfigs(true).size() + e.getConfigs(false).size())
+ .sum();
MarkedCountDownLatch<String, Integer> beSetConfigCountDownSignal = new
MarkedCountDownLatch<>(configNum);
for (NodeConfigs nodeConfigs : nodeConfigList) {
submitBeSetConfigTask(nodeConfigs, true, authorization,
beSetConfigCountDownSignal, failedTotal);
@@ -692,30 +775,28 @@ public class NodeAction extends RestBaseController {
}
private void submitBeSetConfigTask(NodeConfigs nodeConfigs, boolean
isPersist, String authorization,
- MarkedCountDownLatch<String, Integer>
beSetConfigCountDownSignal,
- List<Map<String, String>> failedTotal) {
+ MarkedCountDownLatch<String, Integer> beSetConfigCountDownSignal,
List<Map<String, String>> failedTotal) {
if (!nodeConfigs.getConfigs(isPersist).isEmpty()) {
for (Map.Entry<String, String> entry :
nodeConfigs.getConfigs(isPersist).entrySet()) {
failedTotal.add(Maps.newHashMap());
Pair<String, Integer> hostPort = nodeConfigs.getHostPort();
-
beSetConfigCountDownSignal.addMark(concatNodeConfig(hostPort.first,
hostPort.second,
- entry.getKey(), entry.getValue()), -1);
-
- String url = concatBeSetConfigUrl(hostPort.first,
hostPort.second, entry.getKey(),
- entry.getValue(), isPersist);
- httpExecutor.submit(new HttpSetConfigTask(url, hostPort,
authorization, entry.getKey(),
- entry.getValue(), beSetConfigCountDownSignal,
- failedTotal.get(failedTotal.size() - 1)));
+ beSetConfigCountDownSignal.addMark(
+ concatNodeConfig(hostPort.first, hostPort.second,
entry.getKey(), entry.getValue()), -1);
+
+ String url = concatBeSetConfigUrl(hostPort.first,
hostPort.second, entry.getKey(), entry.getValue(),
+ isPersist);
+ httpExecutor.submit(
+ new HttpSetConfigTask(url, hostPort, authorization,
entry.getKey(), entry.getValue(),
+ beSetConfigCountDownSignal,
failedTotal.get(failedTotal.size() - 1)));
}
}
}
- private String concatBeSetConfigUrl(String host, Integer port, String
configName,
- String configValue, boolean isPersist)
{
+ private String concatBeSetConfigUrl(String host, Integer port, String
configName, String configValue,
+ boolean isPersist) {
StringBuilder stringBuffer = new StringBuilder();
- stringBuffer.append("http://").append(host).append(":").append(port)
- .append("/api/update_config")
-
.append("?").append(configName).append("=").append(configValue);
+
stringBuffer.append("http://").append(host).append(":").append(port).append("/api/update_config").append("?")
+ .append(configName).append("=").append(configValue);
if (isPersist) {
stringBuffer.append("&persist=true");
}
@@ -745,8 +826,8 @@ public class NodeAction extends RestBaseController {
private Map<String, String> failed;
public HttpSetConfigTask(String url, Pair<String, Integer> hostPort,
String authorization, String configName,
- String configValue,
MarkedCountDownLatch<String, Integer> beSetConfigDoneSignal,
- Map<String, String> failed) {
+ String configValue, MarkedCountDownLatch<String, Integer>
beSetConfigDoneSignal,
+ Map<String, String> failed) {
this.url = url;
this.hostPort = hostPort;
this.authorization = authorization;
@@ -759,16 +840,16 @@ public class NodeAction extends RestBaseController {
@Override
public void run() {
try {
- String response = HttpUtils.doPost(url, ImmutableMap.<String,
String>builder().put(AUTHORIZATION,
- authorization).build(), null);
+ String response = HttpUtils.doPost(url,
+ ImmutableMap.<String,
String>builder().put(AUTHORIZATION, authorization).build(), null);
JsonObject jsonObject =
JsonParser.parseString(response).getAsJsonObject();
String status = jsonObject.get("status").getAsString();
if (!status.equals("OK")) {
addFailedConfig(configName, configValue, hostPort.first +
":" + hostPort.second,
jsonObject.get("msg").getAsString(), failed);
}
-
beSetConfigDoneSignal.markedCountDown(concatNodeConfig(hostPort.first,
hostPort.second,
- configName, configValue), -1);
+ beSetConfigDoneSignal.markedCountDown(
+ concatNodeConfig(hostPort.first, hostPort.second,
configName, configValue), -1);
} catch (Exception e) {
LOG.warn("set be:{} config:{} failed.", hostPort.first + ":" +
hostPort.second,
configName + "=" + configValue, e);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]