This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 45145a16bdb Restart heartbeat job when primary data source has changed. (#18858)
45145a16bdb is described below

commit 45145a16bdbd9aa439f2c06992f90d91d89f8ad8
Author: zhaojinchao <[email protected]>
AuthorDate: Tue Jul 5 18:19:16 2022 +0800

    Restart heartbeat job when primary data source has changed. (#18858)
    
    * Restart heartbeat job when primary datasource has changed
    
    * restart heart beat job
    
    * Fix ci
    
    * Fix ci
    
    * Fix bug
    
    * Remove useless import
---
 .../dbdiscovery/rule/DatabaseDiscoveryRule.java    | 77 +++++++++++-----------
 .../rule/ReadwriteSplittingRule.java               | 10 ++-
 .../identifier/type/RestartHeartBeatJobRule.java   | 35 ++++++++++
 .../ClusterContextManagerCoordinator.java          |  7 +-
 .../ClusterContextManagerCoordinatorTest.java      |  9 +--
 5 files changed, 87 insertions(+), 51 deletions(-)

diff --git 
a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
 
b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
index 53544205984..52b29a6350f 100644
--- 
a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
+++ 
b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.dbdiscovery.rule;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import lombok.Getter;
 import org.apache.shardingsphere.dbdiscovery.algorithm.DatabaseDiscoveryEngine;
@@ -32,9 +33,11 @@ import 
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmC
 import 
org.apache.shardingsphere.infra.datasource.strategy.DynamicDataSourceStrategyFactory;
 import org.apache.shardingsphere.infra.distsql.constant.ExportableConstants;
 import org.apache.shardingsphere.infra.instance.InstanceContext;
+import 
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
 import org.apache.shardingsphere.infra.rule.event.DataSourceStatusChangedEvent;
 import org.apache.shardingsphere.infra.rule.identifier.scope.DatabaseRule;
 import 
org.apache.shardingsphere.infra.rule.identifier.type.DataSourceContainedRule;
+import 
org.apache.shardingsphere.infra.rule.identifier.type.RestartHeartBeatJobRule;
 import 
org.apache.shardingsphere.infra.rule.identifier.type.StatusContainedRule;
 import 
org.apache.shardingsphere.infra.rule.identifier.type.exportable.ExportableRule;
 import org.apache.shardingsphere.mode.metadata.storage.StorageNodeStatus;
@@ -53,12 +56,11 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Properties;
-import java.util.stream.Collectors;
 
 /**
  * Database discovery rule.
  */
-public final class DatabaseDiscoveryRule implements DatabaseRule, 
DataSourceContainedRule, StatusContainedRule, ExportableRule {
+public final class DatabaseDiscoveryRule implements DatabaseRule, 
DataSourceContainedRule, RestartHeartBeatJobRule, StatusContainedRule, 
ExportableRule {
     
     @Getter
     private final RuleConfiguration configuration;
@@ -79,8 +81,7 @@ public final class DatabaseDiscoveryRule implements 
DatabaseRule, DataSourceCont
         discoveryTypes = 
getDiscoveryProviderAlgorithms(ruleConfig.getDiscoveryTypes());
         dataSourceRules = getDataSourceRules(ruleConfig.getDataSources(), 
ruleConfig.getDiscoveryHeartbeats());
         findPrimaryReplicaRelationship(databaseName, dataSourceMap);
-        initAware();
-        
initHeartBeatJobs(instanceContext.getInstance().getCurrentInstanceId());
+        initAwareAndHeartBeatJobs(instanceContext);
     }
     
     public DatabaseDiscoveryRule(final String databaseName,
@@ -91,8 +92,7 @@ public final class DatabaseDiscoveryRule implements 
DatabaseRule, DataSourceCont
         discoveryTypes = ruleConfig.getDiscoveryTypes();
         dataSourceRules = getDataSourceRules(ruleConfig.getDataSources(), 
ruleConfig.getDiscoveryHeartbeats());
         findPrimaryReplicaRelationship(databaseName, dataSourceMap);
-        initAware();
-        
initHeartBeatJobs(instanceContext.getInstance().getCurrentInstanceId());
+        initAwareAndHeartBeatJobs(instanceContext);
     }
     
     private static Map<String, DatabaseDiscoveryProviderAlgorithm> 
getDiscoveryProviderAlgorithms(final Map<String, 
ShardingSphereAlgorithmConfiguration> discoveryTypesConfig) {
@@ -125,25 +125,6 @@ public final class DatabaseDiscoveryRule implements 
DatabaseRule, DataSourceCont
         }
     }
     
-    private void initAware() {
-        DynamicDataSourceStrategyFactory.findInstance().ifPresent(optional -> 
optional.init(this));
-    }
-    
-    private void initHeartBeatJobs(final String instanceId) {
-        Optional<ModeScheduleContext> modeScheduleContext = 
ModeScheduleContextFactory.getInstance().get(instanceId);
-        if (modeScheduleContext.isPresent()) {
-            for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : 
dataSourceRules.entrySet()) {
-                DatabaseDiscoveryDataSourceRule rule = entry.getValue();
-                Map<String, DataSource> dataSources = 
dataSourceMap.entrySet().stream().filter(each -> 
!rule.getDisabledDataSourceNames().contains(each.getKey()))
-                        .collect(Collectors.toMap(Entry::getKey, 
Entry::getValue));
-                String jobName = 
rule.getDatabaseDiscoveryProviderAlgorithm().getType() + "-" + databaseName + 
"-" + rule.getGroupName();
-                CronJob job = new CronJob(jobName, each -> new 
HeartbeatJob(databaseName, rule.getGroupName(), 
rule.getPrimaryDataSourceName(), dataSources,
-                        rule.getDatabaseDiscoveryProviderAlgorithm(), 
rule.getDisabledDataSourceNames()).execute(null), 
rule.getHeartbeatProps().getProperty("keep-alive-cron"));
-                modeScheduleContext.get().startCronJob(job);
-            }
-        }
-    }
-    
     /**
      * Get single data source rule.
      *
@@ -173,21 +154,41 @@ public final class DatabaseDiscoveryRule implements 
DatabaseRule, DataSourceCont
     }
     
     @Override
-    public void updateStatus(final DataSourceStatusChangedEvent event) {
-        if (event instanceof StorageNodeDataSourceChangedEvent) {
+    public void restart(final DataSourceStatusChangedEvent event, final 
InstanceContext instanceContext) {
+        PrimaryDataSourceChangedEvent dataSourceEvent = 
(PrimaryDataSourceChangedEvent) event;
+        QualifiedDatabase qualifiedDatabase = 
dataSourceEvent.getQualifiedDatabase();
+        DatabaseDiscoveryDataSourceRule dataSourceRule = 
dataSourceRules.get(qualifiedDatabase.getGroupName());
+        Preconditions.checkState(null != dataSourceRule, "Can 't find database 
discovery data source rule in database `%s`.", databaseName);
+        
dataSourceRule.changePrimaryDataSourceName(qualifiedDatabase.getDataSourceName());
+        initAwareAndHeartBeatJobs(instanceContext);
+    }
+    
+    private void initAwareAndHeartBeatJobs(final InstanceContext 
instanceContext) {
+        DynamicDataSourceStrategyFactory.findInstance().ifPresent(optional -> 
optional.init(this));
+        
initHeartBeatJobs(instanceContext.getInstance().getCurrentInstanceId());
+    }
+    
+    private void initHeartBeatJobs(final String instanceId) {
+        Optional<ModeScheduleContext> modeScheduleContext = 
ModeScheduleContextFactory.getInstance().get(instanceId);
+        if (modeScheduleContext.isPresent()) {
             for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : 
dataSourceRules.entrySet()) {
-                StorageNodeDataSourceChangedEvent dataSourceChangedEvent = 
(StorageNodeDataSourceChangedEvent) event;
-                if 
(StorageNodeStatus.isDisable(dataSourceChangedEvent.getDataSource().getStatus()))
 {
-                    
entry.getValue().disableDataSource(dataSourceChangedEvent.getQualifiedDatabase().getDataSourceName());
-                } else {
-                    
entry.getValue().enableDataSource(dataSourceChangedEvent.getQualifiedDatabase().getDataSourceName());
-                }
+                DatabaseDiscoveryDataSourceRule rule = entry.getValue();
+                String jobName = 
rule.getDatabaseDiscoveryProviderAlgorithm().getType() + "-" + databaseName + 
"-" + rule.getGroupName();
+                CronJob job = new CronJob(jobName, each -> new 
HeartbeatJob(databaseName, rule.getGroupName(), 
rule.getPrimaryDataSourceName(), dataSourceMap,
+                        rule.getDatabaseDiscoveryProviderAlgorithm(), 
rule.getDisabledDataSourceNames()).execute(null), 
rule.getHeartbeatProps().getProperty("keep-alive-cron"));
+                modeScheduleContext.get().startCronJob(job);
             }
-        } else if (event instanceof PrimaryDataSourceChangedEvent) {
-            for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : 
dataSourceRules.entrySet()) {
-                if 
(entry.getValue().getGroupName().equals(((PrimaryDataSourceChangedEvent) 
event).getQualifiedDatabase().getGroupName())) {
-                    
entry.getValue().changePrimaryDataSourceName(((PrimaryDataSourceChangedEvent) 
event).getQualifiedDatabase().getDataSourceName());
-                }
+        }
+    }
+    
+    @Override
+    public void updateStatus(final DataSourceStatusChangedEvent event) {
+        StorageNodeDataSourceChangedEvent dataSourceChangedEvent = 
(StorageNodeDataSourceChangedEvent) event;
+        for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : 
dataSourceRules.entrySet()) {
+            if 
(StorageNodeStatus.isDisable(dataSourceChangedEvent.getDataSource().getStatus()))
 {
+                
entry.getValue().disableDataSource(dataSourceChangedEvent.getQualifiedDatabase().getDataSourceName());
+            } else {
+                
entry.getValue().enableDataSource(dataSourceChangedEvent.getQualifiedDatabase().getDataSourceName());
             }
         }
     }
diff --git 
a/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
 
b/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
index 80c7cb585c0..d6bc225026d 100644
--- 
a/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
+++ 
b/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
@@ -116,12 +116,10 @@ public final class ReadwriteSplittingRule implements 
DatabaseRule, DataSourceCon
     
     @Override
     public void updateStatus(final DataSourceStatusChangedEvent event) {
-        if (event instanceof StorageNodeDataSourceChangedEvent) {
-            for (Entry<String, ReadwriteSplittingDataSourceRule> entry : 
dataSourceRules.entrySet()) {
-                StorageNodeDataSourceChangedEvent dataSourceChangedEvent = 
(StorageNodeDataSourceChangedEvent) event;
-                
entry.getValue().updateDisabledDataSourceNames(dataSourceChangedEvent.getQualifiedDatabase().getDataSourceName(),
-                        
StorageNodeStatus.isDisable(dataSourceChangedEvent.getDataSource().getStatus()));
-            }
+        for (Entry<String, ReadwriteSplittingDataSourceRule> entry : 
dataSourceRules.entrySet()) {
+            StorageNodeDataSourceChangedEvent dataSourceChangedEvent = 
(StorageNodeDataSourceChangedEvent) event;
+            
entry.getValue().updateDisabledDataSourceNames(dataSourceChangedEvent.getQualifiedDatabase().getDataSourceName(),
+                    
StorageNodeStatus.isDisable(dataSourceChangedEvent.getDataSource().getStatus()));
         }
     }
     
diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/RestartHeartBeatJobRule.java
 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/RestartHeartBeatJobRule.java
new file mode 100644
index 00000000000..eb7eba3bc2d
--- /dev/null
+++ 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/RestartHeartBeatJobRule.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.rule.identifier.type;
+
+import org.apache.shardingsphere.infra.instance.InstanceContext;
+import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
+import org.apache.shardingsphere.infra.rule.event.DataSourceStatusChangedEvent;
+
+/**
+ * Restart heart beat job rule.
+ */
+public interface RestartHeartBeatJobRule extends ShardingSphereRule {
+    
+    /**
+     * Restart heart beat job.
+     * @param event data source status changed event
+     * @param instanceContext instance context
+     */
+    void restart(DataSourceStatusChangedEvent event, InstanceContext 
instanceContext);
+}
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
index 79698ae9c0b..690ef0b76cd 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
@@ -26,6 +26,7 @@ import 
org.apache.shardingsphere.infra.executor.sql.process.model.yaml.YamlExecu
 import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
 import 
org.apache.shardingsphere.infra.rule.identifier.type.StatusContainedRule;
+import 
org.apache.shardingsphere.infra.rule.identifier.type.RestartHeartBeatJobRule;
 import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.datasource.DataSourceChangedEvent;
@@ -192,9 +193,9 @@ public final class ClusterContextManagerCoordinator {
         QualifiedDatabase qualifiedDatabase = event.getQualifiedDatabase();
         
contextManager.getMetaDataContexts().getMetaData().getDatabases().get(qualifiedDatabase.getDatabaseName()).getRuleMetaData().getRules()
                 .stream()
-                .filter(each -> each instanceof StatusContainedRule)
-                .forEach(each -> ((StatusContainedRule) each)
-                        .updateStatus(new 
PrimaryDataSourceChangedEvent(qualifiedDatabase)));
+                .filter(each -> each instanceof RestartHeartBeatJobRule)
+                .forEach(each -> ((RestartHeartBeatJobRule) each)
+                        .restart(new 
PrimaryDataSourceChangedEvent(qualifiedDatabase), 
contextManager.getInstanceContext()));
     }
     
     /**
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
index 490463a275f..3648d1079e2 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
@@ -44,8 +44,9 @@ import 
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.
 import 
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
 import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
 import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
-import org.apache.shardingsphere.infra.rule.identifier.type.ResourceHeldRule;
 import 
org.apache.shardingsphere.infra.rule.identifier.type.StatusContainedRule;
+import org.apache.shardingsphere.infra.rule.identifier.type.ResourceHeldRule;
+import 
org.apache.shardingsphere.infra.rule.identifier.type.RestartHeartBeatJobRule;
 import org.apache.shardingsphere.infra.state.StateType;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
@@ -278,15 +279,15 @@ public final class ClusterContextManagerCoordinatorTest {
     @Test
     public void assertRenewPrimaryDataSourceName() {
         Collection<ShardingSphereRule> rules = new LinkedList<>();
-        StatusContainedRule mockStatusContainedRule = 
mock(StatusContainedRule.class);
-        rules.add(mockStatusContainedRule);
+        RestartHeartBeatJobRule mockRestartHeartBeatJobRule = 
mock(RestartHeartBeatJobRule.class);
+        rules.add(mockRestartHeartBeatJobRule);
         ShardingSphereRuleMetaData ruleMetaData = new 
ShardingSphereRuleMetaData(rules);
         ShardingSphereDatabase database = mock(ShardingSphereDatabase.class);
         when(database.getRuleMetaData()).thenReturn(ruleMetaData);
         
contextManager.getMetaDataContexts().getMetaData().getDatabases().put("db", 
database);
         PrimaryStateChangedEvent mockPrimaryStateChangedEvent = new 
PrimaryStateChangedEvent(new QualifiedDatabase("db.readwrite_ds.test_ds"));
         coordinator.renew(mockPrimaryStateChangedEvent);
-        verify(mockStatusContainedRule).updateStatus(any());
+        verify(mockRestartHeartBeatJobRule).restart(any(), any());
     }
     
     @Test

Reply via email to