This is an automated email from the ASF dual-hosted git repository.
menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new ccfc10cb4da Remove worker-id from compute node instance (#18951)
ccfc10cb4da is described below
commit ccfc10cb4dadd5b9effad53391a7fc0477ad731f
Author: gin <[email protected]>
AuthorDate: Fri Jul 8 15:10:22 2022 +0800
Remove worker-id from compute node instance (#18951)
* Remove worker-id from compute node instance
* Fix log info
---
.../infra/instance/ComputeNodeInstance.java | 2 --
.../infra/instance/InstanceContext.java | 28 +-----------------
.../metadata/persist/node/ComputeNodeTest.java | 5 ++++
.../ClusterContextManagerCoordinator.java | 13 ---------
.../status/compute/event/WorkerIdEvent.java | 34 ----------------------
.../compute/service/ComputeNodeStatusService.java | 1 -
.../watcher/ComputeNodeStateChangedWatcher.java | 4 ---
.../generator/ClusterWorkerIdGenerator.java | 6 ++--
.../service/ComputeNodeStatusServiceTest.java | 6 ++++
.../ComputeNodeStateChangedWatcherTest.java | 19 ------------
10 files changed, 15 insertions(+), 103 deletions(-)
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
index 51a68a2e8e7..adda363dc40 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
@@ -40,8 +40,6 @@ public final class ComputeNodeInstance {
private Collection<String> labels;
- private Long workerId;
-
/**
* Set labels.
*
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
index 352bb7c8f2d..1ff9164163d 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
@@ -28,7 +28,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
-import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
@@ -77,17 +76,6 @@ public final class InstanceContext {
}
}
- /**
- * Update instance worker id.
- *
- * @param workerId worker id
- */
- public void updateWorkerId(final Long workerId) {
- if (!Objects.equals(workerId, instance.getWorkerId())) {
- instance.setWorkerId(workerId);
- }
- }
-
/**
* Update instance label.
*
@@ -101,15 +89,6 @@ public final class InstanceContext {
computeNodeInstances.stream().filter(each ->
each.getInstanceMetaData().getId().equals(instanceId)).forEach(each ->
each.setLabels(labels));
}
- /**
- * Get worker id.
- *
- * @return worker id
- */
- public long getWorkerId() {
- return instance.getWorkerId();
- }
-
/**
* Generate worker id.
*
@@ -117,12 +96,7 @@ public final class InstanceContext {
* @return worker id
*/
public long generateWorkerId(final Properties props) {
- Long result = instance.getWorkerId();
- if (null == result) {
- result = workerIdGenerator.generate(props);
- instance.setWorkerId(result);
- }
- return result;
+ return workerIdGenerator.generate(props);
}
/**
diff --git
a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNodeTest.java
b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNodeTest.java
index 8d46c5fed38..8c0a24caa48 100644
---
a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNodeTest.java
+++
b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNodeTest.java
@@ -60,6 +60,11 @@ public final class ComputeNodeTest {
assertThat(ComputeNode.getInstanceWorkerIdNodePath("foo_instance"),
is("/nodes/compute_nodes/worker_id/foo_instance"));
}
+ @Test
+ public void assertGetInstanceWorkerIdRootNodePath() {
+ assertThat(ComputeNode.getInstanceWorkerIdRootNodePath(),
is("/nodes/compute_nodes/worker_id"));
+ }
+
@Test
public void assertGetInstanceIdByComputeNodePath() {
assertThat(ComputeNode.getInstanceIdByComputeNode("/nodes/compute_nodes/status/foo_instance_1"),
is("foo_instance_1"));
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
index 690ef0b76cd..ad8f3a7e187 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
@@ -48,7 +48,6 @@ import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.statu
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListUnitCompleteEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.PrimaryStateChangedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
@@ -219,18 +218,6 @@ public final class ClusterContextManagerCoordinator {
contextManager.getInstanceContext().updateInstanceStatus(event.getInstanceId(),
event.getStatus());
}
- /**
- * Renew instance worker id.
- *
- * @param event worker id event
- */
- @Subscribe
- public synchronized void renew(final WorkerIdEvent event) {
- if
(contextManager.getInstanceContext().getInstance().getInstanceMetaData().getId().equals(event.getInstanceId()))
{
-
contextManager.getInstanceContext().updateWorkerId(event.getWorkerId());
- }
- }
-
/**
* Renew instance labels.
*
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/WorkerIdEvent.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/WorkerIdEvent.java
deleted file mode 100644
index a979386202b..00000000000
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/WorkerIdEvent.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
-
-/**
- * Worker id changed event.
- */
-@RequiredArgsConstructor
-@Getter
-public final class WorkerIdEvent implements GovernanceEvent {
-
- private final String instanceId;
-
- private final Long workerId;
-}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
index c197e90a2d7..611bd05308a 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
@@ -146,7 +146,6 @@ public final class ComputeNodeStatusService {
ComputeNodeInstance result = new ComputeNodeInstance(instanceMetaData);
result.setLabels(loadInstanceLabels(instanceMetaData.getId()));
result.switchState(loadInstanceStatus(instanceMetaData.getId()));
-
loadInstanceWorkerId(instanceMetaData.getId()).ifPresent(result::setWorkerId);
return result;
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
index 35a5c1cc143..c1bfd093142 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
@@ -30,7 +30,6 @@ import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.statu
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListUnitCompleteEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
@@ -67,9 +66,6 @@ public final class ComputeNodeStateChangedWatcher implements
GovernanceWatcher<G
Collection<String> status =
Strings.isNullOrEmpty(event.getValue()) ? new ArrayList<>() :
YamlEngine.unmarshal(event.getValue(), Collection.class);
return Optional.of(new StateEvent(instanceId, status));
}
- if
(event.getKey().equals(ComputeNode.getInstanceWorkerIdNodePath(instanceId))) {
- return Optional.of(new WorkerIdEvent(instanceId,
Strings.isNullOrEmpty(event.getValue()) ? null :
Long.valueOf(event.getValue())));
- }
if
(event.getKey().equals(ComputeNode.getInstanceLabelsNodePath(instanceId))) {
return Optional.of(new LabelsEvent(instanceId,
Strings.isNullOrEmpty(event.getValue()) ? new ArrayList<>() :
YamlEngine.unmarshal(event.getValue(), Collection.class)));
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java
index a69a369a82a..d388c278ce7 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java
@@ -58,10 +58,10 @@ public final class ClusterWorkerIdGenerator implements
WorkerIdGenerator {
reTryCount++;
result = generateSequentialId();
if (result > MAX_WORKER_ID) {
- result = result % 1024L;
+ result = result % MAX_WORKER_ID + 1;
}
if (reTryCount > MAX_RE_TRY) {
- throw new ShardingSphereException("System assigned work-id
failed, assigned work-id was {}", result);
+ throw new ShardingSphereException("System assigned %s failed,
assigned worker id was %s", WORKER_ID_KEY, result);
}
} while (isExist(result));
registryCenter.getComputeNodeStatusService().persistInstanceWorkerId(instanceMetaData.getId(),
result);
@@ -80,7 +80,7 @@ public final class ClusterWorkerIdGenerator implements
WorkerIdGenerator {
private void checkConfigured(final long generatedWorkerId, final
Properties props) {
Optional<Long> configuredWorkerId = parseWorkerId(props);
if (configuredWorkerId.isPresent()) {
- log.warn("No need to configured {} in cluster mode, system
assigned work-id was {}", WORKER_ID_KEY, generatedWorkerId);
+ log.warn("No need to configured {} in cluster mode, system
assigned worker id was {}", WORKER_ID_KEY, generatedWorkerId);
}
}
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
index 3fe67e38a04..d08b616363b 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
@@ -112,4 +112,10 @@ public final class ComputeNodeStatusServiceTest {
ComputeNodeInstance actual = new
ComputeNodeStatusService(repository).loadComputeNodeInstance(instanceMetaData);
assertThat(actual.getInstanceMetaData(), is(instanceMetaData));
}
+
+ @Test
+ public void assertGetUsedWorkerIds() {
+ new ComputeNodeStatusService(repository).getUsedWorkerIds();
+
verify(repository).getChildrenKeys(ComputeNode.getInstanceWorkerIdRootNodePath());
+ }
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
index b63eefc2544..0724b2d2c39 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
@@ -22,7 +22,6 @@ import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.Gover
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
import org.junit.Test;
@@ -55,24 +54,6 @@ public final class ComputeNodeStateChangedWatcherTest {
assertThat(((StateEvent) actual.get()).getInstanceId(),
is("127.0.0.1@3307"));
}
- @Test
- public void assertCreateAddWorkerIdEvent() {
- Optional<GovernanceEvent> actual = new ComputeNodeStateChangedWatcher()
- .createGovernanceEvent(new
DataChangedEvent("/nodes/compute_nodes/worker_id/127.0.0.1@3307", "123",
Type.ADDED));
- assertTrue(actual.isPresent());
- assertThat(((WorkerIdEvent) actual.get()).getWorkerId(), is(123L));
- assertThat(((WorkerIdEvent) actual.get()).getInstanceId(),
is("127.0.0.1@3307"));
- }
-
- @Test
- public void assertCreateUpdateWorkerIdEvent() {
- Optional<GovernanceEvent> actual = new ComputeNodeStateChangedWatcher()
- .createGovernanceEvent(new
DataChangedEvent("/nodes/compute_nodes/worker_id/127.0.0.1@3307", "123",
Type.UPDATED));
- assertTrue(actual.isPresent());
- assertThat(((WorkerIdEvent) actual.get()).getWorkerId(), is(123L));
- assertThat(((WorkerIdEvent) actual.get()).getInstanceId(),
is("127.0.0.1@3307"));
- }
-
@Test
public void assertCreateAddLabelEvent() {
Optional<GovernanceEvent> actual = new ComputeNodeStateChangedWatcher()