This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 0da2e1ca007 Add ClusterInstanceRegistry (#34093)
0da2e1ca007 is described below

commit 0da2e1ca0073ea0e9c5c1b3b406336bb7816a72a
Author: Liang Zhang <zhangli...@apache.org>
AuthorDate: Wed Dec 18 00:21:52 2024 +0800

    Add ClusterInstanceRegistry (#34093)
    
    * Add ClusterInstanceRegistry
---
 .../infra/instance/ClusterInstanceRegistry.java    | 62 ++++++++++++++++++++++
 .../infra/instance/ComputeNodeInstanceContext.java | 53 ++++++------------
 .../instance/ClusterInstanceRegistryTest.java      | 43 +++++++++++++++
 .../instance/ComputeNodeInstanceContextTest.java   | 44 +++------------
 .../cluster/ClusterContextManagerBuilder.java      |  3 +-
 .../type/ComputeNodeStateSubscriber.java           |  5 +-
 .../type/ComputeNodeStateSubscriberTest.java       |  4 +-
 .../ral/queryable/ShowComputeNodesExecutor.java    |  2 +-
 .../ral/updatable/LabelComputeNodeExecutor.java    |  2 +-
 .../ral/updatable/SetComputeNodeStateExecutor.java |  7 +--
 .../ral/updatable/UnlabelComputeNodeExecutor.java  |  2 +-
 .../queryable/ShowComputeNodesExecutorTest.java    |  2 +-
 .../updatable/SetComputeNodeStateExecutorTest.java |  5 +-
 13 files changed, 144 insertions(+), 90 deletions(-)

diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/ClusterInstanceRegistry.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/ClusterInstanceRegistry.java
new file mode 100644
index 00000000000..1be97d6883d
--- /dev/null
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/ClusterInstanceRegistry.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.instance;
+
+import lombok.Getter;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * Cluster instance registry.
+ */
+@Getter
+public final class ClusterInstanceRegistry {
+    
+    private final Collection<ComputeNodeInstance> allClusterInstances = new 
CopyOnWriteArrayList<>();
+    
+    /**
+     * Find compute node instance.
+     *
+     * @param instanceId instance ID
+     * @return compute node instance
+     */
+    public Optional<ComputeNodeInstance> find(final String instanceId) {
+        return allClusterInstances.stream().filter(each -> 
instanceId.equals(each.getMetaData().getId())).findFirst();
+    }
+    
+    /**
+     * Add compute node instance.
+     *
+     * @param instance compute node instance
+     */
+    public void add(final ComputeNodeInstance instance) {
+        allClusterInstances.removeIf(each -> 
each.getMetaData().getId().equalsIgnoreCase(instance.getMetaData().getId()));
+        allClusterInstances.add(instance);
+    }
+    
+    /**
+     * Delete compute node instance.
+     *
+     * @param instance compute node instance
+     */
+    public void delete(final ComputeNodeInstance instance) {
+        allClusterInstances.removeIf(each -> 
each.getMetaData().getId().equalsIgnoreCase(instance.getMetaData().getId()));
+    }
+}
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstanceContext.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstanceContext.java
index 8f733d3c0e1..c1cbcd2d877 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstanceContext.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstanceContext.java
@@ -19,7 +19,6 @@ package org.apache.shardingsphere.infra.instance;
 
 import lombok.AccessLevel;
 import lombok.Getter;
-import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.instance.workerid.WorkerIdGenerator;
@@ -31,13 +30,11 @@ import javax.annotation.concurrent.ThreadSafe;
 import java.util.Collection;
 import java.util.Optional;
 import java.util.Properties;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Compute node instance context.
  */
-@RequiredArgsConstructor
 @Getter
 @ThreadSafe
 public final class ComputeNodeInstanceContext {
@@ -49,12 +46,21 @@ public final class ComputeNodeInstanceContext {
     private final EventBusContext eventBusContext;
     
     @Getter(AccessLevel.NONE)
-    private final AtomicReference<WorkerIdGenerator> workerIdGenerator = new 
AtomicReference<>();
+    private final AtomicReference<WorkerIdGenerator> workerIdGenerator;
     
     @Getter(AccessLevel.NONE)
-    private final AtomicReference<LockContext<?>> lockContext = new 
AtomicReference<>();
+    private final AtomicReference<LockContext<?>> lockContext;
     
-    private final Collection<ComputeNodeInstance> allClusterInstances = new 
CopyOnWriteArrayList<>();
+    private final ClusterInstanceRegistry clusterInstanceRegistry;
+    
+    public ComputeNodeInstanceContext(final ComputeNodeInstance instance, 
final ModeConfiguration modeConfiguration, final EventBusContext 
eventBusContext) {
+        this.instance = instance;
+        this.modeConfiguration = modeConfiguration;
+        this.eventBusContext = eventBusContext;
+        workerIdGenerator = new AtomicReference<>();
+        lockContext = new AtomicReference<>();
+        clusterInstanceRegistry = new ClusterInstanceRegistry();
+    }
     
     /**
      * Initialize compute node instance context.
@@ -81,7 +87,7 @@ public final class ComputeNodeInstanceContext {
         if (instance.getMetaData().getId().equals(instanceId)) {
             instance.switchState(instanceState.get());
         }
-        allClusterInstances.stream().filter(each -> 
each.getMetaData().getId().equals(instanceId)).forEach(each -> 
each.switchState(instanceState.get()));
+        clusterInstanceRegistry.find(instanceId).ifPresent(optional -> 
optional.switchState(instanceState.get()));
     }
     
     /**
@@ -94,7 +100,7 @@ public final class ComputeNodeInstanceContext {
         if (instance.getMetaData().getId().equals(instanceId)) {
             updateLabels(instance, labels);
         }
-        allClusterInstances.stream().filter(each -> 
each.getMetaData().getId().equals(instanceId)).forEach(each -> 
updateLabels(each, labels));
+        clusterInstanceRegistry.find(instanceId).ifPresent(optional -> 
updateLabels(optional, labels));
     }
     
     private void updateLabels(final ComputeNodeInstance computeNodeInstance, 
final Collection<String> labels) {
@@ -112,7 +118,7 @@ public final class ComputeNodeInstanceContext {
         if (instance.getMetaData().getId().equals(instanceId)) {
             instance.setWorkerId(workerId);
         }
-        allClusterInstances.stream().filter(each -> 
each.getMetaData().getId().equals(instanceId)).forEach(each -> 
each.setWorkerId(workerId));
+        clusterInstanceRegistry.find(instanceId).ifPresent(optional -> 
optional.setWorkerId(workerId));
     }
     
     /**
@@ -137,35 +143,6 @@ public final class ComputeNodeInstanceContext {
         return result;
     }
     
-    /**
-     * Add compute node instance.
-     *
-     * @param instance compute node instance
-     */
-    public void addComputeNodeInstance(final ComputeNodeInstance instance) {
-        allClusterInstances.removeIf(each -> 
each.getMetaData().getId().equalsIgnoreCase(instance.getMetaData().getId()));
-        allClusterInstances.add(instance);
-    }
-    
-    /**
-     * Delete compute node instance.
-     *
-     * @param instance compute node instance
-     */
-    public void deleteComputeNodeInstance(final ComputeNodeInstance instance) {
-        allClusterInstances.removeIf(each -> 
each.getMetaData().getId().equalsIgnoreCase(instance.getMetaData().getId()));
-    }
-    
-    /**
-     * Get compute node instance.
-     *
-     * @param instanceId instance ID
-     * @return compute node instance
-     */
-    public Optional<ComputeNodeInstance> getComputeNodeInstanceById(final 
String instanceId) {
-        return allClusterInstances.stream().filter(each -> 
instanceId.equals(each.getMetaData().getId())).findFirst();
-    }
-    
     /**
      * Get lock context.
      *
diff --git a/infra/common/src/test/java/org/apache/shardingsphere/infra/instance/ClusterInstanceRegistryTest.java b/infra/common/src/test/java/org/apache/shardingsphere/infra/instance/ClusterInstanceRegistryTest.java
new file mode 100644
index 00000000000..c150225c4a7
--- /dev/null
+++ b/infra/common/src/test/java/org/apache/shardingsphere/infra/instance/ClusterInstanceRegistryTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.instance;
+
+import 
org.apache.shardingsphere.infra.instance.metadata.proxy.ProxyInstanceMetaData;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class ClusterInstanceRegistryTest {
+    
+    @Test
+    void assertAdd() {
+        ClusterInstanceRegistry registry = new ClusterInstanceRegistry();
+        registry.add(new ComputeNodeInstance(new 
ProxyInstanceMetaData("foo_instance_id", 3307)));
+        assertTrue(registry.find("foo_instance_id").isPresent());
+    }
+    
+    @Test
+    void assertDelete() {
+        ClusterInstanceRegistry registry = new ClusterInstanceRegistry();
+        registry.add(new ComputeNodeInstance(new 
ProxyInstanceMetaData("foo_instance_id", 3307)));
+        assertTrue(registry.find("foo_instance_id").isPresent());
+        registry.delete(new ComputeNodeInstance(new 
ProxyInstanceMetaData("foo_instance_id", 3307)));
+        assertFalse(registry.find("foo_instance_id").isPresent());
+    }
+}
diff --git a/infra/common/src/test/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstanceContextTest.java b/infra/common/src/test/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstanceContextTest.java
index bd57e2b09e8..9d757c3e220 100644
--- a/infra/common/src/test/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstanceContextTest.java
+++ b/infra/common/src/test/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstanceContextTest.java
@@ -27,12 +27,10 @@ import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
-import java.util.Optional;
 import java.util.Properties;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -65,7 +63,7 @@ class ComputeNodeInstanceContextTest {
         InstanceMetaData instanceMetaData = new 
ProxyInstanceMetaData("foo_instance_id", 3306);
         ComputeNodeInstanceContext instanceContext = new 
ComputeNodeInstanceContext(new ComputeNodeInstance(instanceMetaData), 
mock(ModeConfiguration.class), new EventBusContext());
         instanceContext.init(mock(WorkerIdGenerator.class), 
mock(LockContext.class));
-        instanceContext.addComputeNodeInstance(new ComputeNodeInstance(new 
ProxyInstanceMetaData("bar_instance_id", 3307)));
+        instanceContext.getClusterInstanceRegistry().add(new 
ComputeNodeInstance(new ProxyInstanceMetaData("bar_instance_id", 3307)));
         instanceContext.updateStatus("foo_instance_id", 
InstanceState.CIRCUIT_BREAK.name());
         assertThat(instanceContext.getInstance().getState().getCurrentState(), 
is(InstanceState.CIRCUIT_BREAK));
     }
@@ -75,7 +73,7 @@ class ComputeNodeInstanceContextTest {
         InstanceMetaData instanceMetaData = new 
ProxyInstanceMetaData("foo_instance_id", 3306);
         ComputeNodeInstanceContext instanceContext = new 
ComputeNodeInstanceContext(new ComputeNodeInstance(instanceMetaData), 
mock(ModeConfiguration.class), new EventBusContext());
         instanceContext.init(mock(WorkerIdGenerator.class), 
mock(LockContext.class));
-        instanceContext.addComputeNodeInstance(new ComputeNodeInstance(new 
ProxyInstanceMetaData("bar_instance_id", 3307)));
+        instanceContext.getClusterInstanceRegistry().add(new 
ComputeNodeInstance(new ProxyInstanceMetaData("bar_instance_id", 3307)));
         instanceContext.updateStatus("bar_instance_id", 
InstanceState.CIRCUIT_BREAK.name());
         assertThat(instanceContext.getInstance().getState().getCurrentState(), 
is(InstanceState.OK));
     }
@@ -94,10 +92,10 @@ class ComputeNodeInstanceContextTest {
         InstanceMetaData instanceMetaData = new 
ProxyInstanceMetaData("foo_instance_id", 3306);
         ComputeNodeInstanceContext instanceContext = new 
ComputeNodeInstanceContext(new ComputeNodeInstance(instanceMetaData), 
mock(ModeConfiguration.class), new EventBusContext());
         instanceContext.init(mock(WorkerIdGenerator.class), 
mock(LockContext.class));
-        instanceContext.addComputeNodeInstance(new ComputeNodeInstance(new 
ProxyInstanceMetaData("bar_instance_id", 3307)));
+        instanceContext.getClusterInstanceRegistry().add(new 
ComputeNodeInstance(new ProxyInstanceMetaData("bar_instance_id", 3307)));
         instanceContext.updateLabels("bar_instance_id", 
Arrays.asList("label_1", "label_2"));
         assertTrue(instanceContext.getInstance().getLabels().isEmpty());
-        
assertThat(instanceContext.getAllClusterInstances().iterator().next().getLabels(),
 is(Arrays.asList("label_1", "label_2")));
+        
assertThat(instanceContext.getClusterInstanceRegistry().getAllClusterInstances().iterator().next().getLabels(),
 is(Arrays.asList("label_1", "label_2")));
     }
     
     @Test
@@ -114,10 +112,10 @@ class ComputeNodeInstanceContextTest {
         ComputeNodeInstance instance = new ComputeNodeInstance(new 
ProxyInstanceMetaData("foo_instance_id", 3306));
         ComputeNodeInstanceContext instanceContext = new 
ComputeNodeInstanceContext(instance, mock(ModeConfiguration.class), new 
EventBusContext());
         instanceContext.init(mock(WorkerIdGenerator.class), 
mock(LockContext.class));
-        instanceContext.addComputeNodeInstance(new ComputeNodeInstance(new 
ProxyInstanceMetaData("bar_instance_id", 3307)));
+        instanceContext.getClusterInstanceRegistry().add(new 
ComputeNodeInstance(new ProxyInstanceMetaData("bar_instance_id", 3307)));
         instanceContext.updateWorkerId("bar_instance_id", 10);
         assertThat(instanceContext.getWorkerId(), is(-1));
-        
assertThat(instanceContext.getAllClusterInstances().iterator().next().getWorkerId(),
 is(10));
+        
assertThat(instanceContext.getClusterInstanceRegistry().getAllClusterInstances().iterator().next().getWorkerId(),
 is(10));
     }
     
     @Test
@@ -126,34 +124,4 @@ class ComputeNodeInstanceContextTest {
         instanceContext.init(mock(WorkerIdGenerator.class), 
mock(LockContext.class));
         assertThat(instanceContext.generateWorkerId(new Properties()), is(0));
     }
-    
-    @Test
-    void assertAddComputeNodeInstance() {
-        ComputeNodeInstance instance = new ComputeNodeInstance(new 
ProxyInstanceMetaData("foo_instance_id", 3306));
-        ComputeNodeInstanceContext instanceContext = new 
ComputeNodeInstanceContext(instance, mock(ModeConfiguration.class), new 
EventBusContext());
-        instanceContext.init(mock(WorkerIdGenerator.class), 
mock(LockContext.class));
-        instanceContext.addComputeNodeInstance(new ComputeNodeInstance(new 
ProxyInstanceMetaData("bar_instance_id", 3307)));
-        assertFalse(instanceContext.getAllClusterInstances().isEmpty());
-    }
-    
-    @Test
-    void assertDeleteComputeNodeInstance() {
-        ComputeNodeInstance instance = new ComputeNodeInstance(new 
ProxyInstanceMetaData("foo_instance_id", 3306));
-        ComputeNodeInstanceContext instanceContext = new 
ComputeNodeInstanceContext(instance, mock(ModeConfiguration.class), new 
EventBusContext());
-        instanceContext.init(mock(WorkerIdGenerator.class), 
mock(LockContext.class));
-        instanceContext.addComputeNodeInstance(new ComputeNodeInstance(new 
ProxyInstanceMetaData("bar_instance_id", 3307)));
-        instanceContext.deleteComputeNodeInstance(new ComputeNodeInstance(new 
ProxyInstanceMetaData("bar_instance_id", 3307)));
-        assertTrue(instanceContext.getAllClusterInstances().isEmpty());
-    }
-    
-    @Test
-    void assertGetComputeNodeInstanceById() {
-        ComputeNodeInstance instance = new ComputeNodeInstance(new 
ProxyInstanceMetaData("foo_instance_id", 3306));
-        ComputeNodeInstanceContext instanceContext = new 
ComputeNodeInstanceContext(instance, mock(ModeConfiguration.class), new 
EventBusContext());
-        instanceContext.init(mock(WorkerIdGenerator.class), 
mock(LockContext.class));
-        instanceContext.addComputeNodeInstance(new ComputeNodeInstance(new 
ProxyInstanceMetaData("bar_instance_id", 3307)));
-        Optional<ComputeNodeInstance> actual = 
instanceContext.getComputeNodeInstanceById("bar_instance_id");
-        assertTrue(actual.isPresent());
-        assertThat(actual.get().getMetaData().getId(), is("bar_instance_id"));
-    }
 }
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index 647a0692093..c34ae5186ae 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -78,7 +78,8 @@ public final class ClusterContextManagerBuilder implements ContextManagerBuilder
     private void registerOnline(final ComputeNodeInstanceContext 
computeNodeInstanceContext, final ContextManagerBuilderParameter param, final 
ContextManager contextManager,
                                 final ClusterPersistRepository repository) {
         
contextManager.getPersistServiceFacade().getComputeNodePersistService().registerOnline(computeNodeInstanceContext.getInstance());
-        
contextManager.getComputeNodeInstanceContext().getAllClusterInstances().addAll(contextManager.getPersistServiceFacade().getComputeNodePersistService().loadAllComputeNodeInstances());
+        
contextManager.getComputeNodeInstanceContext().getClusterInstanceRegistry().getAllClusterInstances()
+                
.addAll(contextManager.getPersistServiceFacade().getComputeNodePersistService().loadAllComputeNodeInstances());
         new DataChangedEventListenerRegistry(contextManager, 
getDatabaseNames(param, 
contextManager.getPersistServiceFacade().getMetaDataPersistService())).register();
         ClusterEventSubscriberRegistry eventSubscriberRegistry = new 
ClusterEventSubscriberRegistry(contextManager.getComputeNodeInstanceContext().getEventBusContext());
         
eventSubscriberRegistry.register(createDeliverEventSubscribers(repository));
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ComputeNodeStateSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ComputeNodeStateSubscriber.java
index cf282f34c58..c01a1210caa 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ComputeNodeStateSubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ComputeNodeStateSubscriber.java
@@ -49,7 +49,8 @@ public final class ComputeNodeStateSubscriber implements DispatchEventSubscriber
      */
     @Subscribe
     public synchronized void renew(final InstanceOnlineEvent event) {
-        
computeNodeInstanceContext.addComputeNodeInstance(contextManager.getPersistServiceFacade().getComputeNodePersistService().loadComputeNodeInstance(event.getInstanceMetaData()));
+        computeNodeInstanceContext.getClusterInstanceRegistry()
+                
.add(contextManager.getPersistServiceFacade().getComputeNodePersistService().loadComputeNodeInstance(event.getInstanceMetaData()));
     }
     
     /**
@@ -59,7 +60,7 @@ public final class ComputeNodeStateSubscriber implements DispatchEventSubscriber
      */
     @Subscribe
     public synchronized void renew(final InstanceOfflineEvent event) {
-        computeNodeInstanceContext.deleteComputeNodeInstance(new 
ComputeNodeInstance(event.getInstanceMetaData()));
+        computeNodeInstanceContext.getClusterInstanceRegistry().delete(new 
ComputeNodeInstance(event.getInstanceMetaData()));
     }
     
     /**
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ComputeNodeStateSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ComputeNodeStateSubscriberTest.java
index 528f71cc98b..e2c76070bd9 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ComputeNodeStateSubscriberTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ComputeNodeStateSubscriberTest.java
@@ -58,13 +58,13 @@ class ComputeNodeStateSubscriberTest {
         ComputeNodeInstance computeNodeInstance = 
mock(ComputeNodeInstance.class);
         
when(contextManager.getPersistServiceFacade().getComputeNodePersistService().loadComputeNodeInstance(instanceMetaData)).thenReturn(computeNodeInstance);
         subscriber.renew(new InstanceOnlineEvent(instanceMetaData));
-        
verify(contextManager.getComputeNodeInstanceContext()).addComputeNodeInstance(computeNodeInstance);
+        
verify(contextManager.getComputeNodeInstanceContext().getClusterInstanceRegistry()).add(computeNodeInstance);
     }
     
     @Test
     void assertRenewWithInstanceOfflineEvent() {
         subscriber.renew(new 
InstanceOfflineEvent(mock(InstanceMetaData.class)));
-        
verify(contextManager.getComputeNodeInstanceContext()).deleteComputeNodeInstance(any());
+        
verify(contextManager.getComputeNodeInstanceContext().getClusterInstanceRegistry()).delete(any());
     }
     
     @Test
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowComputeNodesExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowComputeNodesExecutor.java
index c78adb2bb52..9493c1e4af3 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowComputeNodesExecutor.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowComputeNodesExecutor.java
@@ -45,7 +45,7 @@ public final class ShowComputeNodesExecutor implements DistSQLQueryExecutor<Show
         String modeType = 
contextManager.getComputeNodeInstanceContext().getModeConfiguration().getType();
         return "Standalone".equals(modeType)
                 ? 
Collections.singleton(buildRow(contextManager.getComputeNodeInstanceContext().getInstance(),
 modeType))
-                : 
contextManager.getComputeNodeInstanceContext().getAllClusterInstances().stream().map(each
 -> buildRow(each, modeType)).collect(Collectors.toList());
+                : 
contextManager.getComputeNodeInstanceContext().getClusterInstanceRegistry().getAllClusterInstances().stream().map(each
 -> buildRow(each, modeType)).collect(Collectors.toList());
     }
     
     private LocalDataQueryResultRow buildRow(final ComputeNodeInstance 
instance, final String modeType) {
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/LabelComputeNodeExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/LabelComputeNodeExecutor.java
index 5528182ca2f..3c5a69e011c 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/LabelComputeNodeExecutor.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/LabelComputeNodeExecutor.java
@@ -38,7 +38,7 @@ public final class LabelComputeNodeExecutor implements DistSQLUpdateExecutor<Lab
     @Override
     public void executeUpdate(final LabelComputeNodeStatement sqlStatement, 
final ContextManager contextManager) throws SQLException {
         String instanceId = sqlStatement.getInstanceId();
-        Optional<ComputeNodeInstance> computeNodeInstance = 
contextManager.getComputeNodeInstanceContext().getComputeNodeInstanceById(instanceId);
+        Optional<ComputeNodeInstance> computeNodeInstance = 
contextManager.getComputeNodeInstanceContext().getClusterInstanceRegistry().find(instanceId);
         if (computeNodeInstance.isPresent()) {
             Collection<String> labels = new 
LinkedHashSet<>(sqlStatement.getLabels());
             if (!sqlStatement.isOverwrite()) {
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/SetComputeNodeStateExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/SetComputeNodeStateExecutor.java
index eddb2e30b84..7dd1e314a7a 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/SetComputeNodeStateExecutor.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/SetComputeNodeStateExecutor.java
@@ -43,16 +43,17 @@ public final class SetComputeNodeStateExecutor implements DistSQLUpdateExecutor<
     }
     
     private void checkEnablingIsValid(final ContextManager contextManager, 
final String instanceId) {
-        
ShardingSpherePreconditions.checkState(contextManager.getComputeNodeInstanceContext().getComputeNodeInstanceById(instanceId).isPresent(),
+        
ShardingSpherePreconditions.checkState(contextManager.getComputeNodeInstanceContext().getClusterInstanceRegistry().find(instanceId).isPresent(),
                 () -> new UnsupportedSQLOperationException(String.format("`%s` 
does not exist", instanceId)));
     }
     
     private void checkDisablingIsValid(final ContextManager contextManager, 
final String instanceId) {
         
ShardingSpherePreconditions.checkState(!contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getId().equals(instanceId),
                 () -> new UnsupportedSQLOperationException(String.format("`%s` 
is the currently in use instance and cannot be disabled", instanceId)));
-        
ShardingSpherePreconditions.checkState(contextManager.getComputeNodeInstanceContext().getComputeNodeInstanceById(instanceId).isPresent(),
+        
ShardingSpherePreconditions.checkState(contextManager.getComputeNodeInstanceContext().getClusterInstanceRegistry().find(instanceId).isPresent(),
                 () -> new UnsupportedSQLOperationException(String.format("`%s` 
does not exist", instanceId)));
-        ShardingSpherePreconditions.checkState(InstanceState.CIRCUIT_BREAK != 
contextManager.getComputeNodeInstanceContext().getComputeNodeInstanceById(instanceId).get().getState().getCurrentState(),
+        ShardingSpherePreconditions.checkState(
+                InstanceState.CIRCUIT_BREAK != 
contextManager.getComputeNodeInstanceContext().getClusterInstanceRegistry().find(instanceId).get().getState().getCurrentState(),
                 () -> new UnsupportedSQLOperationException(String.format("`%s` 
compute node has been disabled", instanceId)));
     }
     
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/UnlabelComputeNodeExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/UnlabelComputeNodeExecutor.java
index 8ad01485fa4..abc998d1956 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/UnlabelComputeNodeExecutor.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/UnlabelComputeNodeExecutor.java
@@ -38,7 +38,7 @@ public final class UnlabelComputeNodeExecutor implements DistSQLUpdateExecutor<U
     @Override
     public void executeUpdate(final UnlabelComputeNodeStatement sqlStatement, 
final ContextManager contextManager) {
         String instanceId = sqlStatement.getInstanceId();
-        Optional<ComputeNodeInstance> computeNodeInstance = 
contextManager.getComputeNodeInstanceContext().getComputeNodeInstanceById(instanceId);
+        Optional<ComputeNodeInstance> computeNodeInstance = 
contextManager.getComputeNodeInstanceContext().getClusterInstanceRegistry().find(instanceId);
         if (computeNodeInstance.isPresent()) {
             Collection<String> labels = new 
LinkedHashSet<>(computeNodeInstance.get().getLabels());
             if (sqlStatement.getLabels().isEmpty()) {
diff --git a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowComputeNodesExecutorTest.java b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowComputeNodesExecutorTest.java
index 89d8fb6ca05..75e83399b22 100644
--- a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowComputeNodesExecutorTest.java
+++ b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowComputeNodesExecutorTest.java
@@ -99,7 +99,7 @@ class ShowComputeNodesExecutorTest {
         when(computeNodeInstance.getMetaData()).thenReturn(new 
ProxyInstanceMetaData("foo", "127.0.0.1@3309", "foo_version"));
         when(computeNodeInstance.getState()).thenReturn(new 
InstanceStateContext());
         when(computeNodeInstance.getWorkerId()).thenReturn(1);
-        
when(result.getAllClusterInstances()).thenReturn(Collections.singleton(computeNodeInstance));
+        
when(result.getClusterInstanceRegistry().getAllClusterInstances()).thenReturn(Collections.singleton(computeNodeInstance));
         return result;
     }
 }
diff --git a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/SetComputeNodeStateExecutorTest.java b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/SetComputeNodeStateExecutorTest.java
index 3de9510c90e..af62131be69 100644
--- a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/SetComputeNodeStateExecutorTest.java
+++ b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/SetComputeNodeStateExecutorTest.java
@@ -48,8 +48,9 @@ class SetComputeNodeStateExecutorTest {
     void assertExecuteUpdateWithAlreadyDisableInstance() {
         ContextManager contextManager = mock(ContextManager.class, 
RETURNS_DEEP_STUBS);
         
when(contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getId()).thenReturn("currentInstance");
-        
when(contextManager.getComputeNodeInstanceContext().getComputeNodeInstanceById("instanceID").isPresent()).thenReturn(true);
-        
when(contextManager.getComputeNodeInstanceContext().getComputeNodeInstanceById("instanceID").get().getState().getCurrentState()).thenReturn(InstanceState.CIRCUIT_BREAK);
+        
when(contextManager.getComputeNodeInstanceContext().getClusterInstanceRegistry().find("instanceID").isPresent()).thenReturn(true);
+        
when(contextManager.getComputeNodeInstanceContext().getClusterInstanceRegistry().find("instanceID").get().getState().getCurrentState())
+                .thenReturn(InstanceState.CIRCUIT_BREAK);
         assertThrows(UnsupportedSQLOperationException.class, () -> 
executor.executeUpdate(new SetComputeNodeStateStatement("DISABLE", 
"instanceID"), contextManager));
     }
 }

Reply via email to