This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 710d129e828 branch-3.1: [fix](cloud)Fix modify the cluster public and
private network causing the node to be temporarily offline #52294 (#52656)
710d129e828 is described below
commit 710d129e8281562b40848b703d8863295c2b85da
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jul 2 18:08:44 2025 +0800
branch-3.1: [fix](cloud)Fix modify the cluster public and private network
causing the node to be temporarily offline #52294 (#52656)
Cherry-picked from #52294
Co-authored-by: deardeng <[email protected]>
---
.../doris/cloud/catalog/CloudClusterChecker.java | 42 ++++++---
.../doris/regression/suite/SuiteCluster.groovy | 1 +
.../multi_cluster/test_change_node_net.groovy | 102 +++++++++++++++++++++
3 files changed, 134 insertions(+), 11 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
index 5d404cdc8a3..ca81b165cb9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
@@ -155,7 +155,7 @@ public class CloudClusterChecker extends MasterDaemon {
);
}
- private void updateStatus(List<Backend> currentBes, List<Cloud.NodeInfoPB>
expectedBes) {
+ private void updateStatus(List<Backend> currentBes, List<Cloud.NodeInfoPB>
expectedBes, ClusterPB remoteClusterPb) {
Map<String, Backend> currentMap = new HashMap<>();
for (Backend be : currentBes) {
String endpoint = be.getHost() + ":" + be.getHeartbeatPort();
@@ -200,6 +200,33 @@ public class CloudClusterChecker extends MasterDaemon {
// edit log
Env.getCurrentEnv().getEditLog().logBackendStateChange(be);
}
+ updateIfComputeNodeEndpointChanged(remoteClusterPb, be);
+ }
+ }
+
+ private void updateIfComputeNodeEndpointChanged(ClusterPB remoteClusterPb,
Backend be) {
+ // Check whether the cluster's public or private endpoint has changed.
+ boolean netChanged = false;
+ String remotePublicEndpoint = remoteClusterPb.getPublicEndpoint();
+ String localPublicEndpoint =
be.getTagMap().get(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT);
+ if (!localPublicEndpoint.equals(remotePublicEndpoint)) {
+ LOG.info("be {} has changed public_endpoint from {} to {}",
+ be, localPublicEndpoint, remotePublicEndpoint);
+ be.getTagMap().put(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT,
remotePublicEndpoint);
+ netChanged = true;
+ }
+
+ String remotePrivateEndpoint = remoteClusterPb.getPrivateEndpoint();
+ String localPrivateEndpoint =
be.getTagMap().get(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT);
+ if (!localPrivateEndpoint.equals(remotePrivateEndpoint)) {
+ LOG.info("be {} has changed private_endpoint from {} to {}",
+ be, localPrivateEndpoint, remotePrivateEndpoint);
+ be.getTagMap().put(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT,
remotePrivateEndpoint);
+ netChanged = true;
+ }
+ if (netChanged) {
+ // edit log
+ Env.getCurrentEnv().getEditLog().logBackendStateChange(be);
}
}
@@ -278,13 +305,12 @@ public class CloudClusterChecker extends MasterDaemon {
LOG.info("get cloud cluster, clusterId={} local nodes={} remote
nodes={}", cid,
currentBeEndpoints, remoteBeEndpoints);
- updateStatus(currentBes, expectedBes);
+ updateStatus(currentBes, expectedBes,
remoteClusterIdToPB.get(cid));
diffNodes(toAdd, toDel, () -> {
Map<String, Backend> currentMap = new HashMap<>();
for (Backend be : currentBes) {
- String endpoint = be.getHost() + ":" +
be.getHeartbeatPort()
- + be.getCloudPublicEndpoint() +
be.getCloudPrivateEndpoint();
+ String endpoint = be.getHost() + ":" +
be.getHeartbeatPort();
currentMap.put(endpoint, be);
}
return currentMap;
@@ -296,9 +322,7 @@ public class CloudClusterChecker extends MasterDaemon {
LOG.warn("cant get valid add from ms {}", node);
continue;
}
- String endpoint = host + ":" + node.getHeartbeatPort()
- + remoteClusterIdToPB.get(cid).getPublicEndpoint()
- +
remoteClusterIdToPB.get(cid).getPrivateEndpoint();
+ String endpoint = host + ":" + node.getHeartbeatPort();
Backend b = new Backend(Env.getCurrentEnv().getNextId(),
host, node.getHeartbeatPort());
if (node.hasIsSmoothUpgrade()) {
b.setSmoothUpgradeDst(node.getIsSmoothUpgrade());
@@ -463,10 +487,6 @@ public class CloudClusterChecker extends MasterDaemon {
continue;
}
Cloud.NodeInfoPB.NodeType type = node.getNodeType();
- // ATTN: just allow to add follower or observer
- if (Cloud.NodeInfoPB.NodeType.FE_MASTER.equals(type)) {
- LOG.warn("impossible !!!, get fe node {} type equal
master from ms", node);
- }
FrontendNodeType role = type ==
Cloud.NodeInfoPB.NodeType.FE_OBSERVER
? FrontendNodeType.OBSERVER :
FrontendNodeType.FOLLOWER;
Frontend fe = new Frontend(role,
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
index ec801c47c01..9b948a3c303 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
@@ -59,6 +59,7 @@ class ClusterOptions {
'max_sys_mem_available_low_water_mark_bytes=0', //no check mem
available memory
'report_disk_state_interval_seconds=2',
'report_random_wait=false',
+ 'enable_java_support=false',
]
List<String> msConfigs = []
diff --git
a/regression-test/suites/cloud_p0/multi_cluster/test_change_node_net.groovy
b/regression-test/suites/cloud_p0/multi_cluster/test_change_node_net.groovy
new file mode 100644
index 00000000000..3cc6991fa20
--- /dev/null
+++ b/regression-test/suites/cloud_p0/multi_cluster/test_change_node_net.groovy
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import groovy.json.JsonSlurper
+import groovy.json.JsonOutput
+
+suite('test_change_node_net', 'multi_cluster,docker') {
+ def options = new ClusterOptions()
+ options.feConfigs += [
+ 'cloud_cluster_check_interval_second=5',
+ ]
+ options.cloudMode = true
+
+ def token = "greedisgood9999"
+ def update_cluster_endpoint_api = { msHttpPort, request_body, check_func ->
+ httpTest {
+ endpoint msHttpPort
+ uri "/MetaService/http/update_cluster_endpoint?token=$token"
+ body request_body
+ check check_func
+ }
+ }
+
+ def showClusterBackends = { clusterName ->
+ def bes = sql_return_maparray "show backends"
+ def clusterBes = bes.findAll { be -> be.Tag.contains(clusterName) }
+ def backendMap = clusterBes.collectEntries { be ->
+ [(be.BackendId): be.Tag]
+ }
+ logger.info("Collected BackendId and Tag map: {}", backendMap)
+ backendMap
+ }
+
+ docker(options) {
+ def ms = cluster.getAllMetaservices().get(0)
+ def msHttpPort = ms.host + ":" + ms.httpPort
+ logger.info("ms addr={}, port={}, ms endpoint={}", ms.host,
ms.httpPort, msHttpPort)
+
+ def clusterName = "newcluster1"
+ // Add a new cluster (add_new_cluster)
+ cluster.addBackend(3, clusterName)
+
+ def result = sql """show clusters"""
+ logger.info("show cluster1 : {}", result)
+
+ def beforeBackendMap = showClusterBackends.call(clusterName)
+
+ def tag = beforeBackendMap.entrySet().iterator().next().Value
+ assertNotNull(tag)
+ def jsonSlurper = new JsonSlurper()
+ def jsonObject = jsonSlurper.parseText(tag)
+ def cloudUniqueId = jsonObject.cloud_unique_id
+ def clusterId = jsonObject.compute_group_id
+ def before_public_endpoint = jsonObject.public_endpoint
+ def after_private_endpoint = jsonObject.private_endpoint
+
+
+ def changeCluster = [cluster_id: "${clusterId}", public_endpoint:
"test_public_endpoint", private_endpoint: "test_private_endpoint"]
+ def updateClusterEndpointBody = [cloud_unique_id: "${cloudUniqueId}",
cluster: changeCluster]
+ def jsonOutput = new JsonOutput()
+ def updateClusterEndpointJson =
jsonOutput.toJson(updateClusterEndpointBody)
+
+ update_cluster_endpoint_api.call(msHttpPort,
updateClusterEndpointJson) {
+ respCode, body ->
+ def json = parseJson(body)
+ log.info("update cluster endpoint result: ${body} ${respCode}
${json}".toString())
+ }
+
+ def futrue = thread {
+ // check 15s
+ for (def i = 0; i < 15; i++) {
+ def afterBackendMap = showClusterBackends.call(clusterName)
+ if (i > 5) {
+ // cloud_cluster_check_interval_second = 5
+ afterBackendMap.each { key, value ->
+ assert value.contains("test_public_endpoint") : "Value
for key ${key} does not contain 'test_public_endpoint'"
+ assert value.contains("test_private_endpoint") :
"Value for key ${key} does not contain 'test_private_endpoint'"
+ }
+ }
+ // check beid not changed
+ assertEquals(afterBackendMap.keySet(),
beforeBackendMap.keySet())
+ sleep(1 * 1000)
+ }
+ }
+ futrue.get()
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org