This is an automated email from the ASF dual-hosted git repository. zhangliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push: new 0da2e1ca007 Add ClusterInstanceRegistry (#34093) 0da2e1ca007 is described below commit 0da2e1ca0073ea0e9c5c1b3b406336bb7816a72a Author: Liang Zhang <zhangli...@apache.org> AuthorDate: Wed Dec 18 00:21:52 2024 +0800 Add ClusterInstanceRegistry (#34093) * Add ClusterInstanceRegistry --- .../infra/instance/ClusterInstanceRegistry.java | 62 ++++++++++++++++++++++ .../infra/instance/ComputeNodeInstanceContext.java | 53 ++++++------------ .../instance/ClusterInstanceRegistryTest.java | 43 +++++++++++++++ .../instance/ComputeNodeInstanceContextTest.java | 44 +++------------ .../cluster/ClusterContextManagerBuilder.java | 3 +- .../type/ComputeNodeStateSubscriber.java | 5 +- .../type/ComputeNodeStateSubscriberTest.java | 4 +- .../ral/queryable/ShowComputeNodesExecutor.java | 2 +- .../ral/updatable/LabelComputeNodeExecutor.java | 2 +- .../ral/updatable/SetComputeNodeStateExecutor.java | 7 +-- .../ral/updatable/UnlabelComputeNodeExecutor.java | 2 +- .../queryable/ShowComputeNodesExecutorTest.java | 2 +- .../updatable/SetComputeNodeStateExecutorTest.java | 5 +- 13 files changed, 144 insertions(+), 90 deletions(-) diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/ClusterInstanceRegistry.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/ClusterInstanceRegistry.java new file mode 100644 index 00000000000..1be97d6883d --- /dev/null +++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/ClusterInstanceRegistry.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.infra.instance; + +import lombok.Getter; + +import java.util.Collection; +import java.util.Optional; +import java.util.concurrent.CopyOnWriteArrayList; + +/** + * Cluster instance registry. + */ +@Getter +public final class ClusterInstanceRegistry { + + private final Collection<ComputeNodeInstance> allClusterInstances = new CopyOnWriteArrayList<>(); + + /** + * Find compute node instance. + * + * @param instanceId instance ID + * @return compute node instance + */ + public Optional<ComputeNodeInstance> find(final String instanceId) { + return allClusterInstances.stream().filter(each -> instanceId.equals(each.getMetaData().getId())).findFirst(); + } + + /** + * Add compute node instance. + * + * @param instance compute node instance + */ + public void add(final ComputeNodeInstance instance) { + allClusterInstances.removeIf(each -> each.getMetaData().getId().equalsIgnoreCase(instance.getMetaData().getId())); + allClusterInstances.add(instance); + } + + /** + * Delete compute node instance. + * + * @param instance compute node instance + */ + public void delete(final ComputeNodeInstance instance) { + allClusterInstances.removeIf(each -> each.getMetaData().getId().equalsIgnoreCase(instance.getMetaData().getId())); + } +} diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstanceContext.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstanceContext.java index 8f733d3c0e1..c1cbcd2d877 100644 --- a/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstanceContext.java +++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstanceContext.java @@ -19,7 +19,6 @@ package org.apache.shardingsphere.infra.instance; import lombok.AccessLevel; import lombok.Getter; -import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.infra.config.mode.ModeConfiguration; import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; import org.apache.shardingsphere.infra.instance.workerid.WorkerIdGenerator; @@ -31,13 +30,11 @@ import javax.annotation.concurrent.ThreadSafe; import java.util.Collection; import java.util.Optional; import java.util.Properties; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicReference; /** * Compute node instance context. */ -@RequiredArgsConstructor @Getter @ThreadSafe public final class ComputeNodeInstanceContext { @@ -49,12 +46,21 @@ public final class ComputeNodeInstanceContext { private final EventBusContext eventBusContext; @Getter(AccessLevel.NONE) - private final AtomicReference<WorkerIdGenerator> workerIdGenerator = new AtomicReference<>(); + private final AtomicReference<WorkerIdGenerator> workerIdGenerator; @Getter(AccessLevel.NONE) - private final AtomicReference<LockContext<?>> lockContext = new AtomicReference<>(); + private final AtomicReference<LockContext<?>> lockContext; - private final Collection<ComputeNodeInstance> allClusterInstances = new CopyOnWriteArrayList<>(); + private final ClusterInstanceRegistry clusterInstanceRegistry; + + public ComputeNodeInstanceContext(final ComputeNodeInstance instance, final ModeConfiguration modeConfiguration, final EventBusContext eventBusContext) { + this.instance = instance; + this.modeConfiguration = modeConfiguration; + this.eventBusContext = eventBusContext; + workerIdGenerator = new AtomicReference<>(); + lockContext = new AtomicReference<>(); + clusterInstanceRegistry = new ClusterInstanceRegistry(); + } /** * Initialize compute node instance context. @@ -81,7 +87,7 @@ public final class ComputeNodeInstanceContext { if (instance.getMetaData().getId().equals(instanceId)) { instance.switchState(instanceState.get()); } - allClusterInstances.stream().filter(each -> each.getMetaData().getId().equals(instanceId)).forEach(each -> each.switchState(instanceState.get())); + clusterInstanceRegistry.find(instanceId).ifPresent(optional -> optional.switchState(instanceState.get())); } /** @@ -94,7 +100,7 @@ public final class ComputeNodeInstanceContext { if (instance.getMetaData().getId().equals(instanceId)) { updateLabels(instance, labels); } - allClusterInstances.stream().filter(each -> each.getMetaData().getId().equals(instanceId)).forEach(each -> updateLabels(each, labels)); + clusterInstanceRegistry.find(instanceId).ifPresent(optional -> updateLabels(optional, labels)); } private void updateLabels(final ComputeNodeInstance computeNodeInstance, final Collection<String> labels) { @@ -112,7 +118,7 @@ public final class ComputeNodeInstanceContext { if (instance.getMetaData().getId().equals(instanceId)) { instance.setWorkerId(workerId); } - allClusterInstances.stream().filter(each -> each.getMetaData().getId().equals(instanceId)).forEach(each -> each.setWorkerId(workerId)); + clusterInstanceRegistry.find(instanceId).ifPresent(optional -> optional.setWorkerId(workerId)); } /** @@ -137,35 +143,6 @@ public final class ComputeNodeInstanceContext { return result; } - /** - * Add compute node instance. - * - * @param instance compute node instance - */ - public void addComputeNodeInstance(final ComputeNodeInstance instance) { - allClusterInstances.removeIf(each -> each.getMetaData().getId().equalsIgnoreCase(instance.getMetaData().getId())); - allClusterInstances.add(instance); - } - - /** - * Delete compute node instance. - * - * @param instance compute node instance - */ - public void deleteComputeNodeInstance(final ComputeNodeInstance instance) { - allClusterInstances.removeIf(each -> each.getMetaData().getId().equalsIgnoreCase(instance.getMetaData().getId())); - } - - /** - * Get compute node instance. - * - * @param instanceId instance ID - * @return compute node instance - */ - public Optional<ComputeNodeInstance> getComputeNodeInstanceById(final String instanceId) { - return allClusterInstances.stream().filter(each -> instanceId.equals(each.getMetaData().getId())).findFirst(); - } - /** * Get lock context. * diff --git a/infra/common/src/test/java/org/apache/shardingsphere/infra/instance/ClusterInstanceRegistryTest.java b/infra/common/src/test/java/org/apache/shardingsphere/infra/instance/ClusterInstanceRegistryTest.java new file mode 100644 index 00000000000..c150225c4a7 --- /dev/null +++ b/infra/common/src/test/java/org/apache/shardingsphere/infra/instance/ClusterInstanceRegistryTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.infra.instance; + +import org.apache.shardingsphere.infra.instance.metadata.proxy.ProxyInstanceMetaData; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class ClusterInstanceRegistryTest { + + @Test + void assertAdd() { + ClusterInstanceRegistry registry = new ClusterInstanceRegistry(); + registry.add(new ComputeNodeInstance(new ProxyInstanceMetaData("foo_instance_id", 3307))); + assertTrue(registry.find("foo_instance_id").isPresent()); + } + + @Test + void assertDelete() { + ClusterInstanceRegistry registry = new ClusterInstanceRegistry(); + registry.add(new ComputeNodeInstance(new ProxyInstanceMetaData("foo_instance_id", 3307))); + assertTrue(registry.find("foo_instance_id").isPresent()); + registry.delete(new ComputeNodeInstance(new ProxyInstanceMetaData("foo_instance_id", 3307))); + assertFalse(registry.find("foo_instance_id").isPresent()); + } +} diff --git a/infra/common/src/test/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstanceContextTest.java b/infra/common/src/test/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstanceContextTest.java index bd57e2b09e8..9d757c3e220 100644 --- a/infra/common/src/test/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstanceContextTest.java +++ b/infra/common/src/test/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstanceContextTest.java @@ -27,12 +27,10 @@ import org.apache.shardingsphere.infra.util.eventbus.EventBusContext; import org.junit.jupiter.api.Test; import java.util.Arrays; -import java.util.Optional; import java.util.Properties; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -65,7 +63,7 @@ class ComputeNodeInstanceContextTest { InstanceMetaData instanceMetaData = new ProxyInstanceMetaData("foo_instance_id", 3306); ComputeNodeInstanceContext instanceContext = new ComputeNodeInstanceContext(new ComputeNodeInstance(instanceMetaData), mock(ModeConfiguration.class), new EventBusContext()); instanceContext.init(mock(WorkerIdGenerator.class), mock(LockContext.class)); - instanceContext.addComputeNodeInstance(new ComputeNodeInstance(new ProxyInstanceMetaData("bar_instance_id", 3307))); + instanceContext.getClusterInstanceRegistry().add(new ComputeNodeInstance(new ProxyInstanceMetaData("bar_instance_id", 3307))); instanceContext.updateStatus("foo_instance_id", InstanceState.CIRCUIT_BREAK.name()); assertThat(instanceContext.getInstance().getState().getCurrentState(), is(InstanceState.CIRCUIT_BREAK)); } @@ -75,7 +73,7 @@ class ComputeNodeInstanceContextTest { InstanceMetaData instanceMetaData = new ProxyInstanceMetaData("foo_instance_id", 3306); ComputeNodeInstanceContext instanceContext = new ComputeNodeInstanceContext(new ComputeNodeInstance(instanceMetaData), mock(ModeConfiguration.class), new EventBusContext()); instanceContext.init(mock(WorkerIdGenerator.class), mock(LockContext.class)); - instanceContext.addComputeNodeInstance(new ComputeNodeInstance(new ProxyInstanceMetaData("bar_instance_id", 3307))); + instanceContext.getClusterInstanceRegistry().add(new ComputeNodeInstance(new ProxyInstanceMetaData("bar_instance_id", 3307))); instanceContext.updateStatus("bar_instance_id", InstanceState.CIRCUIT_BREAK.name()); assertThat(instanceContext.getInstance().getState().getCurrentState(), is(InstanceState.OK)); } @@ -94,10 +92,10 @@ class ComputeNodeInstanceContextTest { InstanceMetaData instanceMetaData = new ProxyInstanceMetaData("foo_instance_id", 3306); ComputeNodeInstanceContext instanceContext = new ComputeNodeInstanceContext(new ComputeNodeInstance(instanceMetaData), mock(ModeConfiguration.class), new EventBusContext()); instanceContext.init(mock(WorkerIdGenerator.class), mock(LockContext.class)); - instanceContext.addComputeNodeInstance(new ComputeNodeInstance(new ProxyInstanceMetaData("bar_instance_id", 3307))); + instanceContext.getClusterInstanceRegistry().add(new ComputeNodeInstance(new ProxyInstanceMetaData("bar_instance_id", 3307))); instanceContext.updateLabels("bar_instance_id", Arrays.asList("label_1", "label_2")); assertTrue(instanceContext.getInstance().getLabels().isEmpty()); - assertThat(instanceContext.getAllClusterInstances().iterator().next().getLabels(), is(Arrays.asList("label_1", "label_2"))); + assertThat(instanceContext.getClusterInstanceRegistry().getAllClusterInstances().iterator().next().getLabels(), is(Arrays.asList("label_1", "label_2"))); } @Test @@ -114,10 +112,10 @@ class ComputeNodeInstanceContextTest { ComputeNodeInstance instance = new ComputeNodeInstance(new ProxyInstanceMetaData("foo_instance_id", 3306)); ComputeNodeInstanceContext instanceContext = new ComputeNodeInstanceContext(instance, mock(ModeConfiguration.class), new EventBusContext()); instanceContext.init(mock(WorkerIdGenerator.class), mock(LockContext.class)); - instanceContext.addComputeNodeInstance(new ComputeNodeInstance(new ProxyInstanceMetaData("bar_instance_id", 3307))); + instanceContext.getClusterInstanceRegistry().add(new ComputeNodeInstance(new ProxyInstanceMetaData("bar_instance_id", 3307))); instanceContext.updateWorkerId("bar_instance_id", 10); assertThat(instanceContext.getWorkerId(), is(-1)); - assertThat(instanceContext.getAllClusterInstances().iterator().next().getWorkerId(), is(10)); + assertThat(instanceContext.getClusterInstanceRegistry().getAllClusterInstances().iterator().next().getWorkerId(), is(10)); } @Test @@ -126,34 +124,4 @@ class ComputeNodeInstanceContextTest { instanceContext.init(mock(WorkerIdGenerator.class), mock(LockContext.class)); assertThat(instanceContext.generateWorkerId(new Properties()), is(0)); } - - @Test - void assertAddComputeNodeInstance() { - ComputeNodeInstance instance = new ComputeNodeInstance(new ProxyInstanceMetaData("foo_instance_id", 3306)); - ComputeNodeInstanceContext instanceContext = new ComputeNodeInstanceContext(instance, mock(ModeConfiguration.class), new EventBusContext()); - instanceContext.init(mock(WorkerIdGenerator.class), mock(LockContext.class)); - instanceContext.addComputeNodeInstance(new ComputeNodeInstance(new ProxyInstanceMetaData("bar_instance_id", 3307))); - assertFalse(instanceContext.getAllClusterInstances().isEmpty()); - } - - @Test - void assertDeleteComputeNodeInstance() { - ComputeNodeInstance instance = new ComputeNodeInstance(new ProxyInstanceMetaData("foo_instance_id", 3306)); - ComputeNodeInstanceContext instanceContext = new ComputeNodeInstanceContext(instance, mock(ModeConfiguration.class), new EventBusContext()); - instanceContext.init(mock(WorkerIdGenerator.class), mock(LockContext.class)); - instanceContext.addComputeNodeInstance(new ComputeNodeInstance(new ProxyInstanceMetaData("bar_instance_id", 3307))); - instanceContext.deleteComputeNodeInstance(new ComputeNodeInstance(new ProxyInstanceMetaData("bar_instance_id", 3307))); - assertTrue(instanceContext.getAllClusterInstances().isEmpty()); - } - - @Test - void assertGetComputeNodeInstanceById() { - ComputeNodeInstance instance = new ComputeNodeInstance(new ProxyInstanceMetaData("foo_instance_id", 3306)); - ComputeNodeInstanceContext instanceContext = new ComputeNodeInstanceContext(instance, mock(ModeConfiguration.class), new EventBusContext()); - instanceContext.init(mock(WorkerIdGenerator.class), mock(LockContext.class)); - instanceContext.addComputeNodeInstance(new ComputeNodeInstance(new ProxyInstanceMetaData("bar_instance_id", 3307))); - Optional<ComputeNodeInstance> actual = instanceContext.getComputeNodeInstanceById("bar_instance_id"); - assertTrue(actual.isPresent()); - assertThat(actual.get().getMetaData().getId(), is("bar_instance_id")); - } } diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java index 647a0692093..c34ae5186ae 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java @@ -78,7 +78,8 @@ public final class ClusterContextManagerBuilder implements ContextManagerBuilder private void registerOnline(final ComputeNodeInstanceContext computeNodeInstanceContext, final ContextManagerBuilderParameter param, final ContextManager contextManager, final ClusterPersistRepository repository) { contextManager.getPersistServiceFacade().getComputeNodePersistService().registerOnline(computeNodeInstanceContext.getInstance()); - contextManager.getComputeNodeInstanceContext().getAllClusterInstances().addAll(contextManager.getPersistServiceFacade().getComputeNodePersistService().loadAllComputeNodeInstances()); + contextManager.getComputeNodeInstanceContext().getClusterInstanceRegistry().getAllClusterInstances() + .addAll(contextManager.getPersistServiceFacade().getComputeNodePersistService().loadAllComputeNodeInstances()); new DataChangedEventListenerRegistry(contextManager, getDatabaseNames(param, contextManager.getPersistServiceFacade().getMetaDataPersistService())).register(); ClusterEventSubscriberRegistry eventSubscriberRegistry = new ClusterEventSubscriberRegistry(contextManager.getComputeNodeInstanceContext().getEventBusContext()); eventSubscriberRegistry.register(createDeliverEventSubscribers(repository)); diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ComputeNodeStateSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ComputeNodeStateSubscriber.java index cf282f34c58..c01a1210caa 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ComputeNodeStateSubscriber.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ComputeNodeStateSubscriber.java @@ -49,7 +49,8 @@ public final class ComputeNodeStateSubscriber implements DispatchEventSubscriber */ @Subscribe public synchronized void renew(final InstanceOnlineEvent event) { - computeNodeInstanceContext.addComputeNodeInstance(contextManager.getPersistServiceFacade().getComputeNodePersistService().loadComputeNodeInstance(event.getInstanceMetaData())); + computeNodeInstanceContext.getClusterInstanceRegistry() + .add(contextManager.getPersistServiceFacade().getComputeNodePersistService().loadComputeNodeInstance(event.getInstanceMetaData())); } /** @@ -59,7 +60,7 @@ public final class ComputeNodeStateSubscriber implements DispatchEventSubscriber */ @Subscribe public synchronized void renew(final InstanceOfflineEvent event) { - computeNodeInstanceContext.deleteComputeNodeInstance(new ComputeNodeInstance(event.getInstanceMetaData())); + computeNodeInstanceContext.getClusterInstanceRegistry().delete(new ComputeNodeInstance(event.getInstanceMetaData())); } /** diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ComputeNodeStateSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ComputeNodeStateSubscriberTest.java index 528f71cc98b..e2c76070bd9 100644 --- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ComputeNodeStateSubscriberTest.java +++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ComputeNodeStateSubscriberTest.java @@ -58,13 +58,13 @@ class ComputeNodeStateSubscriberTest { ComputeNodeInstance computeNodeInstance = mock(ComputeNodeInstance.class); when(contextManager.getPersistServiceFacade().getComputeNodePersistService().loadComputeNodeInstance(instanceMetaData)).thenReturn(computeNodeInstance); subscriber.renew(new InstanceOnlineEvent(instanceMetaData)); - verify(contextManager.getComputeNodeInstanceContext()).addComputeNodeInstance(computeNodeInstance); + verify(contextManager.getComputeNodeInstanceContext().getClusterInstanceRegistry()).add(computeNodeInstance); } @Test void assertRenewWithInstanceOfflineEvent() { subscriber.renew(new InstanceOfflineEvent(mock(InstanceMetaData.class))); - verify(contextManager.getComputeNodeInstanceContext()).deleteComputeNodeInstance(any()); + verify(contextManager.getComputeNodeInstanceContext().getClusterInstanceRegistry()).delete(any()); } @Test diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowComputeNodesExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowComputeNodesExecutor.java index c78adb2bb52..9493c1e4af3 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowComputeNodesExecutor.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowComputeNodesExecutor.java @@ -45,7 +45,7 @@ public final class ShowComputeNodesExecutor implements DistSQLQueryExecutor<Show String modeType = contextManager.getComputeNodeInstanceContext().getModeConfiguration().getType(); return "Standalone".equals(modeType) ? Collections.singleton(buildRow(contextManager.getComputeNodeInstanceContext().getInstance(), modeType)) - : contextManager.getComputeNodeInstanceContext().getAllClusterInstances().stream().map(each -> buildRow(each, modeType)).collect(Collectors.toList()); + : contextManager.getComputeNodeInstanceContext().getClusterInstanceRegistry().getAllClusterInstances().stream().map(each -> buildRow(each, modeType)).collect(Collectors.toList()); } private LocalDataQueryResultRow buildRow(final ComputeNodeInstance instance, final String modeType) { diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/LabelComputeNodeExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/LabelComputeNodeExecutor.java index 5528182ca2f..3c5a69e011c 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/LabelComputeNodeExecutor.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/LabelComputeNodeExecutor.java @@ -38,7 +38,7 @@ public final class LabelComputeNodeExecutor implements DistSQLUpdateExecutor<Lab @Override public void executeUpdate(final LabelComputeNodeStatement sqlStatement, final ContextManager contextManager) throws SQLException { String instanceId = sqlStatement.getInstanceId(); - Optional<ComputeNodeInstance> computeNodeInstance = contextManager.getComputeNodeInstanceContext().getComputeNodeInstanceById(instanceId); + Optional<ComputeNodeInstance> computeNodeInstance = contextManager.getComputeNodeInstanceContext().getClusterInstanceRegistry().find(instanceId); if (computeNodeInstance.isPresent()) { Collection<String> labels = new LinkedHashSet<>(sqlStatement.getLabels()); if (!sqlStatement.isOverwrite()) { diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/SetComputeNodeStateExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/SetComputeNodeStateExecutor.java index eddb2e30b84..7dd1e314a7a 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/SetComputeNodeStateExecutor.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/SetComputeNodeStateExecutor.java @@ -43,16 +43,17 @@ public final class SetComputeNodeStateExecutor implements DistSQLUpdateExecutor< } private void checkEnablingIsValid(final ContextManager contextManager, final String instanceId) { - ShardingSpherePreconditions.checkState(contextManager.getComputeNodeInstanceContext().getComputeNodeInstanceById(instanceId).isPresent(), + ShardingSpherePreconditions.checkState(contextManager.getComputeNodeInstanceContext().getClusterInstanceRegistry().find(instanceId).isPresent(), () -> new UnsupportedSQLOperationException(String.format("`%s` does not exist", instanceId))); } private void checkDisablingIsValid(final ContextManager contextManager, final String instanceId) { ShardingSpherePreconditions.checkState(!contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getId().equals(instanceId), () -> new UnsupportedSQLOperationException(String.format("`%s` is the currently in use instance and cannot be disabled", instanceId))); - ShardingSpherePreconditions.checkState(contextManager.getComputeNodeInstanceContext().getComputeNodeInstanceById(instanceId).isPresent(), + ShardingSpherePreconditions.checkState(contextManager.getComputeNodeInstanceContext().getClusterInstanceRegistry().find(instanceId).isPresent(), () -> new UnsupportedSQLOperationException(String.format("`%s` does not exist", instanceId))); - ShardingSpherePreconditions.checkState(InstanceState.CIRCUIT_BREAK != contextManager.getComputeNodeInstanceContext().getComputeNodeInstanceById(instanceId).get().getState().getCurrentState(), + ShardingSpherePreconditions.checkState( + InstanceState.CIRCUIT_BREAK != contextManager.getComputeNodeInstanceContext().getClusterInstanceRegistry().find(instanceId).get().getState().getCurrentState(), () -> new UnsupportedSQLOperationException(String.format("`%s` compute node has been disabled", instanceId))); } diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/UnlabelComputeNodeExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/UnlabelComputeNodeExecutor.java index 8ad01485fa4..abc998d1956 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/UnlabelComputeNodeExecutor.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/UnlabelComputeNodeExecutor.java @@ -38,7 +38,7 @@ public final class UnlabelComputeNodeExecutor implements DistSQLUpdateExecutor<U @Override public void executeUpdate(final UnlabelComputeNodeStatement sqlStatement, final ContextManager contextManager) { String instanceId = sqlStatement.getInstanceId(); - Optional<ComputeNodeInstance> computeNodeInstance = contextManager.getComputeNodeInstanceContext().getComputeNodeInstanceById(instanceId); + Optional<ComputeNodeInstance> computeNodeInstance = contextManager.getComputeNodeInstanceContext().getClusterInstanceRegistry().find(instanceId); if (computeNodeInstance.isPresent()) { Collection<String> labels = new LinkedHashSet<>(computeNodeInstance.get().getLabels()); if (sqlStatement.getLabels().isEmpty()) { diff --git a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowComputeNodesExecutorTest.java b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowComputeNodesExecutorTest.java index 89d8fb6ca05..75e83399b22 100644 --- a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowComputeNodesExecutorTest.java +++ b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowComputeNodesExecutorTest.java @@ -99,7 +99,7 @@ class ShowComputeNodesExecutorTest { when(computeNodeInstance.getMetaData()).thenReturn(new ProxyInstanceMetaData("foo", "127.0.0.1@3309", "foo_version")); when(computeNodeInstance.getState()).thenReturn(new InstanceStateContext()); when(computeNodeInstance.getWorkerId()).thenReturn(1); - when(result.getAllClusterInstances()).thenReturn(Collections.singleton(computeNodeInstance)); + when(result.getClusterInstanceRegistry().getAllClusterInstances()).thenReturn(Collections.singleton(computeNodeInstance)); return result; } } diff --git a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/SetComputeNodeStateExecutorTest.java b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/SetComputeNodeStateExecutorTest.java index 3de9510c90e..af62131be69 100644 --- a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/SetComputeNodeStateExecutorTest.java +++ b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/SetComputeNodeStateExecutorTest.java @@ -48,8 +48,9 @@ class SetComputeNodeStateExecutorTest { void assertExecuteUpdateWithAlreadyDisableInstance() { ContextManager contextManager = mock(ContextManager.class, RETURNS_DEEP_STUBS); when(contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getId()).thenReturn("currentInstance"); - when(contextManager.getComputeNodeInstanceContext().getComputeNodeInstanceById("instanceID").isPresent()).thenReturn(true); - when(contextManager.getComputeNodeInstanceContext().getComputeNodeInstanceById("instanceID").get().getState().getCurrentState()).thenReturn(InstanceState.CIRCUIT_BREAK); + when(contextManager.getComputeNodeInstanceContext().getClusterInstanceRegistry().find("instanceID").isPresent()).thenReturn(true); + when(contextManager.getComputeNodeInstanceContext().getClusterInstanceRegistry().find("instanceID").get().getState().getCurrentState()) + .thenReturn(InstanceState.CIRCUIT_BREAK); assertThrows(UnsupportedSQLOperationException.class, () -> executor.executeUpdate(new SetComputeNodeStateStatement("DISABLE", "instanceID"), contextManager)); } }