This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new edbcd78cce0 Refactor ClusterStatusService.loadClusterState() (#31248)
edbcd78cce0 is described below

commit edbcd78cce071b2c50730357d3f69b46a9593d89
Author: Liang Zhang <[email protected]>
AuthorDate: Thu May 16 19:04:42 2024 +0800

    Refactor ClusterStatusService.loadClusterState() (#31248)
    
    * Refactor ClusterStatusService.loadClusterState()
    
    * Refactor ClusterStatusService.loadClusterState()
    
    * Refactor ClusterStatusService.loadClusterState()
---
 .../shardingsphere/mode/manager/ContextManager.java |  9 +++------
 .../mode/manager/ContextManagerTest.java            |  2 +-
 .../cluster/ClusterContextManagerBuilder.java       | 14 ++++++++++----
 .../status/cluster/event/ClusterStateEvent.java     |  3 ++-
 .../cluster/service/ClusterStatusService.java       | 13 ++++++++-----
 .../cluster/watcher/ClusterStateChangedWatcher.java | 21 +++++++++++++++------
 .../subscriber/StateChangedSubscriber.java          |  2 +-
 .../cluster/service/ClusterStatusServiceTest.java   |  6 +++---
 .../watcher/ClusterStateChangedWatcherTest.java     |  6 +++---
 .../subscriber/StateChangedSubscriberTest.java      |  2 +-
 10 files changed, 47 insertions(+), 31 deletions(-)

diff --git 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
index ebd67de7818..d4e06010307 100644
--- 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
+++ 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
@@ -276,13 +276,10 @@ public final class ContextManager implements 
AutoCloseable {
     /**
      * Update cluster state.
      * 
-     * @param status status
+     * @param clusterState cluster state
      */
-    public void updateClusterState(final String status) {
-        try {
-            clusterStateContext.switchState(ClusterState.valueOf(status));
-        } catch (final IllegalArgumentException ignore) {
-        }
+    public void updateClusterState(final ClusterState clusterState) {
+        clusterStateContext.switchState(clusterState);
     }
     
     @Override
diff --git 
a/mode/core/src/test/java/org/apache/shardingsphere/mode/manager/ContextManagerTest.java
 
b/mode/core/src/test/java/org/apache/shardingsphere/mode/manager/ContextManagerTest.java
index 56c43061c4d..85ea02dfa70 100644
--- 
a/mode/core/src/test/java/org/apache/shardingsphere/mode/manager/ContextManagerTest.java
+++ 
b/mode/core/src/test/java/org/apache/shardingsphere/mode/manager/ContextManagerTest.java
@@ -266,7 +266,7 @@ class ContextManagerTest {
     
     @Test
     void assertUpdateClusterStatus() {
-        contextManager.updateClusterState("READ_ONLY");
+        contextManager.updateClusterState(ClusterState.READ_ONLY);
         assertThat(contextManager.getClusterStateContext().getCurrentState(), 
is(ClusterState.READ_ONLY));
     }
     
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index f046d7bbcf4..b2e264eb9a5 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -22,6 +22,7 @@ import 
org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
 import org.apache.shardingsphere.infra.instance.InstanceContext;
 import org.apache.shardingsphere.infra.instance.InstanceContextAware;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import org.apache.shardingsphere.infra.state.cluster.ClusterState;
 import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
 import org.apache.shardingsphere.metadata.persist.MetaDataPersistService;
 import org.apache.shardingsphere.mode.lock.GlobalLockContext;
@@ -43,6 +44,7 @@ import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositor
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
 
 import java.sql.SQLException;
+import java.util.Optional;
 
 /**
  * Cluster context manager builder.
@@ -63,6 +65,7 @@ public final class ClusterContextManagerBuilder implements 
ContextManagerBuilder
         setContextManagerAware(result);
         createSubscribers(eventBusContext, repository);
         registerOnline(registryCenter, param, result);
+        setClusterStatus(registryCenter, result);
         return result;
     }
     
@@ -93,15 +96,18 @@ public final class ClusterContextManagerBuilder implements 
ContextManagerBuilder
     
     private void registerOnline(final RegistryCenter registryCenter, final 
ContextManagerBuilderParameter param, final ContextManager contextManager) {
         
registryCenter.onlineInstance(contextManager.getInstanceContext().getInstance());
-        loadClusterStatus(registryCenter, contextManager);
         
contextManager.getInstanceContext().getInstance().setLabels(param.getLabels());
         
contextManager.getInstanceContext().getAllClusterInstances().addAll(registryCenter.getComputeNodeStatusService().loadAllComputeNodeInstances());
         new ClusterEventSubscriberRegistry(contextManager, 
registryCenter).register();
     }
     
-    private void loadClusterStatus(final RegistryCenter registryCenter, final 
ContextManager contextManager) {
-        
registryCenter.getClusterStatusService().persistClusterState(contextManager.getClusterStateContext().getCurrentState());
-        
contextManager.updateClusterState(registryCenter.getClusterStatusService().loadClusterStatus());
+    private void setClusterStatus(final RegistryCenter registryCenter, final 
ContextManager contextManager) {
+        Optional<ClusterState> clusterState = 
registryCenter.getClusterStatusService().load();
+        if (clusterState.isPresent()) {
+            contextManager.updateClusterState(clusterState.get());
+        } else {
+            registryCenter.getClusterStatusService().persist(ClusterState.OK);
+        }
     }
     
     @Override
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/event/ClusterStateEvent.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/event/ClusterStateEvent.java
index 5a69a8f3b05..1518da048a4 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/event/ClusterStateEvent.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/event/ClusterStateEvent.java
@@ -20,6 +20,7 @@ package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.stat
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.infra.rule.event.GovernanceEvent;
+import org.apache.shardingsphere.infra.state.cluster.ClusterState;
 
 /**
  * Cluster state event.
@@ -28,5 +29,5 @@ import 
org.apache.shardingsphere.infra.rule.event.GovernanceEvent;
 @Getter
 public final class ClusterStateEvent implements GovernanceEvent {
     
-    private final String status;
+    private final ClusterState clusterState;
 }
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/service/ClusterStatusService.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/service/ClusterStatusService.java
index 07ede1342aa..19a29f473be 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/service/ClusterStatusService.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/service/ClusterStatusService.java
@@ -23,6 +23,8 @@ import 
org.apache.shardingsphere.infra.state.cluster.ClusterState;
 import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 
+import java.util.Optional;
+
 /**
  * Cluster status service.
  */
@@ -36,18 +38,19 @@ public final class ClusterStatusService {
      *
      * @param state cluster state
      */
-    public void persistClusterState(final ClusterState state) {
+    public void persist(final ClusterState state) {
         if 
(Strings.isNullOrEmpty(repository.getDirectly(ComputeNode.getClusterStatusNodePath())))
 {
             repository.persist(ComputeNode.getClusterStatusNodePath(), 
state.name());
         }
     }
     
     /**
-     * Load cluster status.
+     * Load cluster state.
      *
-     * @return cluster status
+     * @return cluster state
      */
-    public String loadClusterStatus() {
-        return repository.getDirectly(ComputeNode.getClusterStatusNodePath());
+    public Optional<ClusterState> load() {
+        String value = 
repository.getDirectly(ComputeNode.getClusterStatusNodePath());
+        return Strings.isNullOrEmpty(value) ? Optional.empty() : 
Optional.of(ClusterState.valueOf(value));
     }
 }
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/watcher/ClusterStateChangedWatcher.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/watcher/ClusterStateChangedWatcher.java
index 5d60abb8f4b..77dfa65e17f 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/watcher/ClusterStateChangedWatcher.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/watcher/ClusterStateChangedWatcher.java
@@ -18,12 +18,13 @@
 package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.watcher;
 
 import com.google.common.base.Strings;
-import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
 import org.apache.shardingsphere.infra.rule.event.GovernanceEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStateEvent;
+import org.apache.shardingsphere.infra.state.cluster.ClusterState;
+import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
 import org.apache.shardingsphere.mode.event.DataChangedEvent;
 import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStateEvent;
 
 import java.util.Arrays;
 import java.util.Collection;
@@ -47,9 +48,17 @@ public final class ClusterStateChangedWatcher implements 
GovernanceWatcher<Gover
     
     @Override
     public Optional<GovernanceEvent> createGovernanceEvent(final 
DataChangedEvent event) {
-        String clusterStatus = ComputeNode.getClusterStatusNodePath();
-        return Strings.isNullOrEmpty(clusterStatus) || Type.DELETED == 
event.getType() || 
!event.getKey().equals(ComputeNode.getClusterStatusNodePath())
+        String clusterStatusPath = ComputeNode.getClusterStatusNodePath();
+        return Strings.isNullOrEmpty(clusterStatusPath) || Type.DELETED == 
event.getType() || 
!event.getKey().equals(ComputeNode.getClusterStatusNodePath())
                 ? Optional.empty()
-                : Optional.of(new ClusterStateEvent(event.getValue()));
+                : Optional.of(new ClusterStateEvent(getClusterState(event)));
+    }
+    
+    private ClusterState getClusterState(final DataChangedEvent event) {
+        try {
+            return ClusterState.valueOf(event.getValue());
+        } catch (final IllegalArgumentException ignore) {
+            return ClusterState.OK;
+        }
     }
 }
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
index c44a129bfb2..9a87ffb3c92 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
@@ -84,7 +84,7 @@ public final class StateChangedSubscriber implements 
EventSubscriber {
      */
     @Subscribe
     public synchronized void renew(final ClusterStateEvent event) {
-        contextManager.updateClusterState(event.getStatus());
+        contextManager.updateClusterState(event.getClusterState());
     }
     
     /**
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/service/ClusterStatusServiceTest.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/service/ClusterStatusServiceTest.java
index 15b1de83b95..fac728a5e3c 100644
--- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/service/ClusterStatusServiceTest.java
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/service/ClusterStatusServiceTest.java
@@ -38,7 +38,7 @@ class ClusterStatusServiceTest {
     @Test
     void assertPersistClusterStateWithoutPath() {
         ClusterStatusService clusterStatusService = new 
ClusterStatusService(repository);
-        clusterStatusService.persistClusterState(ClusterState.OK);
+        clusterStatusService.persist(ClusterState.OK);
         verify(repository).persist(ComputeNode.getClusterStatusNodePath(), 
ClusterState.OK.name());
     }
     
@@ -46,13 +46,13 @@ class ClusterStatusServiceTest {
     void assertPersistClusterStateWithPath() {
         ClusterStatusService clusterStatusService = new 
ClusterStatusService(repository);
         
when(repository.getDirectly("/nodes/compute_nodes/status")).thenReturn(ClusterState.OK.name());
-        clusterStatusService.persistClusterState(ClusterState.OK);
+        clusterStatusService.persist(ClusterState.OK);
         verify(repository, 
times(0)).persist(ComputeNode.getClusterStatusNodePath(), 
ClusterState.OK.name());
     }
     
     @Test
     void assertLoadClusterStatus() {
-        new ClusterStatusService(repository).loadClusterStatus();
+        new ClusterStatusService(repository).load();
         verify(repository).getDirectly(ComputeNode.getClusterStatusNodePath());
     }
 }
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/watcher/ClusterStateChangedWatcherTest.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/watcher/ClusterStateChangedWatcherTest.java
index 27c462ac970..9d9eaa59698 100644
--- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/watcher/ClusterStateChangedWatcherTest.java
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/watcher/ClusterStateChangedWatcherTest.java
@@ -37,7 +37,7 @@ class ClusterStateChangedWatcherTest {
         Optional<GovernanceEvent> actual = new ClusterStateChangedWatcher()
                 .createGovernanceEvent(new 
DataChangedEvent("/nodes/compute_nodes/status", ClusterState.READ_ONLY.name(), 
Type.UPDATED));
         assertTrue(actual.isPresent());
-        assertThat(((ClusterStateEvent) actual.get()).getStatus(), 
is(ClusterState.READ_ONLY.name()));
+        assertThat(((ClusterStateEvent) actual.get()).getClusterState(), 
is(ClusterState.READ_ONLY));
     }
     
     @Test
@@ -45,7 +45,7 @@ class ClusterStateChangedWatcherTest {
         Optional<GovernanceEvent> actual = new ClusterStateChangedWatcher()
                 .createGovernanceEvent(new 
DataChangedEvent("/nodes/compute_nodes/status", 
ClusterState.UNAVAILABLE.name(), Type.UPDATED));
         assertTrue(actual.isPresent());
-        assertThat(((ClusterStateEvent) actual.get()).getStatus(), 
is(ClusterState.UNAVAILABLE.name()));
+        assertThat(((ClusterStateEvent) actual.get()).getClusterState(), 
is(ClusterState.UNAVAILABLE));
     }
     
     @Test
@@ -53,6 +53,6 @@ class ClusterStateChangedWatcherTest {
         Optional<GovernanceEvent> actual = new ClusterStateChangedWatcher()
                 .createGovernanceEvent(new 
DataChangedEvent("/nodes/compute_nodes/status", ClusterState.OK.name(), 
Type.UPDATED));
         assertTrue(actual.isPresent());
-        assertThat(((ClusterStateEvent) actual.get()).getStatus(), 
is(ClusterState.OK.name()));
+        assertThat(((ClusterStateEvent) actual.get()).getClusterState(), 
is(ClusterState.OK));
     }
 }
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
index f52d94d8078..2f51892339c 100644
--- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
@@ -130,7 +130,7 @@ class StateChangedSubscriberTest {
     
     @Test
     void assertRenewClusterStatus() {
-        ClusterStateEvent mockClusterStateEvent = new 
ClusterStateEvent("READ_ONLY");
+        ClusterStateEvent mockClusterStateEvent = new 
ClusterStateEvent(ClusterState.READ_ONLY);
         subscriber.renew(mockClusterStateEvent);
         assertThat(contextManager.getClusterStateContext().getCurrentState(), 
is(ClusterState.READ_ONLY));
     }

Reply via email to