This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 56c0bd30b88 Refactor StorageUnitChangedHandler and
StorageNodeChangedHandler (#34885)
56c0bd30b88 is described below
commit 56c0bd30b888290170a0874820c0c946be5a8b0f
Author: Liang Zhang <[email protected]>
AuthorDate: Wed Mar 5 19:08:21 2025 +0800
Refactor StorageUnitChangedHandler and StorageNodeChangedHandler (#34885)
* Refactor StorageUnitChangedHandler and StorageNodeChangedHandler
* Refactor StorageUnitChangedHandler and StorageNodeChangedHandler
* Refactor StorageUnitChangedHandler and StorageNodeChangedHandler
* Refactor StorageUnitChangedHandler and StorageNodeChangedHandler
---
.../metadata/manager/ActiveVersionChecker.java | 64 +++++++++++++
.../manager/rule/DatabaseRuleItemManager.java | 13 +--
.../type/metadata/storage/StorageNodeNodePath.java | 2 +-
.../type/metadata/storage/StorageUnitNodePath.java | 2 +-
.../dispatch/checker/ActiveVersionChecker.java | 42 ---------
.../datasource/DataSourceChangedHandler.java | 102 ---------------------
.../datasource/StorageNodeChangedHandler.java | 76 +++++++++++++++
.../datasource/StorageUnitChangedHandler.java | 89 ++++++++++++++++++
.../datasource/type/StorageNodeChangedHandler.java | 63 -------------
.../datasource/type/StorageUnitChangedHandler.java | 73 ---------------
.../metadata/type/TableChangedHandler.java | 6 +-
.../database/metadata/type/ViewChangedHandler.java | 6 +-
.../global/type/GlobalRuleChangedHandler.java | 6 +-
.../global/type/PropertiesChangedHandler.java | 6 +-
.../type/DatabaseMetaDataChangedListener.java | 9 +-
...est.java => StorageUnitChangedHandlerTest.java} | 11 +--
16 files changed, 260 insertions(+), 310 deletions(-)
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/manager/ActiveVersionChecker.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/manager/ActiveVersionChecker.java
new file mode 100644
index 00000000000..2f4157be15e
--- /dev/null
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/manager/ActiveVersionChecker.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.metadata.manager;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.mode.event.DataChangedEvent;
+import org.apache.shardingsphere.mode.node.path.type.version.VersionNodePath;
+import org.apache.shardingsphere.mode.spi.repository.PersistRepository;
+
+/**
+ * Active version checker.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+@Slf4j
+public final class ActiveVersionChecker {
+
+ /**
+ * Check whether the current version same with an active version.
+ *
+ * @param repository repository
+ * @param event data changed event
+ * @return same or not
+ */
+ public static boolean checkSame(final PersistRepository repository, final
DataChangedEvent event) {
+ return checkSame(repository, event.getValue(), event.getKey());
+ }
+
+ /**
+ * Check whether the current version same with an active version.
+ *
+ * @param repository repository
+ * @param versionNodePath version node path
+ * @param currentVersion current version
+ * @return same or not
+ */
+ public static boolean checkSame(final PersistRepository repository, final
VersionNodePath versionNodePath, final int currentVersion) {
+ return checkSame(repository, String.valueOf(currentVersion),
versionNodePath.getActiveVersionPath());
+ }
+
+ private static boolean checkSame(final PersistRepository repository, final
String currentVersion, final String activeVersionPath) {
+ if (currentVersion.equals(repository.query(activeVersionPath))) {
+ return true;
+ }
+ log.warn("Invalid active version `{}` of key `{}`", currentVersion,
activeVersionPath);
+ return false;
+ }
+}
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/manager/rule/DatabaseRuleItemManager.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/manager/rule/DatabaseRuleItemManager.java
index bee609080ca..56c7e4b6971 100644
---
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/manager/rule/DatabaseRuleItemManager.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/manager/rule/DatabaseRuleItemManager.java
@@ -25,11 +25,12 @@ import
org.apache.shardingsphere.infra.config.rule.scope.DatabaseRuleConfigurati
import
org.apache.shardingsphere.infra.config.rule.scope.DatabaseRuleConfigurationEmptyChecker;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
+import org.apache.shardingsphere.mode.metadata.manager.ActiveVersionChecker;
import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistFacade;
import
org.apache.shardingsphere.mode.node.path.type.metadata.rule.DatabaseRuleNodePath;
import org.apache.shardingsphere.mode.node.path.type.version.VersionNodePath;
-import
org.apache.shardingsphere.mode.spi.rule.RuleItemConfigurationChangedProcessor;
import org.apache.shardingsphere.mode.spi.rule.RuleChangedItemType;
+import
org.apache.shardingsphere.mode.spi.rule.RuleItemConfigurationChangedProcessor;
import java.sql.SQLException;
@@ -56,7 +57,7 @@ public final class DatabaseRuleItemManager {
@SuppressWarnings({"rawtypes", "unchecked"})
public void alter(final DatabaseRuleNodePath databaseRuleNodePath, final
int currentVersion) throws SQLException {
VersionNodePath versionNodePath = new
VersionNodePath(databaseRuleNodePath);
- if (!checkActiveVersion(versionNodePath, currentVersion)) {
+ if
(!ActiveVersionChecker.checkSame(metaDataPersistFacade.getRepository(),
versionNodePath, currentVersion)) {
return;
}
RuleItemConfigurationChangedProcessor processor =
TypedSPILoader.getService(RuleItemConfigurationChangedProcessor.class,
@@ -71,14 +72,6 @@ public final class DatabaseRuleItemManager {
}
}
- private boolean checkActiveVersion(final VersionNodePath versionNodePath,
final int currentVersion) {
- if
(String.valueOf(currentVersion).equals(metaDataPersistFacade.getRepository().query(versionNodePath.getActiveVersionPath())))
{
- return true;
- }
- log.warn("Invalid active version `{}` of key `{}`", currentVersion,
versionNodePath.getActiveVersionPath());
- return false;
- }
-
/**
* Drop rule item.
*
diff --git
a/mode/node/src/main/java/org/apache/shardingsphere/mode/node/path/type/metadata/storage/StorageNodeNodePath.java
b/mode/node/src/main/java/org/apache/shardingsphere/mode/node/path/type/metadata/storage/StorageNodeNodePath.java
index 4738f852ab7..8f379351b29 100644
---
a/mode/node/src/main/java/org/apache/shardingsphere/mode/node/path/type/metadata/storage/StorageNodeNodePath.java
+++
b/mode/node/src/main/java/org/apache/shardingsphere/mode/node/path/type/metadata/storage/StorageNodeNodePath.java
@@ -46,6 +46,6 @@ public final class StorageNodeNodePath implements NodePath {
* @return created search criteria
*/
public static NodePathSearchCriteria createStorageNodeSearchCriteria() {
- return new NodePathSearchCriteria(new StorageNodeNodePath(), false,
false, 2);
+ return new NodePathSearchCriteria(new StorageNodeNodePath(), false,
true, 2);
}
}
diff --git
a/mode/node/src/main/java/org/apache/shardingsphere/mode/node/path/type/metadata/storage/StorageUnitNodePath.java
b/mode/node/src/main/java/org/apache/shardingsphere/mode/node/path/type/metadata/storage/StorageUnitNodePath.java
index 7badcc4714a..b5b1466b441 100644
---
a/mode/node/src/main/java/org/apache/shardingsphere/mode/node/path/type/metadata/storage/StorageUnitNodePath.java
+++
b/mode/node/src/main/java/org/apache/shardingsphere/mode/node/path/type/metadata/storage/StorageUnitNodePath.java
@@ -56,6 +56,6 @@ public final class StorageUnitNodePath implements NodePath {
* @return created search criteria
*/
public static NodePathSearchCriteria createStorageUnitSearchCriteria() {
- return new NodePathSearchCriteria(new StorageUnitNodePath(), false,
false, 2);
+ return new NodePathSearchCriteria(new StorageUnitNodePath(), false,
true, 2);
}
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/checker/ActiveVersionChecker.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/checker/ActiveVersionChecker.java
deleted file mode 100644
index 8df8f227773..00000000000
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/checker/ActiveVersionChecker.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.manager.cluster.dispatch.checker;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
-import org.apache.shardingsphere.mode.event.DataChangedEvent;
-import org.apache.shardingsphere.mode.manager.ContextManager;
-
-/**
- * Active version checker.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class ActiveVersionChecker {
-
- /**
- * Check active version.
- *
- * @param contextManager context manager
- * @param event data changed event
- */
- public static void checkActiveVersion(final ContextManager contextManager,
final DataChangedEvent event) {
-
ShardingSpherePreconditions.checkState(event.getValue().equals(contextManager.getPersistServiceFacade().getRepository().query(event.getKey())),
- () -> new IllegalStateException(String.format("Invalid active
version: %s of key: %s", event.getValue(), event.getKey())));
- }
-}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/DataSourceChangedHandler.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/DataSourceChangedHandler.java
deleted file mode 100644
index 3bd16b27d12..00000000000
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/DataSourceChangedHandler.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package
org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.database.datasource;
-
-import org.apache.shardingsphere.mode.event.DataChangedEvent;
-import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
-import org.apache.shardingsphere.mode.manager.ContextManager;
-import
org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.database.DatabaseChangedHandler;
-import
org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.database.datasource.type.StorageNodeChangedHandler;
-import
org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.database.datasource.type.StorageUnitChangedHandler;
-import
org.apache.shardingsphere.mode.node.path.engine.searcher.NodePathSearcher;
-import
org.apache.shardingsphere.mode.node.path.type.metadata.storage.StorageNodeNodePath;
-import
org.apache.shardingsphere.mode.node.path.type.metadata.storage.StorageUnitNodePath;
-import
org.apache.shardingsphere.mode.node.path.type.version.VersionNodePathParser;
-
-import java.util.Optional;
-
-/**
- * Data source changed handler.
- */
-public final class DataSourceChangedHandler implements DatabaseChangedHandler {
-
- private final StorageUnitChangedHandler storageUnitChangedHandler;
-
- private final StorageNodeChangedHandler storageNodeChangedHandler;
-
- public DataSourceChangedHandler(final ContextManager contextManager) {
- storageUnitChangedHandler = new
StorageUnitChangedHandler(contextManager);
- storageNodeChangedHandler = new
StorageNodeChangedHandler(contextManager);
- }
-
- @Override
- public boolean isSubscribed(final String databaseName, final
DataChangedEvent event) {
- return NodePathSearcher.isMatchedPath(event.getKey(),
StorageUnitNodePath.createDataSourceSearchCriteria(databaseName));
- }
-
- @Override
- public void handle(final String databaseName, final DataChangedEvent
event) {
- Optional<String> storageUnitName = new VersionNodePathParser(new
StorageUnitNodePath()).findIdentifierByActiveVersionPath(event.getKey(), 2);
- boolean isActiveVersion = true;
- if (!storageUnitName.isPresent()) {
- storageUnitName = NodePathSearcher.find(event.getKey(),
StorageUnitNodePath.createStorageUnitSearchCriteria());
- isActiveVersion = false;
- }
- if (storageUnitName.isPresent()) {
- handleStorageUnitChanged(databaseName, event,
storageUnitName.get(), isActiveVersion);
- return;
- }
- Optional<String> storageNodeName = new VersionNodePathParser(new
StorageNodeNodePath()).findIdentifierByActiveVersionPath(event.getKey(), 2);
- isActiveVersion = true;
- if (!storageNodeName.isPresent()) {
- storageNodeName = NodePathSearcher.find(event.getKey(),
StorageNodeNodePath.createStorageNodeSearchCriteria());
- isActiveVersion = false;
- }
- if (storageNodeName.isPresent()) {
- handleStorageNodeChanged(databaseName, event,
storageNodeName.get(), isActiveVersion);
- }
- }
-
- private void handleStorageUnitChanged(final String databaseName, final
DataChangedEvent event, final String storageUnitName, final boolean
isActiveVersion) {
- if (isActiveVersion) {
- if (Type.ADDED == event.getType()) {
- storageUnitChangedHandler.handleRegistered(databaseName,
storageUnitName, event);
- } else if (Type.UPDATED == event.getType()) {
- storageUnitChangedHandler.handleAltered(databaseName,
storageUnitName, event);
- }
- return;
- }
- if (Type.DELETED == event.getType()) {
- storageUnitChangedHandler.handleUnregistered(databaseName,
storageUnitName);
- }
- }
-
- private void handleStorageNodeChanged(final String databaseName, final
DataChangedEvent event, final String storageNodeName, final boolean
isActiveVersion) {
- if (isActiveVersion) {
- if (Type.ADDED == event.getType()) {
- storageNodeChangedHandler.handleRegistered(databaseName,
storageNodeName, event);
- } else if (Type.UPDATED == event.getType()) {
- storageNodeChangedHandler.handleAltered(databaseName,
storageNodeName, event);
- }
- return;
- }
- if (Type.DELETED == event.getType()) {
- storageNodeChangedHandler.handleUnregistered(databaseName,
storageNodeName);
- }
- }
-}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/StorageNodeChangedHandler.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/StorageNodeChangedHandler.java
new file mode 100644
index 00000000000..ea5d96fe644
--- /dev/null
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/StorageNodeChangedHandler.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.database.datasource;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.mode.event.DataChangedEvent;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import
org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.database.DatabaseChangedHandler;
+import
org.apache.shardingsphere.mode.node.path.engine.searcher.NodePathPattern;
+import
org.apache.shardingsphere.mode.node.path.engine.searcher.NodePathSearcher;
+import
org.apache.shardingsphere.mode.node.path.type.metadata.storage.StorageNodeNodePath;
+import
org.apache.shardingsphere.mode.node.path.type.version.VersionNodePathParser;
+
+import java.util.Optional;
+
+/**
+ * Storage node changed handler.
+ */
+@RequiredArgsConstructor
+public final class StorageNodeChangedHandler implements DatabaseChangedHandler
{
+
+ private final ContextManager contextManager;
+
+ @Override
+ public boolean isSubscribed(final String databaseName, final
DataChangedEvent event) {
+ return new VersionNodePathParser(new StorageNodeNodePath(databaseName,
NodePathPattern.IDENTIFIER)).isActiveVersionPath(event.getKey());
+ }
+
+ @Override
+ public void handle(final String databaseName, final DataChangedEvent
event) {
+ Optional<String> storageNodeName =
NodePathSearcher.find(event.getKey(),
StorageNodeNodePath.createStorageNodeSearchCriteria());
+ if (!storageNodeName.isPresent()) {
+ return;
+ }
+ switch (event.getType()) {
+ case ADDED:
+ handleRegistered(databaseName, storageNodeName.get(), event);
+ break;
+ case UPDATED:
+ handleAltered(databaseName, storageNodeName.get(), event);
+ break;
+ case DELETED:
+ handleUnregistered(databaseName, storageNodeName.get());
+ break;
+ default:
+ break;
+ }
+ }
+
+ private void handleRegistered(final String databaseName, final String
storageNodeName, final DataChangedEvent event) {
+ // TODO
+ }
+
+ private void handleAltered(final String databaseName, final String
storageNodeName, final DataChangedEvent event) {
+ // TODO
+ }
+
+ private void handleUnregistered(final String databaseName, final String
storageNodeName) {
+ // TODO
+ }
+}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/StorageUnitChangedHandler.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/StorageUnitChangedHandler.java
new file mode 100644
index 00000000000..f14b02e98fd
--- /dev/null
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/StorageUnitChangedHandler.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.database.datasource;
+
+import com.google.common.base.Preconditions;
+import lombok.RequiredArgsConstructor;
+import
org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
+import org.apache.shardingsphere.mode.event.DataChangedEvent;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.metadata.manager.ActiveVersionChecker;
+import
org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.database.DatabaseChangedHandler;
+import
org.apache.shardingsphere.mode.node.path.engine.searcher.NodePathPattern;
+import
org.apache.shardingsphere.mode.node.path.engine.searcher.NodePathSearcher;
+import
org.apache.shardingsphere.mode.node.path.type.metadata.storage.StorageUnitNodePath;
+import
org.apache.shardingsphere.mode.node.path.type.version.VersionNodePathParser;
+
+import java.util.Collections;
+import java.util.Optional;
+
+/**
+ * Storage unit changed handler.
+ */
+@RequiredArgsConstructor
+public final class StorageUnitChangedHandler implements DatabaseChangedHandler
{
+
+ private final ContextManager contextManager;
+
+ @Override
+ public boolean isSubscribed(final String databaseName, final
DataChangedEvent event) {
+ return new VersionNodePathParser(new StorageUnitNodePath(databaseName,
NodePathPattern.IDENTIFIER)).isActiveVersionPath(event.getKey());
+ }
+
+ @Override
+ public void handle(final String databaseName, final DataChangedEvent
event) {
+ Optional<String> storageUnitName =
NodePathSearcher.find(event.getKey(),
StorageUnitNodePath.createStorageUnitSearchCriteria());
+ if (!storageUnitName.isPresent()) {
+ return;
+ }
+ switch (event.getType()) {
+ case ADDED:
+ handleRegistered(databaseName, storageUnitName.get(), event);
+ break;
+ case UPDATED:
+ handleAltered(databaseName, storageUnitName.get(), event);
+ break;
+ case DELETED:
+ handleUnregistered(databaseName, storageUnitName.get());
+ break;
+ default:
+ break;
+ }
+ }
+
+ private void handleRegistered(final String databaseName, final String
storageUnitName, final DataChangedEvent event) {
+ if
(!ActiveVersionChecker.checkSame(contextManager.getPersistServiceFacade().getRepository(),
event)) {
+ return;
+ }
+ DataSourcePoolProperties dataSourcePoolProps =
contextManager.getPersistServiceFacade().getMetaDataPersistFacade().getDataSourceUnitService().load(databaseName,
storageUnitName);
+
contextManager.getMetaDataContextManager().getStorageUnitManager().register(databaseName,
Collections.singletonMap(storageUnitName, dataSourcePoolProps));
+ }
+
+ private void handleAltered(final String databaseName, final String
storageUnitName, final DataChangedEvent event) {
+ if
(!ActiveVersionChecker.checkSame(contextManager.getPersistServiceFacade().getRepository(),
event)) {
+ return;
+ }
+ DataSourcePoolProperties dataSourcePoolProps =
contextManager.getPersistServiceFacade().getMetaDataPersistFacade().getDataSourceUnitService().load(databaseName,
storageUnitName);
+
contextManager.getMetaDataContextManager().getStorageUnitManager().alter(databaseName,
Collections.singletonMap(storageUnitName, dataSourcePoolProps));
+ }
+
+ private void handleUnregistered(final String databaseName, final String
storageUnitName) {
+
Preconditions.checkState(contextManager.getMetaDataContexts().getMetaData().containsDatabase(databaseName),
"No database '%s' exists.", databaseName);
+
contextManager.getMetaDataContextManager().getStorageUnitManager().unregister(databaseName,
storageUnitName);
+ }
+}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/type/StorageNodeChangedHandler.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/type/StorageNodeChangedHandler.java
deleted file mode 100644
index a49d48f882e..00000000000
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/type/StorageNodeChangedHandler.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package
org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.database.datasource.type;
-
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.mode.event.DataChangedEvent;
-import org.apache.shardingsphere.mode.manager.ContextManager;
-
-/**
- * Storage node changed handler.
- */
-@RequiredArgsConstructor
-public final class StorageNodeChangedHandler {
-
- private final ContextManager contextManager;
-
- /**
- * Handle storage node registered.
- *
- * @param databaseName database name
- * @param dataSourceUnitName data source unit name
- * @param event data changed event
- */
- public void handleRegistered(final String databaseName, final String
dataSourceUnitName, final DataChangedEvent event) {
- // TODO
- }
-
- /**
- * Handle storage node altered.
- *
- * @param databaseName database name
- * @param dataSourceUnitName data source unit name
- * @param event data changed event
- */
- public void handleAltered(final String databaseName, final String
dataSourceUnitName, final DataChangedEvent event) {
- // TODO
- }
-
- /**
- * Handle storage node unregistered.
- *
- * @param databaseName database name
- * @param dataSourceUnitName data source unit name
- */
- public void handleUnregistered(final String databaseName, final String
dataSourceUnitName) {
- // TODO
- }
-}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/type/StorageUnitChangedHandler.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/type/StorageUnitChangedHandler.java
deleted file mode 100644
index 7c48981db1d..00000000000
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/type/StorageUnitChangedHandler.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package
org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.database.datasource.type;
-
-import com.google.common.base.Preconditions;
-import lombok.RequiredArgsConstructor;
-import
org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
-import org.apache.shardingsphere.mode.event.DataChangedEvent;
-import org.apache.shardingsphere.mode.manager.ContextManager;
-import
org.apache.shardingsphere.mode.manager.cluster.dispatch.checker.ActiveVersionChecker;
-
-import java.util.Collections;
-
-/**
- * Storage unit changed handler.
- */
-@RequiredArgsConstructor
-public final class StorageUnitChangedHandler {
-
- private final ContextManager contextManager;
-
- /**
- * Handle storage unit registered.
- *
- * @param databaseName database name
- * @param dataSourceUnitName data source unit name
- * @param event data changed event
- */
- public void handleRegistered(final String databaseName, final String
dataSourceUnitName, final DataChangedEvent event) {
- ActiveVersionChecker.checkActiveVersion(contextManager, event);
- DataSourcePoolProperties dataSourcePoolProps =
contextManager.getPersistServiceFacade().getMetaDataPersistFacade().getDataSourceUnitService().load(databaseName,
dataSourceUnitName);
-
contextManager.getMetaDataContextManager().getStorageUnitManager().register(databaseName,
Collections.singletonMap(dataSourceUnitName, dataSourcePoolProps));
- }
-
- /**
- * Handle storage unit altered.
- *
- * @param databaseName database name
- * @param dataSourceUnitName data source unit name
- * @param event data changed event
- */
- public void handleAltered(final String databaseName, final String
dataSourceUnitName, final DataChangedEvent event) {
- ActiveVersionChecker.checkActiveVersion(contextManager, event);
- DataSourcePoolProperties dataSourcePoolProps =
contextManager.getPersistServiceFacade().getMetaDataPersistFacade().getDataSourceUnitService().load(databaseName,
dataSourceUnitName);
-
contextManager.getMetaDataContextManager().getStorageUnitManager().alter(databaseName,
Collections.singletonMap(dataSourceUnitName, dataSourcePoolProps));
- }
-
- /**
- * Handle storage unit unregistered.
- *
- * @param databaseName database name
- * @param dataSourceUnitName data source unit name
- */
- public void handleUnregistered(final String databaseName, final String
dataSourceUnitName) {
-
Preconditions.checkState(contextManager.getMetaDataContexts().getMetaData().containsDatabase(databaseName),
"No database '%s' exists.", databaseName);
-
contextManager.getMetaDataContextManager().getStorageUnitManager().unregister(databaseName,
dataSourceUnitName);
- }
-}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/metadata/type/TableChangedHandler.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/metadata/type/TableChangedHandler.java
index e1ea166f335..655558fcc33 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/metadata/type/TableChangedHandler.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/metadata/type/TableChangedHandler.java
@@ -20,7 +20,7 @@ package
org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.database
import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.manager.ContextManager;
-import
org.apache.shardingsphere.mode.manager.cluster.dispatch.checker.ActiveVersionChecker;
+import org.apache.shardingsphere.mode.metadata.manager.ActiveVersionChecker;
import
org.apache.shardingsphere.mode.metadata.refresher.statistics.StatisticsRefreshEngine;
import
org.apache.shardingsphere.mode.node.path.engine.searcher.NodePathSearcher;
import
org.apache.shardingsphere.mode.node.path.type.metadata.database.TableMetadataNodePath;
@@ -50,7 +50,9 @@ public final class TableChangedHandler {
public void handleCreatedOrAltered(final String databaseName, final String
schemaName, final DataChangedEvent event) {
String tableName = new VersionNodePathParser(new
TableMetadataNodePath())
.findIdentifierByActiveVersionPath(event.getKey(),
3).orElseThrow(() -> new IllegalStateException("Table name not found."));
- ActiveVersionChecker.checkActiveVersion(contextManager, event);
+ if
(!ActiveVersionChecker.checkSame(contextManager.getPersistServiceFacade().getRepository(),
event)) {
+ return;
+ }
ShardingSphereTable table =
contextManager.getPersistServiceFacade().getMetaDataPersistFacade().getDatabaseMetaDataFacade().getTable().load(databaseName,
schemaName, tableName);
contextManager.getMetaDataContextManager().getDatabaseMetaDataManager().alterTable(databaseName,
schemaName, table);
statisticsRefreshEngine.asyncRefresh();
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/metadata/type/ViewChangedHandler.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/metadata/type/ViewChangedHandler.java
index a4d57906a82..2f01727d455 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/metadata/type/ViewChangedHandler.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/metadata/type/ViewChangedHandler.java
@@ -20,7 +20,7 @@ package
org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.database
import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereView;
import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.manager.ContextManager;
-import
org.apache.shardingsphere.mode.manager.cluster.dispatch.checker.ActiveVersionChecker;
+import org.apache.shardingsphere.mode.metadata.manager.ActiveVersionChecker;
import
org.apache.shardingsphere.mode.metadata.refresher.statistics.StatisticsRefreshEngine;
import
org.apache.shardingsphere.mode.node.path.engine.searcher.NodePathSearcher;
import
org.apache.shardingsphere.mode.node.path.type.metadata.database.ViewMetadataNodePath;
@@ -50,7 +50,9 @@ public final class ViewChangedHandler {
public void handleCreatedOrAltered(final String databaseName, final String
schemaName, final DataChangedEvent event) {
String viewName = new VersionNodePathParser(new
ViewMetadataNodePath()).findIdentifierByActiveVersionPath(event.getKey(), 3)
.orElseThrow(() -> new IllegalStateException("View name not
found."));
- ActiveVersionChecker.checkActiveVersion(contextManager, event);
+ if
(!ActiveVersionChecker.checkSame(contextManager.getPersistServiceFacade().getRepository(),
event)) {
+ return;
+ }
ShardingSphereView view =
contextManager.getPersistServiceFacade().getMetaDataPersistFacade().getDatabaseMetaDataFacade().getView().load(databaseName,
schemaName, viewName);
contextManager.getMetaDataContextManager().getDatabaseMetaDataManager().alterView(databaseName,
schemaName, view);
statisticsRefreshEngine.asyncRefresh();
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/global/type/GlobalRuleChangedHandler.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/global/type/GlobalRuleChangedHandler.java
index c5da0c53035..8ae66290624 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/global/type/GlobalRuleChangedHandler.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/global/type/GlobalRuleChangedHandler.java
@@ -21,7 +21,7 @@ import
org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
import org.apache.shardingsphere.mode.manager.ContextManager;
-import
org.apache.shardingsphere.mode.manager.cluster.dispatch.checker.ActiveVersionChecker;
+import org.apache.shardingsphere.mode.metadata.manager.ActiveVersionChecker;
import
org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.global.GlobalDataChangedEventHandler;
import
org.apache.shardingsphere.mode.node.path.engine.generator.NodePathGenerator;
import
org.apache.shardingsphere.mode.node.path.engine.searcher.NodePathPattern;
@@ -53,7 +53,9 @@ public final class GlobalRuleChangedHandler implements
GlobalDataChangedEventHan
if (!ruleType.isPresent()) {
return;
}
- ActiveVersionChecker.checkActiveVersion(contextManager, event);
+ if
(!ActiveVersionChecker.checkSame(contextManager.getPersistServiceFacade().getRepository(),
event)) {
+ return;
+ }
RuleConfiguration ruleConfig =
contextManager.getPersistServiceFacade().getMetaDataPersistFacade().getGlobalRuleService().load(ruleType.get());
contextManager.getMetaDataContextManager().getGlobalConfigurationManager().alterGlobalRuleConfiguration(ruleConfig);
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/global/type/PropertiesChangedHandler.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/global/type/PropertiesChangedHandler.java
index 36b412dd716..3d714800ac0 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/global/type/PropertiesChangedHandler.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/global/type/PropertiesChangedHandler.java
@@ -20,7 +20,7 @@ package
org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.global.t
import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
import org.apache.shardingsphere.mode.manager.ContextManager;
-import
org.apache.shardingsphere.mode.manager.cluster.dispatch.checker.ActiveVersionChecker;
+import org.apache.shardingsphere.mode.metadata.manager.ActiveVersionChecker;
import
org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.global.GlobalDataChangedEventHandler;
import
org.apache.shardingsphere.mode.node.path.engine.generator.NodePathGenerator;
import
org.apache.shardingsphere.mode.node.path.type.global.GlobalPropertiesNodePath;
@@ -49,7 +49,9 @@ public final class PropertiesChangedHandler implements
GlobalDataChangedEventHan
if (!new VersionNodePathParser(new
GlobalPropertiesNodePath()).isActiveVersionPath(event.getKey())) {
return;
}
- ActiveVersionChecker.checkActiveVersion(contextManager, event);
+ if
(!ActiveVersionChecker.checkSame(contextManager.getPersistServiceFacade().getRepository(),
event)) {
+ return;
+ }
contextManager.getMetaDataContextManager().getGlobalConfigurationManager().alterProperties(contextManager.getPersistServiceFacade().getMetaDataPersistFacade().getPropsService().load());
}
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/listener/type/DatabaseMetaDataChangedListener.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/listener/type/DatabaseMetaDataChangedListener.java
index 6b937dfece0..24eda761ee6 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/listener/type/DatabaseMetaDataChangedListener.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/listener/type/DatabaseMetaDataChangedListener.java
@@ -22,7 +22,8 @@ import
org.apache.shardingsphere.infra.spi.type.ordered.cache.OrderedServicesCac
import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.manager.ContextManager;
import
org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.database.DatabaseChangedHandler;
-import
org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.database.datasource.DataSourceChangedHandler;
+import
org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.database.datasource.StorageNodeChangedHandler;
+import
org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.database.datasource.StorageUnitChangedHandler;
import
org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.database.metadata.MetaDataChangedHandler;
import
org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.database.rule.RuleConfigurationChangedHandler;
import
org.apache.shardingsphere.mode.node.path.engine.searcher.NodePathSearcher;
@@ -42,7 +43,11 @@ public final class DatabaseMetaDataChangedListener
implements DataChangedEventLi
private final Collection<DatabaseChangedHandler> handlers;
public DatabaseMetaDataChangedListener(final ContextManager
contextManager) {
- handlers = Arrays.asList(new MetaDataChangedHandler(contextManager),
new DataSourceChangedHandler(contextManager), new
RuleConfigurationChangedHandler(contextManager));
+ handlers = Arrays.asList(
+ new MetaDataChangedHandler(contextManager),
+ new StorageUnitChangedHandler(contextManager),
+ new StorageNodeChangedHandler(contextManager),
+ new RuleConfigurationChangedHandler(contextManager));
}
@Override
diff --git
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/DataSourceChangedHandlerTest.java
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/StorageUnitChangedHandlerTest.java
similarity index 82%
rename from
mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/DataSourceChangedHandlerTest.java
rename to
mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/StorageUnitChangedHandlerTest.java
index 2b489577685..fce68712323 100644
---
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/DataSourceChangedHandlerTest.java
+++
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/StorageUnitChangedHandlerTest.java
@@ -17,7 +17,6 @@
package
org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.database.datasource;
-import
org.apache.shardingsphere.infra.config.props.temporary.TemporaryConfigurationPropertyKey;
import
org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
@@ -39,24 +38,21 @@ import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
-class DataSourceChangedHandlerTest {
+class StorageUnitChangedHandlerTest {
- private DataSourceChangedHandler handler;
+ private StorageUnitChangedHandler handler;
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private ContextManager contextManager;
@BeforeEach
void setUp() {
-
when(contextManager.getMetaDataContexts().getMetaData().getTemporaryProps().getValue(TemporaryConfigurationPropertyKey.PROXY_META_DATA_COLLECTOR_ENABLED)).thenReturn(false);
-
when(contextManager.getComputeNodeInstanceContext().getModeConfiguration().isCluster()).thenReturn(true);
when(contextManager.getPersistServiceFacade().getRepository().query(any())).thenReturn("0");
- handler = new DataSourceChangedHandler(contextManager);
+ handler = new StorageUnitChangedHandler(contextManager);
}
@Test
void assertHandleStorageUnitRegistered() {
-
when(contextManager.getPersistServiceFacade().getRepository().query("key")).thenReturn("value");
when(contextManager.getPersistServiceFacade().getMetaDataPersistFacade().getDataSourceUnitService().load("foo_db",
"foo_unit")).thenReturn(mock(DataSourcePoolProperties.class));
handler.handle("foo_db", new
DataChangedEvent("/metadata/foo_db/data_sources/units/foo_unit/active_version",
"0", Type.ADDED));
verify(contextManager.getMetaDataContextManager().getStorageUnitManager()).register(eq("foo_db"),
any());
@@ -64,7 +60,6 @@ class DataSourceChangedHandlerTest {
@Test
void assertHandleStorageUnitAltered() {
-
when(contextManager.getPersistServiceFacade().getRepository().query("key")).thenReturn("value");
when(contextManager.getPersistServiceFacade().getMetaDataPersistFacade().getDataSourceUnitService().load("foo_db",
"foo_unit")).thenReturn(mock(DataSourcePoolProperties.class));
handler.handle("foo_db", new
DataChangedEvent("/metadata/foo_db/data_sources/units/foo_unit/active_version",
"0", Type.UPDATED));
verify(contextManager.getMetaDataContextManager().getStorageUnitManager()).alter(eq("foo_db"),
any());