This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new d39b987c308 Split status contain rule and support multiple groups of
high availability (#19005)
d39b987c308 is described below
commit d39b987c308d44989238ec3e50a083c83a41363b
Author: zhaojinchao <[email protected]>
AuthorDate: Sun Jul 10 23:41:50 2022 +0800
Split status contain rule and support multiple groups of high availability
(#19005)
* Split readwrite and discovery update status event and support multiple
groups of high availability
* Fix unit test
* Use Entry instead of Map.Entry
---
.../rule/DatabaseDiscoveryDataSourceRule.java | 18 +++++++++++++
.../dbdiscovery/rule/DatabaseDiscoveryRule.java | 23 ++++++++--------
.../rule/DatabaseDiscoveryRuleTest.java | 17 ++++++------
.../rule/ReadwriteSplittingRule.java | 15 ++++++-----
.../rule/ReadwriteSplittingRuleTest.java | 8 +++---
...obRule.java => DynamicStatusContainedRule.java} | 13 ++++++---
...nedRule.java => StaticStatusContainedRule.java} | 4 +--
.../ClusterContextManagerCoordinator.java | 31 ++++++++++++++--------
.../ClusterContextManagerCoordinatorTest.java | 10 +++----
9 files changed, 87 insertions(+), 52 deletions(-)
diff --git
a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryDataSourceRule.java
b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryDataSourceRule.java
index 162dec1ac8b..49b7e8bb82c 100644
---
a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryDataSourceRule.java
+++
b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryDataSourceRule.java
@@ -23,12 +23,14 @@ import lombok.Getter;
import
org.apache.shardingsphere.dbdiscovery.api.config.rule.DatabaseDiscoveryDataSourceRuleConfiguration;
import
org.apache.shardingsphere.dbdiscovery.spi.DatabaseDiscoveryProviderAlgorithm;
+import javax.sql.DataSource;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Properties;
import java.util.stream.Collectors;
@@ -100,6 +102,22 @@ public final class DatabaseDiscoveryDataSourceRule {
this.primaryDataSourceName = primaryDataSourceName;
}
+ /**
+ * Get data source group.
+ *
+ * @param dataSourceMap data source map
+ * @return data sources from the given map whose names belong to this rule
+ */
+ public Map<String, DataSource> getDataSourceGroup(final Map<String,
DataSource> dataSourceMap) {
+ Map<String, DataSource> result = new HashMap<>(dataSourceMap.size(),
1);
+ for (Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
+ if (dataSourceNames.contains(entry.getKey())) {
+ result.put(entry.getKey(), entry.getValue());
+ }
+ }
+ return result;
+ }
+
/**
* Get data source mapper.
*
diff --git
a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
index 52b29a6350f..7eae85e961d 100644
---
a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
+++
b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
@@ -37,8 +37,7 @@ import
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabas
import org.apache.shardingsphere.infra.rule.event.DataSourceStatusChangedEvent;
import org.apache.shardingsphere.infra.rule.identifier.scope.DatabaseRule;
import
org.apache.shardingsphere.infra.rule.identifier.type.DataSourceContainedRule;
-import
org.apache.shardingsphere.infra.rule.identifier.type.RestartHeartBeatJobRule;
-import
org.apache.shardingsphere.infra.rule.identifier.type.StatusContainedRule;
+import
org.apache.shardingsphere.infra.rule.identifier.type.DynamicStatusContainedRule;
import
org.apache.shardingsphere.infra.rule.identifier.type.exportable.ExportableRule;
import org.apache.shardingsphere.mode.metadata.storage.StorageNodeStatus;
import
org.apache.shardingsphere.mode.metadata.storage.event.PrimaryDataSourceChangedEvent;
@@ -60,7 +59,7 @@ import java.util.Properties;
/**
* Database discovery rule.
*/
-public final class DatabaseDiscoveryRule implements DatabaseRule,
DataSourceContainedRule, RestartHeartBeatJobRule, StatusContainedRule,
ExportableRule {
+public final class DatabaseDiscoveryRule implements DatabaseRule,
DataSourceContainedRule, DynamicStatusContainedRule, ExportableRule {
@Getter
private final RuleConfiguration configuration;
@@ -117,8 +116,8 @@ public final class DatabaseDiscoveryRule implements
DatabaseRule, DataSourceCont
for (Entry<String, DatabaseDiscoveryDataSourceRule> entry :
dataSourceRules.entrySet()) {
String groupName = entry.getKey();
DatabaseDiscoveryDataSourceRule dataSourceRule = entry.getValue();
+ Map<String, DataSource> originalDataSourceMap =
dataSourceRule.getDataSourceGroup(dataSourceMap);
DatabaseDiscoveryEngine engine = new
DatabaseDiscoveryEngine(dataSourceRule.getDatabaseDiscoveryProviderAlgorithm());
- Map<String, DataSource> originalDataSourceMap = new
HashMap<>(dataSourceMap);
engine.checkEnvironment(databaseName, originalDataSourceMap);
dataSourceRule.changePrimaryDataSourceName(engine.changePrimaryDataSource(
databaseName, groupName,
entry.getValue().getPrimaryDataSourceName(), originalDataSourceMap,
dataSourceRule.getDisabledDataSourceNames()));
@@ -154,7 +153,7 @@ public final class DatabaseDiscoveryRule implements
DatabaseRule, DataSourceCont
}
@Override
- public void restart(final DataSourceStatusChangedEvent event, final
InstanceContext instanceContext) {
+ public void restartHeartBeatJob(final DataSourceStatusChangedEvent event,
final InstanceContext instanceContext) {
PrimaryDataSourceChangedEvent dataSourceEvent =
(PrimaryDataSourceChangedEvent) event;
QualifiedDatabase qualifiedDatabase =
dataSourceEvent.getQualifiedDatabase();
DatabaseDiscoveryDataSourceRule dataSourceRule =
dataSourceRules.get(qualifiedDatabase.getGroupName());
@@ -174,7 +173,7 @@ public final class DatabaseDiscoveryRule implements
DatabaseRule, DataSourceCont
for (Entry<String, DatabaseDiscoveryDataSourceRule> entry :
dataSourceRules.entrySet()) {
DatabaseDiscoveryDataSourceRule rule = entry.getValue();
String jobName =
rule.getDatabaseDiscoveryProviderAlgorithm().getType() + "-" + databaseName +
"-" + rule.getGroupName();
- CronJob job = new CronJob(jobName, each -> new
HeartbeatJob(databaseName, rule.getGroupName(),
rule.getPrimaryDataSourceName(), dataSourceMap,
+ CronJob job = new CronJob(jobName, each -> new
HeartbeatJob(databaseName, rule.getGroupName(),
rule.getPrimaryDataSourceName(), rule.getDataSourceGroup(dataSourceMap),
rule.getDatabaseDiscoveryProviderAlgorithm(),
rule.getDisabledDataSourceNames()).execute(null),
rule.getHeartbeatProps().getProperty("keep-alive-cron"));
modeScheduleContext.get().startCronJob(job);
}
@@ -184,12 +183,12 @@ public final class DatabaseDiscoveryRule implements
DatabaseRule, DataSourceCont
@Override
public void updateStatus(final DataSourceStatusChangedEvent event) {
StorageNodeDataSourceChangedEvent dataSourceChangedEvent =
(StorageNodeDataSourceChangedEvent) event;
- for (Entry<String, DatabaseDiscoveryDataSourceRule> entry :
dataSourceRules.entrySet()) {
- if
(StorageNodeStatus.isDisable(dataSourceChangedEvent.getDataSource().getStatus()))
{
-
entry.getValue().disableDataSource(dataSourceChangedEvent.getQualifiedDatabase().getDataSourceName());
- } else {
-
entry.getValue().enableDataSource(dataSourceChangedEvent.getQualifiedDatabase().getDataSourceName());
- }
+ DatabaseDiscoveryDataSourceRule dataSourceRule =
dataSourceRules.get(dataSourceChangedEvent.getQualifiedDatabase().getGroupName());
+ Preconditions.checkState(null != dataSourceRule, "Can 't find database
discovery data source rule in database `%s`.", databaseName);
+ if
(StorageNodeStatus.isDisable(dataSourceChangedEvent.getDataSource().getStatus()))
{
+
dataSourceRule.disableDataSource(dataSourceChangedEvent.getQualifiedDatabase().getDataSourceName());
+ } else {
+
dataSourceRule.enableDataSource(dataSourceChangedEvent.getQualifiedDatabase().getDataSourceName());
}
}
diff --git
a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/test/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRuleTest.java
b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/test/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRuleTest.java
index deb7eb92d18..8761503c605 100644
---
a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/test/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRuleTest.java
+++
b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/test/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRuleTest.java
@@ -44,11 +44,11 @@ import static org.mockito.Mockito.when;
public final class DatabaseDiscoveryRuleTest {
- private final Map<String, DataSource> dataSourceMap =
Collections.singletonMap("primary", new MockedDataSource());
+ private final Map<String, DataSource> dataSourceMap =
Collections.singletonMap("primary_ds", new MockedDataSource());
@Test
public void assertFindDataSourceRule() {
- Optional<DatabaseDiscoveryDataSourceRule> actual =
createRule().findDataSourceRule("test_pr");
+ Optional<DatabaseDiscoveryDataSourceRule> actual =
createRule().findDataSourceRule("replica_ds");
assertTrue(actual.isPresent());
assertDataSourceRule(actual.get());
}
@@ -59,8 +59,8 @@ public final class DatabaseDiscoveryRuleTest {
}
private void assertDataSourceRule(final DatabaseDiscoveryDataSourceRule
actual) {
- assertThat(actual.getGroupName(), is("test_pr"));
- assertThat(actual.getDataSourceNames(), is(Arrays.asList("ds_0",
"ds_1")));
+ assertThat(actual.getGroupName(), is("replica_ds"));
+ assertThat(actual.getDataSourceNames(), is(Arrays.asList("primary_ds",
"replica_ds_0", "replica_ds_1")));
}
@Test
@@ -72,19 +72,20 @@ public final class DatabaseDiscoveryRuleTest {
}
private Map<String, Collection<String>> getDataSourceMapper() {
- Map<String, Collection<String>> result = new HashMap<>(2, 1);
- result.put("test_pr", Collections.singletonList("ds_1"));
+ Map<String, Collection<String>> result = new HashMap<>(1, 1);
+ result.put("replica_ds", Collections.singletonList("replica_ds_1"));
return result;
}
@Test
public void assertGetExportedMethods() {
DatabaseDiscoveryRule databaseDiscoveryRule = createRule();
-
assertThat(databaseDiscoveryRule.getExportData().get(ExportableConstants.EXPORT_DB_DISCOVERY_PRIMARY_DATA_SOURCES),
is(Collections.singletonMap("test_pr", "primary")));
+
assertThat(databaseDiscoveryRule.getExportData().get(ExportableConstants.EXPORT_DB_DISCOVERY_PRIMARY_DATA_SOURCES),
is(Collections.singletonMap("replica_ds", "primary_ds")));
}
private DatabaseDiscoveryRule createRule() {
- DatabaseDiscoveryDataSourceRuleConfiguration config = new
DatabaseDiscoveryDataSourceRuleConfiguration("test_pr", Arrays.asList("ds_0",
"ds_1"), "", "CORE.FIXTURE");
+ DatabaseDiscoveryDataSourceRuleConfiguration config =
+ new DatabaseDiscoveryDataSourceRuleConfiguration("replica_ds",
Arrays.asList("primary_ds", "replica_ds_0", "replica_ds_1"), "",
"CORE.FIXTURE");
InstanceContext instanceContext = mock(InstanceContext.class,
RETURNS_DEEP_STUBS);
when(instanceContext.getInstance().getCurrentInstanceId()).thenReturn("foo_id");
return new DatabaseDiscoveryRule("db_discovery", dataSourceMap, new
DatabaseDiscoveryRuleConfiguration(
diff --git
a/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
b/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
index d6bc225026d..1562788b6e7 100644
---
a/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
+++
b/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
@@ -23,10 +23,11 @@ import lombok.Getter;
import org.apache.shardingsphere.infra.config.RuleConfiguration;
import org.apache.shardingsphere.infra.distsql.constant.ExportableConstants;
import
org.apache.shardingsphere.infra.distsql.constant.ExportableItemConstants;
+import
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
import org.apache.shardingsphere.infra.rule.event.DataSourceStatusChangedEvent;
import org.apache.shardingsphere.infra.rule.identifier.scope.DatabaseRule;
import
org.apache.shardingsphere.infra.rule.identifier.type.DataSourceContainedRule;
-import
org.apache.shardingsphere.infra.rule.identifier.type.StatusContainedRule;
+import
org.apache.shardingsphere.infra.rule.identifier.type.StaticStatusContainedRule;
import
org.apache.shardingsphere.infra.rule.identifier.type.StorageConnectorReusableRule;
import
org.apache.shardingsphere.infra.rule.identifier.type.exportable.ExportableRule;
import org.apache.shardingsphere.mode.metadata.storage.StorageNodeStatus;
@@ -49,7 +50,7 @@ import java.util.Optional;
/**
* Readwrite-splitting rule.
*/
-public final class ReadwriteSplittingRule implements DatabaseRule,
DataSourceContainedRule, StatusContainedRule, ExportableRule,
StorageConnectorReusableRule {
+public final class ReadwriteSplittingRule implements DatabaseRule,
DataSourceContainedRule, StaticStatusContainedRule, ExportableRule,
StorageConnectorReusableRule {
@Getter
private final RuleConfiguration configuration;
@@ -116,11 +117,11 @@ public final class ReadwriteSplittingRule implements
DatabaseRule, DataSourceCon
@Override
public void updateStatus(final DataSourceStatusChangedEvent event) {
- for (Entry<String, ReadwriteSplittingDataSourceRule> entry :
dataSourceRules.entrySet()) {
- StorageNodeDataSourceChangedEvent dataSourceChangedEvent =
(StorageNodeDataSourceChangedEvent) event;
-
entry.getValue().updateDisabledDataSourceNames(dataSourceChangedEvent.getQualifiedDatabase().getDataSourceName(),
-
StorageNodeStatus.isDisable(dataSourceChangedEvent.getDataSource().getStatus()));
- }
+ StorageNodeDataSourceChangedEvent dataSourceEvent =
(StorageNodeDataSourceChangedEvent) event;
+ QualifiedDatabase qualifiedDatabase =
dataSourceEvent.getQualifiedDatabase();
+ ReadwriteSplittingDataSourceRule dataSourceRule =
dataSourceRules.get(qualifiedDatabase.getGroupName());
+ Preconditions.checkState(null != dataSourceRule, "Can 't find
readwrite-splitting data source rule in database `%s`.",
qualifiedDatabase.getDatabaseName());
+
dataSourceRule.updateDisabledDataSourceNames(dataSourceEvent.getQualifiedDatabase().getDataSourceName(),
StorageNodeStatus.isDisable(dataSourceEvent.getDataSource().getStatus()));
}
@Override
diff --git
a/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/test/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRuleTest.java
b/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/test/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRuleTest.java
index ce4f729fc4d..429e99486cd 100644
---
a/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/test/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRuleTest.java
+++
b/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-core/src/test/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRuleTest.java
@@ -48,7 +48,7 @@ public final class ReadwriteSplittingRuleTest {
@Test
public void assertFindDataSourceRule() {
- Optional<ReadwriteSplittingDataSourceRule> actual =
createReadwriteSplittingRule().findDataSourceRule("test_pr");
+ Optional<ReadwriteSplittingDataSourceRule> actual =
createReadwriteSplittingRule().findDataSourceRule("readwrite");
assertTrue(actual.isPresent());
assertDataSourceRule(actual.get());
}
@@ -60,13 +60,13 @@ public final class ReadwriteSplittingRuleTest {
private ReadwriteSplittingRule createReadwriteSplittingRule() {
ReadwriteSplittingDataSourceRuleConfiguration config =
- new ReadwriteSplittingDataSourceRuleConfiguration("test_pr",
new StaticReadwriteSplittingStrategyConfiguration("write_ds",
Arrays.asList("read_ds_0", "read_ds_1")), null, "random");
+ new ReadwriteSplittingDataSourceRuleConfiguration("readwrite",
new StaticReadwriteSplittingStrategyConfiguration("write_ds",
Arrays.asList("read_ds_0", "read_ds_1")), null, "random");
return new ReadwriteSplittingRule(new
ReadwriteSplittingRuleConfiguration(
Collections.singleton(config),
Collections.singletonMap("random", new
ShardingSphereAlgorithmConfiguration("RANDOM", new Properties()))));
}
private void assertDataSourceRule(final ReadwriteSplittingDataSourceRule
actual) {
- assertThat(actual.getName(), is("test_pr"));
+ assertThat(actual.getName(), is("readwrite"));
assertThat(actual.getReadwriteSplittingStrategy().getWriteDataSource(),
is("write_ds"));
assertThat(actual.getReadwriteSplittingStrategy().getReadDataSources(),
is(Arrays.asList("read_ds_0", "read_ds_1")));
assertThat(actual.getLoadBalancer().getType(), is("RANDOM"));
@@ -103,7 +103,7 @@ public final class ReadwriteSplittingRuleTest {
public void assertGetDataSourceMapper() {
ReadwriteSplittingRule readwriteSplittingRule =
createReadwriteSplittingRule();
Map<String, Collection<String>> actual =
readwriteSplittingRule.getDataSourceMapper();
- Map<String, Collection<String>> expected =
Collections.singletonMap("test_pr", Arrays.asList("write_ds", "read_ds_0",
"read_ds_1"));
+ Map<String, Collection<String>> expected =
Collections.singletonMap("readwrite", Arrays.asList("write_ds", "read_ds_0",
"read_ds_1"));
assertThat(actual, is(expected));
}
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/RestartHeartBeatJobRule.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/DynamicStatusContainedRule.java
similarity index 76%
rename from
shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/RestartHeartBeatJobRule.java
rename to
shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/DynamicStatusContainedRule.java
index eb7eba3bc2d..6b9dac74120 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/RestartHeartBeatJobRule.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/DynamicStatusContainedRule.java
@@ -22,14 +22,21 @@ import
org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import org.apache.shardingsphere.infra.rule.event.DataSourceStatusChangedEvent;
/**
- * Restart heart beat job rule.
+ * Dynamic status contained rule.
*/
-public interface RestartHeartBeatJobRule extends ShardingSphereRule {
+public interface DynamicStatusContainedRule extends ShardingSphereRule {
+
+ /**
+ * Update data source status.
+ *
+ * @param event data source status changed event
+ */
+ void updateStatus(DataSourceStatusChangedEvent event);
/**
* Restart heart beat job.
* @param event data source status changed event
* @param instanceContext instance context
*/
- void restart(DataSourceStatusChangedEvent event, InstanceContext
instanceContext);
+ void restartHeartBeatJob(DataSourceStatusChangedEvent event,
InstanceContext instanceContext);
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StatusContainedRule.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StaticStatusContainedRule.java
similarity index 91%
rename from
shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StatusContainedRule.java
rename to
shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StaticStatusContainedRule.java
index de538148cf1..37a73dabb70 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StatusContainedRule.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/StaticStatusContainedRule.java
@@ -21,9 +21,9 @@ import
org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import org.apache.shardingsphere.infra.rule.event.DataSourceStatusChangedEvent;
/**
- * Status contained rule.
+ * Static status contained rule.
*/
-public interface StatusContainedRule extends ShardingSphereRule {
+public interface StaticStatusContainedRule extends ShardingSphereRule {
/**
* Update data source status.
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
index 61c8793c53e..c5b0dc73913 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
@@ -25,8 +25,9 @@ import
org.apache.shardingsphere.infra.executor.sql.process.model.yaml.BatchYaml
import
org.apache.shardingsphere.infra.executor.sql.process.model.yaml.YamlExecuteProcessContext;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
-import
org.apache.shardingsphere.infra.rule.identifier.type.RestartHeartBeatJobRule;
-import
org.apache.shardingsphere.infra.rule.identifier.type.StatusContainedRule;
+import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
+import
org.apache.shardingsphere.infra.rule.identifier.type.StaticStatusContainedRule;
+import
org.apache.shardingsphere.infra.rule.identifier.type.DynamicStatusContainedRule;
import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import org.apache.shardingsphere.mode.manager.ContextManager;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.datasource.DataSourceChangedEvent;
@@ -61,6 +62,7 @@ import java.sql.SQLException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Map;
+import java.util.Optional;
import java.util.stream.Collectors;
/**
@@ -167,9 +169,16 @@ public final class ClusterContextManagerCoordinator {
@Subscribe
public synchronized void renew(final StorageNodeChangedEvent event) {
QualifiedDatabase qualifiedDatabase = event.getQualifiedDatabase();
-
contextManager.getMetaDataContexts().getMetaData().getDatabases().get(qualifiedDatabase.getDatabaseName()).getRuleMetaData().getRules()
- .stream().filter(each -> each instanceof StatusContainedRule)
- .forEach(each -> ((StatusContainedRule) each).updateStatus(new
StorageNodeDataSourceChangedEvent(qualifiedDatabase, event.getDataSource())));
+ Optional<ShardingSphereRule> dynamicStatusContainedRule =
contextManager.getMetaDataContexts().getMetaData().getDatabases().get(qualifiedDatabase.getDatabaseName()).getRuleMetaData()
+ .getRules().stream().filter(each -> each instanceof
DynamicStatusContainedRule).findFirst();
+ if (dynamicStatusContainedRule.isPresent()) {
+ ((DynamicStatusContainedRule)
dynamicStatusContainedRule.get()).updateStatus(new
StorageNodeDataSourceChangedEvent(qualifiedDatabase, event.getDataSource()));
+ return;
+ }
+ Optional<ShardingSphereRule> staticStatusContainedRule =
contextManager.getMetaDataContexts().getMetaData().getDatabases().get(qualifiedDatabase.getDatabaseName()).getRuleMetaData()
+ .getRules().stream().filter(each -> each instanceof
StaticStatusContainedRule).findFirst();
+ staticStatusContainedRule.ifPresent(shardingSphereRule ->
((StaticStatusContainedRule) shardingSphereRule)
+ .updateStatus(new
StorageNodeDataSourceChangedEvent(qualifiedDatabase, event.getDataSource())));
}
/**
@@ -182,9 +191,9 @@ public final class ClusterContextManagerCoordinator {
QualifiedDatabase qualifiedDatabase = event.getQualifiedDatabase();
contextManager.getMetaDataContexts().getMetaData().getDatabases().get(qualifiedDatabase.getDatabaseName()).getRuleMetaData().getRules()
.stream()
- .filter(each -> each instanceof RestartHeartBeatJobRule)
- .forEach(each -> ((RestartHeartBeatJobRule) each)
- .restart(new
PrimaryDataSourceChangedEvent(qualifiedDatabase),
contextManager.getInstanceContext()));
+ .filter(each -> each instanceof DynamicStatusContainedRule)
+ .forEach(each -> ((DynamicStatusContainedRule) each)
+ .restartHeartBeatJob(new
PrimaryDataSourceChangedEvent(qualifiedDatabase),
contextManager.getInstanceContext()));
}
/**
@@ -295,13 +304,13 @@ public final class ClusterContextManagerCoordinator {
private void disableDataSources() {
contextManager.getMetaDataContexts().getMetaData().getDatabases().forEach((key,
value) -> value.getRuleMetaData().getRules().forEach(each -> {
- if (each instanceof StatusContainedRule) {
- disableDataSources((StatusContainedRule) each);
+ if (each instanceof StaticStatusContainedRule) {
+ disableDataSources((StaticStatusContainedRule) each);
}
}));
}
- private void disableDataSources(final StatusContainedRule rule) {
+ private void disableDataSources(final StaticStatusContainedRule rule) {
Map<String, StorageNodeDataSource> storageNodes =
registryCenter.getStorageNodeStatusService().loadStorageNodes();
Map<String, StorageNodeDataSource> disableDataSources =
storageNodes.entrySet().stream().filter(entry ->
StorageNodeStatus.isDisable(entry.getValue().getStatus()))
.collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
index 9e0b2cc0701..8314b6f4d69 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
@@ -43,9 +43,9 @@ import
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.
import
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
+import
org.apache.shardingsphere.infra.rule.identifier.type.StaticStatusContainedRule;
import org.apache.shardingsphere.infra.rule.identifier.type.ResourceHeldRule;
-import
org.apache.shardingsphere.infra.rule.identifier.type.RestartHeartBeatJobRule;
-import
org.apache.shardingsphere.infra.rule.identifier.type.StatusContainedRule;
+import
org.apache.shardingsphere.infra.rule.identifier.type.DynamicStatusContainedRule;
import org.apache.shardingsphere.infra.state.StateType;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
@@ -223,7 +223,7 @@ public final class ClusterContextManagerCoordinatorTest {
@Test
public void assertRenewForDisableStateChanged() {
- StatusContainedRule statusContainedRule =
mock(StatusContainedRule.class);
+ StaticStatusContainedRule statusContainedRule =
mock(StaticStatusContainedRule.class);
when(database.getRuleMetaData().getRules()).thenReturn(Collections.singletonList(statusContainedRule));
StorageNodeChangedEvent event = new StorageNodeChangedEvent(new
QualifiedDatabase("db.readwrite_ds.ds_0"), new
StorageNodeDataSource(StorageNodeRole.MEMBER, StorageNodeStatus.DISABLED));
coordinator.renew(event);
@@ -275,7 +275,7 @@ public final class ClusterContextManagerCoordinatorTest {
@Test
public void assertRenewPrimaryDataSourceName() {
Collection<ShardingSphereRule> rules = new LinkedList<>();
- RestartHeartBeatJobRule mockRestartHeartBeatJobRule =
mock(RestartHeartBeatJobRule.class);
+ DynamicStatusContainedRule mockRestartHeartBeatJobRule =
mock(DynamicStatusContainedRule.class);
rules.add(mockRestartHeartBeatJobRule);
ShardingSphereRuleMetaData ruleMetaData = new
ShardingSphereRuleMetaData(rules);
ShardingSphereDatabase database = mock(ShardingSphereDatabase.class);
@@ -283,7 +283,7 @@ public final class ClusterContextManagerCoordinatorTest {
contextManager.getMetaDataContexts().getMetaData().getDatabases().put("db",
database);
PrimaryStateChangedEvent mockPrimaryStateChangedEvent = new
PrimaryStateChangedEvent(new QualifiedDatabase("db.readwrite_ds.test_ds"));
coordinator.renew(mockPrimaryStateChangedEvent);
- verify(mockRestartHeartBeatJobRule).restart(any(), any());
+ verify(mockRestartHeartBeatJobRule).restartHeartBeatJob(any(), any());
}
@Test