This is an automated email from the ASF dual-hosted git repository.
menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 4b36a6907a7 Refactor ComputeNodeInstance.labels as thread safe (#31310)
4b36a6907a7 is described below
commit 4b36a6907a7c449b9c05c1905fe15a3af31b5d6a
Author: Liang Zhang <[email protected]>
AuthorDate: Mon May 20 16:14:16 2024 +0800
Refactor ComputeNodeInstance.labels as thread safe (#31310)
* Refactor ComputeNodeInstance.labels as thread safe
* Refactor ComputeNodeInstance.labels as thread safe
* Refactor ComputeNodeInstance.labels as thread safe
* Refactor ComputeNodeInstance.labels as thread safe
---
.../infra/instance/ComputeNodeInstance.java | 19 +++++--------------
.../infra/instance/InstanceContext.java | 10 ++++++++--
.../infra/instance/InstanceContextTest.java | 4 +---
.../manager/cluster/ClusterContextManagerBuilder.java | 4 +++-
.../compute/service/ComputeNodeStatusService.java | 6 ++----
.../compute/service/ComputeNodeStatusServiceTest.java | 2 +-
.../subscriber/StateChangedSubscriberTest.java | 2 +-
7 files changed, 21 insertions(+), 26 deletions(-)
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
index 6c16f75b64c..5e2d7796f0d 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
@@ -21,39 +21,30 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
-import org.apache.shardingsphere.infra.state.instance.InstanceStateContext;
import org.apache.shardingsphere.infra.state.instance.InstanceState;
+import org.apache.shardingsphere.infra.state.instance.InstanceStateContext;
+import javax.annotation.concurrent.ThreadSafe;
import java.util.Collection;
-import java.util.LinkedList;
+import java.util.concurrent.CopyOnWriteArrayList;
/**
* Instance of compute node.
*/
@RequiredArgsConstructor
@Getter
+@ThreadSafe
public final class ComputeNodeInstance {
private final InstanceMetaData metaData;
private final InstanceStateContext state = new InstanceStateContext();
- private Collection<String> labels = new LinkedList<>();
+ private final Collection<String> labels = new CopyOnWriteArrayList<>();
@Setter
private volatile int workerId = -1;
- /**
- * Set labels.
- *
- * @param labels labels
- */
- public void setLabels(final Collection<String> labels) {
- if (null != labels) {
- this.labels = labels;
- }
- }
-
/**
* Switch state.
*
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
index 1c989927e80..ea7201190b0 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
@@ -100,9 +100,15 @@ public final class InstanceContext {
*/
public void updateLabel(final String instanceId, final Collection<String> labels) {
if (instance.getMetaData().getId().equals(instanceId)) {
- instance.setLabels(labels);
+ instance.getLabels().clear();
+ instance.getLabels().addAll(labels);
+ }
+ for (ComputeNodeInstance each : allClusterComputeNodeInstances) {
+ if (each.getMetaData().getId().equals(instanceId)) {
+ each.getLabels().clear();
+ each.getLabels().addAll(labels);
+ }
}
- allClusterComputeNodeInstances.stream().filter(each -> each.getMetaData().getId().equals(instanceId)).forEach(each -> each.setLabels(labels));
}
/**
diff --git a/infra/common/src/test/java/org/apache/shardingsphere/infra/instance/InstanceContextTest.java b/infra/common/src/test/java/org/apache/shardingsphere/infra/instance/InstanceContextTest.java
index 8cb58569a79..f6555ed86d0 100644
--- a/infra/common/src/test/java/org/apache/shardingsphere/infra/instance/InstanceContextTest.java
+++ b/infra/common/src/test/java/org/apache/shardingsphere/infra/instance/InstanceContextTest.java
@@ -28,9 +28,7 @@ import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collection;
-import java.util.LinkedHashSet;
import java.util.Properties;
-import java.util.Set;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -87,7 +85,7 @@ class InstanceContextTest {
when(instanceMetaData.getId()).thenReturn("foo_instance_id");
InstanceContext context = new InstanceContext(new ComputeNodeInstance(instanceMetaData), new WorkerIdGeneratorFixture(Integer.MIN_VALUE), modeConfig, modeContextManager, lockContext, eventBusContext);
- Set<String> expected = new LinkedHashSet<>(Arrays.asList("label_1", "label_2"));
+ Collection<String> expected = Arrays.asList("label_1", "label_2");
context.updateLabel("foo_instance_id", expected);
Collection<String> actual = context.getInstance().getLabels();
assertThat(actual, is(expected));
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index eeb1ad28c93..e4ae6c33181 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -117,7 +117,9 @@ public final class ClusterContextManagerBuilder implements ContextManagerBuilder
new ComputeNodeStatusService(repository).registerOnline(instanceContext.getInstance());
new GovernanceWatcherFactory(repository, eventBusContext, param.getInstanceMetaData() instanceof JDBCInstanceMetaData ? param.getDatabaseConfigs().keySet() : Collections.emptyList()).watchListeners();
- contextManager.getInstanceContext().getInstance().setLabels(param.getLabels());
+ if (null != param.getLabels()) {
+ contextManager.getInstanceContext().getInstance().getLabels().addAll(param.getLabels());
+ }
contextManager.getInstanceContext().getAllClusterComputeNodeInstances().addAll(new ComputeNodeStatusService(repository).loadAllComputeNodeInstances());
new ClusterEventSubscriberRegistry(contextManager, repository).register();
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
index d3c350eb8ba..b7bdfb18e83 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
@@ -65,9 +65,7 @@ public final class ComputeNodeStatusService {
* @param labels instance labels
*/
public void persistInstanceLabels(final String instanceId, final Collection<String> labels) {
- if (null != labels) {
- repository.persistEphemeral(ComputeNode.getInstanceLabelsNodePath(instanceId), YamlEngine.marshal(labels));
- }
+ repository.persistEphemeral(ComputeNode.getInstanceLabelsNodePath(instanceId), YamlEngine.marshal(labels));
}
/**
@@ -152,7 +150,7 @@ public final class ComputeNodeStatusService {
*/
public ComputeNodeInstance loadComputeNodeInstance(final InstanceMetaData instanceMetaData) {
ComputeNodeInstance result = new ComputeNodeInstance(instanceMetaData);
- result.setLabels(loadInstanceLabels(instanceMetaData.getId()));
+ result.getLabels().addAll(loadInstanceLabels(instanceMetaData.getId()));
result.switchState(loadInstanceStatus(instanceMetaData.getId()));
loadInstanceWorkerId(instanceMetaData.getId()).ifPresent(result::setWorkerId);
return result;
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
index 8729e58e7fd..187338dac01 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
@@ -55,7 +55,7 @@ class ComputeNodeStatusServiceTest {
@Test
void assertRegisterOnline() {
ComputeNodeInstance computeNodeInstance = new ComputeNodeInstance(new ProxyInstanceMetaData("foo_instance_id", 3307));
- computeNodeInstance.setLabels(Collections.singletonList("test"));
+ computeNodeInstance.getLabels().add("test");
new ComputeNodeStatusService(repository).registerOnline(computeNodeInstance);
verify(repository).persistEphemeral(eq("/nodes/compute_nodes/online/proxy/" + computeNodeInstance.getMetaData().getId()), anyString());
verify(repository).persistEphemeral(ComputeNode.getInstanceStatusNodePath(computeNodeInstance.getMetaData().getId()), InstanceState.OK.name());
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
index cb0ebc565b3..216b5e1d248 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
@@ -150,7 +150,7 @@ class StateChangedSubscriberTest {
@Test
void assertRenewInstanceLabels() {
- Collection<String> labels = Collections.singleton("test");
+ Collection<String> labels = Collections.singletonList("test");
subscriber.renew(new LabelsEvent(contextManager.getInstanceContext().getInstance().getMetaData().getId(), labels));
assertThat(contextManager.getInstanceContext().getInstance().getLabels(), is(labels));
}