This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new c27654f8650 Rename SessionConnectionReconnectListener (#31287)
c27654f8650 is described below

commit c27654f8650a0627bb41db3ba08cf969b4abd2cb
Author: Liang Zhang <[email protected]>
AuthorDate: Sun May 19 13:20:25 2024 +0800

    Rename SessionConnectionReconnectListener (#31287)
    
    * Rename SessionConnectionReconnectListener
    
    * Rename SessionConnectionReconnectListener
    
    * Rename SessionConnectionReconnectListener
---
 .../cluster/ClusterContextManagerBuilder.java      | 12 ++++---
 .../compute/service/ComputeNodeStatusService.java  |  3 +-
 .../cluster/repository/provider/zookeeper/pom.xml  |  5 +++
 .../cluster/zookeeper/ZookeeperRepository.java     |  4 +--
 ...ava => SessionConnectionReconnectListener.java} | 41 +++++++++++-----------
 5 files changed, 36 insertions(+), 29 deletions(-)

diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index ccb9b96a100..8485b119ea6 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -17,10 +17,12 @@
 
 package org.apache.shardingsphere.mode.manager.cluster;
 
+import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
 import org.apache.shardingsphere.infra.instance.InstanceContext;
 import org.apache.shardingsphere.infra.instance.InstanceContextAware;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
 import 
org.apache.shardingsphere.infra.instance.metadata.jdbc.JDBCInstanceMetaData;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.infra.state.cluster.ClusterState;
@@ -63,8 +65,9 @@ public final class ClusterContextManagerBuilder implements 
ContextManagerBuilder
     
     @Override
     public ContextManager build(final ContextManagerBuilderParameter param, 
final EventBusContext eventBusContext) throws SQLException {
-        ClusterPersistRepository repository = 
getClusterPersistRepository((ClusterPersistRepositoryConfiguration) 
param.getModeConfiguration().getRepository());
-        InstanceContext instanceContext = buildInstanceContext(repository, 
param, eventBusContext);
+        ModeConfiguration modeConfig = param.getModeConfiguration();
+        ClusterPersistRepository repository = 
getClusterPersistRepository((ClusterPersistRepositoryConfiguration) 
modeConfig.getRepository());
+        InstanceContext instanceContext = buildInstanceContext(modeConfig, 
param.getInstanceMetaData(), repository, eventBusContext);
         if (repository instanceof InstanceContextAware) {
             ((InstanceContextAware) 
repository).setInstanceContext(instanceContext);
         }
@@ -85,8 +88,9 @@ public final class ClusterContextManagerBuilder implements 
ContextManagerBuilder
         return result;
     }
     
-    private InstanceContext buildInstanceContext(final 
ClusterPersistRepository repository, final ContextManagerBuilderParameter 
param, final EventBusContext eventBusContext) {
-        return new InstanceContext(new 
ComputeNodeInstance(param.getInstanceMetaData()), new 
ClusterWorkerIdGenerator(repository, param.getInstanceMetaData()), 
param.getModeConfiguration(),
+    private InstanceContext buildInstanceContext(final ModeConfiguration 
modeConfig,
+                                                 final InstanceMetaData 
instanceMetaData, final ClusterPersistRepository repository, final 
EventBusContext eventBusContext) {
+        return new InstanceContext(new ComputeNodeInstance(instanceMetaData), 
new ClusterWorkerIdGenerator(repository, instanceMetaData), modeConfig,
                 new ClusterModeContextManager(), new GlobalLockContext(new 
GlobalLockPersistService(initDistributedLockHolder(repository))), 
eventBusContext);
     }
     
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
index 4896bbbd27f..a32f544d43d 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
@@ -20,7 +20,6 @@ package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.stat
 import com.google.common.base.Strings;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.infra.autogen.version.ShardingSphereVersion;
 import org.apache.shardingsphere.infra.instance.ComputeNodeData;
 import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
@@ -52,7 +51,7 @@ public final class ComputeNodeStatusService {
      */
     public void registerOnline(final InstanceMetaData instanceMetaData) {
         
repository.persistEphemeral(ComputeNode.getOnlineInstanceNodePath(instanceMetaData.getId(),
 instanceMetaData.getType()),
-                YamlEngine.marshal(new 
ComputeNodeData(instanceMetaData.getAttributes(), 
ShardingSphereVersion.VERSION)));
+                YamlEngine.marshal(new 
ComputeNodeData(instanceMetaData.getAttributes(), 
instanceMetaData.getVersion())));
     }
     
     /**
diff --git a/mode/type/cluster/repository/provider/zookeeper/pom.xml 
b/mode/type/cluster/repository/provider/zookeeper/pom.xml
index f32dbef7503..73125b6ddce 100644
--- a/mode/type/cluster/repository/provider/zookeeper/pom.xml
+++ b/mode/type/cluster/repository/provider/zookeeper/pom.xml
@@ -32,6 +32,11 @@
             <artifactId>shardingsphere-cluster-mode-repository-api</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.shardingsphere</groupId>
+            <artifactId>shardingsphere-cluster-mode-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         
         <dependency>
             <groupId>org.apache.shardingsphere</groupId>
diff --git 
a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
 
b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
index 44845d3f89a..d13167139c3 100644
--- 
a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
+++ 
b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
@@ -38,7 +38,7 @@ import 
org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
 import 
org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
 import 
org.apache.shardingsphere.mode.repository.cluster.zookeeper.handler.ZookeeperExceptionHandler;
-import 
org.apache.shardingsphere.mode.repository.cluster.zookeeper.listener.SessionConnectionListener;
+import 
org.apache.shardingsphere.mode.repository.cluster.zookeeper.listener.SessionConnectionReconnectListener;
 import 
org.apache.shardingsphere.mode.repository.cluster.zookeeper.props.ZookeeperProperties;
 import 
org.apache.shardingsphere.mode.repository.cluster.zookeeper.props.ZookeeperPropertyKey;
 import org.apache.zookeeper.CreateMode;
@@ -285,7 +285,7 @@ public final class ZookeeperRepository implements 
ClusterPersistRepository, Inst
     
     @Override
     public void setInstanceContext(final InstanceContext instanceContext) {
-        client.getConnectionStateListenable().addListener(new 
SessionConnectionListener(instanceContext, this));
+        client.getConnectionStateListenable().addListener(new 
SessionConnectionReconnectListener(instanceContext, this));
     }
     
     @Override
diff --git 
a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/listener/SessionConnectionListener.java
 
b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/listener/SessionConnectionReconnectListener.java
similarity index 61%
rename from 
mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/listener/SessionConnectionListener.java
rename to 
mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/listener/SessionConnectionReconnectListener.java
index a03783f2be8..092d2058323 100644
--- 
a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/listener/SessionConnectionListener.java
+++ 
b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/listener/SessionConnectionReconnectListener.java
@@ -17,46 +17,47 @@
 
 package org.apache.shardingsphere.mode.repository.cluster.zookeeper.listener;
 
-import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.shardingsphere.infra.instance.ComputeNodeData;
-import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
 import org.apache.shardingsphere.infra.instance.InstanceContext;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.service.ComputeNodeStatusService;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 
 import java.util.Properties;
 
 /**
- * Session connection state listener.
+ * Session connection reconnect listener.
  */
-@RequiredArgsConstructor
 @Slf4j
-public final class SessionConnectionListener implements 
ConnectionStateListener {
+public final class SessionConnectionReconnectListener implements 
ConnectionStateListener {
     
     private static final long RECONNECT_INTERVAL_SECONDS = 5L;
     
     private final InstanceContext instanceContext;
     
-    private final ClusterPersistRepository repository;
+    private final ComputeNodeStatusService computeNodeStatusService;
+    
+    public SessionConnectionReconnectListener(final InstanceContext 
instanceContext, final ClusterPersistRepository repository) {
+        this.instanceContext = instanceContext;
+        computeNodeStatusService = new ComputeNodeStatusService(repository);
+    }
     
     @Override
     public void stateChanged(final CuratorFramework client, final 
ConnectionState connectionState) {
-        if (ConnectionState.LOST == connectionState) {
-            boolean reRegistered;
-            do {
-                reRegistered = reRegister(client);
-            } while (!reRegistered);
-            log.debug("Instance re-register success instance id: {}", 
instanceContext.getInstance().getCurrentInstanceId());
+        if (ConnectionState.LOST != connectionState) {
+            return;
         }
+        boolean isReconnectFailed;
+        do {
+            isReconnectFailed = !reconnect(client);
+        } while (isReconnectFailed);
+        log.info("Instance reconnect success, instance ID: {}", 
instanceContext.getInstance().getCurrentInstanceId());
     }
     
-    private boolean reRegister(final CuratorFramework client) {
+    private boolean reconnect(final CuratorFramework client) {
         try {
             if (client.getZookeeperClient().blockUntilConnectedOrTimedOut()) {
                 if (isNeedGenerateWorkerId()) {
@@ -78,11 +79,9 @@ public final class SessionConnectionListener implements 
ConnectionStateListener
     }
     
     private void reRegisterInstanceComputeNode() {
-        ComputeNodeInstance instance = instanceContext.getInstance();
-        
repository.persistEphemeral(ComputeNode.getOnlineInstanceNodePath(instance.getCurrentInstanceId(),
-                instance.getMetaData().getType()), YamlEngine.marshal(new 
ComputeNodeData(instance.getMetaData().getAttributes(), 
instance.getMetaData().getVersion())));
-        
repository.persistEphemeral(ComputeNode.getInstanceLabelsNodePath(instance.getCurrentInstanceId()),
 YamlEngine.marshal(instance.getLabels()));
-        
repository.persistEphemeral(ComputeNode.getInstanceStatusNodePath(instance.getCurrentInstanceId()),
 instance.getState().getCurrentState().name());
+        
computeNodeStatusService.registerOnline(instanceContext.getInstance().getMetaData());
+        
computeNodeStatusService.persistInstanceLabels(instanceContext.getInstance().getMetaData().getId(),
 instanceContext.getInstance().getLabels());
+        
computeNodeStatusService.persistInstanceState(instanceContext.getInstance().getMetaData().getId(),
 instanceContext.getInstance().getState());
     }
     
     @SneakyThrows(InterruptedException.class)

Reply via email to