This is an automated email from the ASF dual-hosted git repository.

menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b36a6907a7 Refactor ComputeNodeInstance.labels as thread safe (#31310)
4b36a6907a7 is described below

commit 4b36a6907a7c449b9c05c1905fe15a3af31b5d6a
Author: Liang Zhang <[email protected]>
AuthorDate: Mon May 20 16:14:16 2024 +0800

    Refactor ComputeNodeInstance.labels as thread safe (#31310)
    
    * Refactor ComputeNodeInstance.labels as thread safe
    
    * Refactor ComputeNodeInstance.labels as thread safe
    
    * Refactor ComputeNodeInstance.labels as thread safe
    
    * Refactor ComputeNodeInstance.labels as thread safe
---
 .../infra/instance/ComputeNodeInstance.java           | 19 +++++--------------
 .../infra/instance/InstanceContext.java               | 10 ++++++++--
 .../infra/instance/InstanceContextTest.java           |  4 +---
 .../manager/cluster/ClusterContextManagerBuilder.java |  4 +++-
 .../compute/service/ComputeNodeStatusService.java     |  6 ++----
 .../compute/service/ComputeNodeStatusServiceTest.java |  2 +-
 .../subscriber/StateChangedSubscriberTest.java        |  2 +-
 7 files changed, 21 insertions(+), 26 deletions(-)

diff --git 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
index 6c16f75b64c..5e2d7796f0d 100644
--- 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
+++ 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
@@ -21,39 +21,30 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.Setter;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
-import org.apache.shardingsphere.infra.state.instance.InstanceStateContext;
 import org.apache.shardingsphere.infra.state.instance.InstanceState;
+import org.apache.shardingsphere.infra.state.instance.InstanceStateContext;
 
+import javax.annotation.concurrent.ThreadSafe;
 import java.util.Collection;
-import java.util.LinkedList;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 /**
  * Instance of compute node.
  */
 @RequiredArgsConstructor
 @Getter
+@ThreadSafe
 public final class ComputeNodeInstance {
     
     private final InstanceMetaData metaData;
     
     private final InstanceStateContext state = new InstanceStateContext();
     
-    private Collection<String> labels = new LinkedList<>();
+    private final Collection<String> labels = new CopyOnWriteArrayList<>();
     
     @Setter
     private volatile int workerId = -1;
     
-    /**
-     * Set labels.
-     *
-     * @param labels labels
-     */
-    public void setLabels(final Collection<String> labels) {
-        if (null != labels) {
-            this.labels = labels;
-        }
-    }
-    
     /**
      * Switch state.
      *
diff --git 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
index 1c989927e80..ea7201190b0 100644
--- 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
+++ 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
@@ -100,9 +100,15 @@ public final class InstanceContext {
      */
     public void updateLabel(final String instanceId, final Collection<String> 
labels) {
         if (instance.getMetaData().getId().equals(instanceId)) {
-            instance.setLabels(labels);
+            instance.getLabels().clear();
+            instance.getLabels().addAll(labels);
+        }
+        for (ComputeNodeInstance each : allClusterComputeNodeInstances) {
+            if (each.getMetaData().getId().equals(instanceId)) {
+                each.getLabels().clear();
+                each.getLabels().addAll(labels);
+            }
         }
-        allClusterComputeNodeInstances.stream().filter(each -> 
each.getMetaData().getId().equals(instanceId)).forEach(each -> 
each.setLabels(labels));
     }
     
     /**
diff --git 
a/infra/common/src/test/java/org/apache/shardingsphere/infra/instance/InstanceContextTest.java
 
b/infra/common/src/test/java/org/apache/shardingsphere/infra/instance/InstanceContextTest.java
index 8cb58569a79..f6555ed86d0 100644
--- 
a/infra/common/src/test/java/org/apache/shardingsphere/infra/instance/InstanceContextTest.java
+++ 
b/infra/common/src/test/java/org/apache/shardingsphere/infra/instance/InstanceContextTest.java
@@ -28,9 +28,7 @@ import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.LinkedHashSet;
 import java.util.Properties;
-import java.util.Set;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -87,7 +85,7 @@ class InstanceContextTest {
         when(instanceMetaData.getId()).thenReturn("foo_instance_id");
         InstanceContext context = new InstanceContext(new 
ComputeNodeInstance(instanceMetaData), new 
WorkerIdGeneratorFixture(Integer.MIN_VALUE),
                 modeConfig, modeContextManager, lockContext, eventBusContext);
-        Set<String> expected = new LinkedHashSet<>(Arrays.asList("label_1", 
"label_2"));
+        Collection<String> expected = Arrays.asList("label_1", "label_2");
         context.updateLabel("foo_instance_id", expected);
         Collection<String> actual = context.getInstance().getLabels();
         assertThat(actual, is(expected));
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index eeb1ad28c93..e4ae6c33181 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -117,7 +117,9 @@ public final class ClusterContextManagerBuilder implements 
ContextManagerBuilder
         new 
ComputeNodeStatusService(repository).registerOnline(instanceContext.getInstance());
         new GovernanceWatcherFactory(repository,
                 eventBusContext, param.getInstanceMetaData() instanceof 
JDBCInstanceMetaData ? param.getDatabaseConfigs().keySet() : 
Collections.emptyList()).watchListeners();
-        
contextManager.getInstanceContext().getInstance().setLabels(param.getLabels());
+        if (null != param.getLabels()) {
+            
contextManager.getInstanceContext().getInstance().getLabels().addAll(param.getLabels());
+        }
         
contextManager.getInstanceContext().getAllClusterComputeNodeInstances().addAll(new
 ComputeNodeStatusService(repository).loadAllComputeNodeInstances());
         new ClusterEventSubscriberRegistry(contextManager, 
repository).register();
     }
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
index d3c350eb8ba..b7bdfb18e83 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
@@ -65,9 +65,7 @@ public final class ComputeNodeStatusService {
      * @param labels instance labels
      */
     public void persistInstanceLabels(final String instanceId, final 
Collection<String> labels) {
-        if (null != labels) {
-            
repository.persistEphemeral(ComputeNode.getInstanceLabelsNodePath(instanceId), 
YamlEngine.marshal(labels));
-        }
+        
repository.persistEphemeral(ComputeNode.getInstanceLabelsNodePath(instanceId), 
YamlEngine.marshal(labels));
     }
     
     /**
@@ -152,7 +150,7 @@ public final class ComputeNodeStatusService {
      */
     public ComputeNodeInstance loadComputeNodeInstance(final InstanceMetaData 
instanceMetaData) {
         ComputeNodeInstance result = new ComputeNodeInstance(instanceMetaData);
-        result.setLabels(loadInstanceLabels(instanceMetaData.getId()));
+        
result.getLabels().addAll(loadInstanceLabels(instanceMetaData.getId()));
         result.switchState(loadInstanceStatus(instanceMetaData.getId()));
         
loadInstanceWorkerId(instanceMetaData.getId()).ifPresent(result::setWorkerId);
         return result;
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
index 8729e58e7fd..187338dac01 100644
--- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
@@ -55,7 +55,7 @@ class ComputeNodeStatusServiceTest {
     @Test
     void assertRegisterOnline() {
         ComputeNodeInstance computeNodeInstance = new ComputeNodeInstance(new 
ProxyInstanceMetaData("foo_instance_id", 3307));
-        computeNodeInstance.setLabels(Collections.singletonList("test"));
+        computeNodeInstance.getLabels().add("test");
         new 
ComputeNodeStatusService(repository).registerOnline(computeNodeInstance);
         
verify(repository).persistEphemeral(eq("/nodes/compute_nodes/online/proxy/" + 
computeNodeInstance.getMetaData().getId()), anyString());
         
verify(repository).persistEphemeral(ComputeNode.getInstanceStatusNodePath(computeNodeInstance.getMetaData().getId()),
 InstanceState.OK.name());
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
index cb0ebc565b3..216b5e1d248 100644
--- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
@@ -150,7 +150,7 @@ class StateChangedSubscriberTest {
     
     @Test
     void assertRenewInstanceLabels() {
-        Collection<String> labels = Collections.singleton("test");
+        Collection<String> labels = Collections.singletonList("test");
         subscriber.renew(new 
LabelsEvent(contextManager.getInstanceContext().getInstance().getMetaData().getId(),
 labels));
         
assertThat(contextManager.getInstanceContext().getInstance().getLabels(), 
is(labels));
     }

Reply via email to