This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c27654f8650 Rename SessionConnectionReconnectListener (#31287)
c27654f8650 is described below
commit c27654f8650a0627bb41db3ba08cf969b4abd2cb
Author: Liang Zhang <[email protected]>
AuthorDate: Sun May 19 13:20:25 2024 +0800
Rename SessionConnectionReconnectListener (#31287)
* Rename SessionConnectionReconnectListener
* Rename SessionConnectionReconnectListener
* Rename SessionConnectionReconnectListener
---
.../cluster/ClusterContextManagerBuilder.java | 12 ++++---
.../compute/service/ComputeNodeStatusService.java | 3 +-
.../cluster/repository/provider/zookeeper/pom.xml | 5 +++
.../cluster/zookeeper/ZookeeperRepository.java | 4 +--
...ava => SessionConnectionReconnectListener.java} | 41 +++++++++++-----------
5 files changed, 36 insertions(+), 29 deletions(-)
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index ccb9b96a100..8485b119ea6 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -17,10 +17,12 @@
package org.apache.shardingsphere.mode.manager.cluster;
+import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.infra.instance.InstanceContextAware;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
import
org.apache.shardingsphere.infra.instance.metadata.jdbc.JDBCInstanceMetaData;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.state.cluster.ClusterState;
@@ -63,8 +65,9 @@ public final class ClusterContextManagerBuilder implements
ContextManagerBuilder
@Override
public ContextManager build(final ContextManagerBuilderParameter param,
final EventBusContext eventBusContext) throws SQLException {
- ClusterPersistRepository repository =
getClusterPersistRepository((ClusterPersistRepositoryConfiguration)
param.getModeConfiguration().getRepository());
- InstanceContext instanceContext = buildInstanceContext(repository,
param, eventBusContext);
+ ModeConfiguration modeConfig = param.getModeConfiguration();
+ ClusterPersistRepository repository =
getClusterPersistRepository((ClusterPersistRepositoryConfiguration)
modeConfig.getRepository());
+ InstanceContext instanceContext = buildInstanceContext(modeConfig,
param.getInstanceMetaData(), repository, eventBusContext);
if (repository instanceof InstanceContextAware) {
((InstanceContextAware)
repository).setInstanceContext(instanceContext);
}
@@ -85,8 +88,9 @@ public final class ClusterContextManagerBuilder implements
ContextManagerBuilder
return result;
}
- private InstanceContext buildInstanceContext(final
ClusterPersistRepository repository, final ContextManagerBuilderParameter
param, final EventBusContext eventBusContext) {
- return new InstanceContext(new
ComputeNodeInstance(param.getInstanceMetaData()), new
ClusterWorkerIdGenerator(repository, param.getInstanceMetaData()),
param.getModeConfiguration(),
+ private InstanceContext buildInstanceContext(final ModeConfiguration
modeConfig,
+ final InstanceMetaData
instanceMetaData, final ClusterPersistRepository repository, final
EventBusContext eventBusContext) {
+ return new InstanceContext(new ComputeNodeInstance(instanceMetaData),
new ClusterWorkerIdGenerator(repository, instanceMetaData), modeConfig,
new ClusterModeContextManager(), new GlobalLockContext(new
GlobalLockPersistService(initDistributedLockHolder(repository))),
eventBusContext);
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
index 4896bbbd27f..a32f544d43d 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
@@ -20,7 +20,6 @@ package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.stat
import com.google.common.base.Strings;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.infra.autogen.version.ShardingSphereVersion;
import org.apache.shardingsphere.infra.instance.ComputeNodeData;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
@@ -52,7 +51,7 @@ public final class ComputeNodeStatusService {
*/
public void registerOnline(final InstanceMetaData instanceMetaData) {
repository.persistEphemeral(ComputeNode.getOnlineInstanceNodePath(instanceMetaData.getId(),
instanceMetaData.getType()),
- YamlEngine.marshal(new
ComputeNodeData(instanceMetaData.getAttributes(),
ShardingSphereVersion.VERSION)));
+ YamlEngine.marshal(new
ComputeNodeData(instanceMetaData.getAttributes(),
instanceMetaData.getVersion())));
}
/**
diff --git a/mode/type/cluster/repository/provider/zookeeper/pom.xml b/mode/type/cluster/repository/provider/zookeeper/pom.xml
index f32dbef7503..73125b6ddce 100644
--- a/mode/type/cluster/repository/provider/zookeeper/pom.xml
+++ b/mode/type/cluster/repository/provider/zookeeper/pom.xml
@@ -32,6 +32,11 @@
<artifactId>shardingsphere-cluster-mode-repository-api</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere</groupId>
+ <artifactId>shardingsphere-cluster-mode-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
diff --git a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
index 44845d3f89a..d13167139c3 100644
--- a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
+++ b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
@@ -38,7 +38,7 @@ import
org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
import
org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
import
org.apache.shardingsphere.mode.repository.cluster.zookeeper.handler.ZookeeperExceptionHandler;
-import
org.apache.shardingsphere.mode.repository.cluster.zookeeper.listener.SessionConnectionListener;
+import
org.apache.shardingsphere.mode.repository.cluster.zookeeper.listener.SessionConnectionReconnectListener;
import
org.apache.shardingsphere.mode.repository.cluster.zookeeper.props.ZookeeperProperties;
import
org.apache.shardingsphere.mode.repository.cluster.zookeeper.props.ZookeeperPropertyKey;
import org.apache.zookeeper.CreateMode;
@@ -285,7 +285,7 @@ public final class ZookeeperRepository implements
ClusterPersistRepository, Inst
@Override
public void setInstanceContext(final InstanceContext instanceContext) {
- client.getConnectionStateListenable().addListener(new
SessionConnectionListener(instanceContext, this));
+ client.getConnectionStateListenable().addListener(new
SessionConnectionReconnectListener(instanceContext, this));
}
@Override
diff --git a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/listener/SessionConnectionListener.java b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/listener/SessionConnectionReconnectListener.java
similarity index 61%
rename from mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/listener/SessionConnectionListener.java
rename to mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/listener/SessionConnectionReconnectListener.java
index a03783f2be8..092d2058323 100644
--- a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/listener/SessionConnectionListener.java
+++ b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/listener/SessionConnectionReconnectListener.java
@@ -17,46 +17,47 @@
package org.apache.shardingsphere.mode.repository.cluster.zookeeper.listener;
-import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.shardingsphere.infra.instance.ComputeNodeData;
-import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.infra.instance.InstanceContext;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.service.ComputeNodeStatusService;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import java.util.Properties;
/**
- * Session connection state listener.
+ * Session connection reconnect listener.
*/
-@RequiredArgsConstructor
@Slf4j
-public final class SessionConnectionListener implements
ConnectionStateListener {
+public final class SessionConnectionReconnectListener implements
ConnectionStateListener {
private static final long RECONNECT_INTERVAL_SECONDS = 5L;
private final InstanceContext instanceContext;
- private final ClusterPersistRepository repository;
+ private final ComputeNodeStatusService computeNodeStatusService;
+
+ public SessionConnectionReconnectListener(final InstanceContext
instanceContext, final ClusterPersistRepository repository) {
+ this.instanceContext = instanceContext;
+ computeNodeStatusService = new ComputeNodeStatusService(repository);
+ }
@Override
public void stateChanged(final CuratorFramework client, final
ConnectionState connectionState) {
- if (ConnectionState.LOST == connectionState) {
- boolean reRegistered;
- do {
- reRegistered = reRegister(client);
- } while (!reRegistered);
- log.debug("Instance re-register success instance id: {}",
instanceContext.getInstance().getCurrentInstanceId());
+ if (ConnectionState.LOST != connectionState) {
+ return;
}
+ boolean isReconnectFailed;
+ do {
+ isReconnectFailed = !reconnect(client);
+ } while (isReconnectFailed);
+ log.info("Instance reconnect success, instance ID: {}",
instanceContext.getInstance().getCurrentInstanceId());
}
- private boolean reRegister(final CuratorFramework client) {
+ private boolean reconnect(final CuratorFramework client) {
try {
if (client.getZookeeperClient().blockUntilConnectedOrTimedOut()) {
if (isNeedGenerateWorkerId()) {
@@ -78,11 +79,9 @@ public final class SessionConnectionListener implements
ConnectionStateListener
}
private void reRegisterInstanceComputeNode() {
- ComputeNodeInstance instance = instanceContext.getInstance();
-
repository.persistEphemeral(ComputeNode.getOnlineInstanceNodePath(instance.getCurrentInstanceId(),
- instance.getMetaData().getType()), YamlEngine.marshal(new
ComputeNodeData(instance.getMetaData().getAttributes(),
instance.getMetaData().getVersion())));
-
repository.persistEphemeral(ComputeNode.getInstanceLabelsNodePath(instance.getCurrentInstanceId()),
YamlEngine.marshal(instance.getLabels()));
-
repository.persistEphemeral(ComputeNode.getInstanceStatusNodePath(instance.getCurrentInstanceId()),
instance.getState().getCurrentState().name());
+
computeNodeStatusService.registerOnline(instanceContext.getInstance().getMetaData());
+
computeNodeStatusService.persistInstanceLabels(instanceContext.getInstance().getMetaData().getId(),
instanceContext.getInstance().getLabels());
+
computeNodeStatusService.persistInstanceState(instanceContext.getInstance().getMetaData().getId(),
instanceContext.getInstance().getState());
}
@SneakyThrows(InterruptedException.class)