This is an automated email from the ASF dual-hosted git repository. zhangliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push: new 56c0bd30b88 Refactor StorageUnitChangedHandler and StorageNodeChangedHandler (#34885) 56c0bd30b88 is described below commit 56c0bd30b888290170a0874820c0c946be5a8b0f Author: Liang Zhang <zhangli...@apache.org> AuthorDate: Wed Mar 5 19:08:21 2025 +0800 Refactor StorageUnitChangedHandler and StorageNodeChangedHandler (#34885) * Refactor StorageUnitChangedHandler and StorageNodeChangedHandler * Refactor StorageUnitChangedHandler and StorageNodeChangedHandler * Refactor StorageUnitChangedHandler and StorageNodeChangedHandler * Refactor StorageUnitChangedHandler and StorageNodeChangedHandler --- .../metadata/manager/ActiveVersionChecker.java | 64 +++++++++++++ .../manager/rule/DatabaseRuleItemManager.java | 13 +-- .../type/metadata/storage/StorageNodeNodePath.java | 2 +- .../type/metadata/storage/StorageUnitNodePath.java | 2 +- .../dispatch/checker/ActiveVersionChecker.java | 42 --------- .../datasource/DataSourceChangedHandler.java | 102 --------------------- .../datasource/StorageNodeChangedHandler.java | 76 +++++++++++++++ .../datasource/StorageUnitChangedHandler.java | 89 ++++++++++++++++++ .../datasource/type/StorageNodeChangedHandler.java | 63 ------------- .../datasource/type/StorageUnitChangedHandler.java | 73 --------------- .../metadata/type/TableChangedHandler.java | 6 +- .../database/metadata/type/ViewChangedHandler.java | 6 +- .../global/type/GlobalRuleChangedHandler.java | 6 +- .../global/type/PropertiesChangedHandler.java | 6 +- .../type/DatabaseMetaDataChangedListener.java | 9 +- ...est.java => StorageUnitChangedHandlerTest.java} | 11 +-- 16 files changed, 260 insertions(+), 310 deletions(-) diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/manager/ActiveVersionChecker.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/manager/ActiveVersionChecker.java new file mode 100644 index 00000000000..2f4157be15e --- /dev/null +++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/manager/ActiveVersionChecker.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.mode.metadata.manager; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.shardingsphere.mode.event.DataChangedEvent; +import org.apache.shardingsphere.mode.node.path.type.version.VersionNodePath; +import org.apache.shardingsphere.mode.spi.repository.PersistRepository; + +/** + * Active version checker. + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +@Slf4j +public final class ActiveVersionChecker { + + /** + * Check whether the current version same with an active version. + * + * @param repository repository + * @param event data changed event + * @return same or not + */ + public static boolean checkSame(final PersistRepository repository, final DataChangedEvent event) { + return checkSame(repository, event.getValue(), event.getKey()); + } + + /** + * Check whether the current version same with an active version. + * + * @param repository repository + * @param versionNodePath version node path + * @param currentVersion current version + * @return same or not + */ + public static boolean checkSame(final PersistRepository repository, final VersionNodePath versionNodePath, final int currentVersion) { + return checkSame(repository, String.valueOf(currentVersion), versionNodePath.getActiveVersionPath()); + } + + private static boolean checkSame(final PersistRepository repository, final String currentVersion, final String activeVersionPath) { + if (currentVersion.equals(repository.query(activeVersionPath))) { + return true; + } + log.warn("Invalid active version `{}` of key `{}`", currentVersion, activeVersionPath); + return false; + } +} diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/manager/rule/DatabaseRuleItemManager.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/manager/rule/DatabaseRuleItemManager.java index bee609080ca..56c7e4b6971 100644 --- a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/manager/rule/DatabaseRuleItemManager.java +++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/manager/rule/DatabaseRuleItemManager.java @@ -25,11 +25,12 @@ import org.apache.shardingsphere.infra.config.rule.scope.DatabaseRuleConfigurati import org.apache.shardingsphere.infra.config.rule.scope.DatabaseRuleConfigurationEmptyChecker; import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; import org.apache.shardingsphere.mode.metadata.MetaDataContexts; +import org.apache.shardingsphere.mode.metadata.manager.ActiveVersionChecker; import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistFacade; import org.apache.shardingsphere.mode.node.path.type.metadata.rule.DatabaseRuleNodePath; import org.apache.shardingsphere.mode.node.path.type.version.VersionNodePath; -import org.apache.shardingsphere.mode.spi.rule.RuleItemConfigurationChangedProcessor; import org.apache.shardingsphere.mode.spi.rule.RuleChangedItemType; +import org.apache.shardingsphere.mode.spi.rule.RuleItemConfigurationChangedProcessor; import java.sql.SQLException; @@ -56,7 +57,7 @@ public final class DatabaseRuleItemManager { @SuppressWarnings({"rawtypes", "unchecked"}) public void alter(final DatabaseRuleNodePath databaseRuleNodePath, final int currentVersion) throws SQLException { VersionNodePath versionNodePath = new VersionNodePath(databaseRuleNodePath); - if (!checkActiveVersion(versionNodePath, currentVersion)) { + if (!ActiveVersionChecker.checkSame(metaDataPersistFacade.getRepository(), versionNodePath, currentVersion)) { return; } RuleItemConfigurationChangedProcessor processor = TypedSPILoader.getService(RuleItemConfigurationChangedProcessor.class, @@ -71,14 +72,6 @@ public final class DatabaseRuleItemManager { } } - private boolean checkActiveVersion(final VersionNodePath versionNodePath, final int currentVersion) { - if (String.valueOf(currentVersion).equals(metaDataPersistFacade.getRepository().query(versionNodePath.getActiveVersionPath()))) { - return true; - } - log.warn("Invalid active version `{}` of key `{}`", currentVersion, versionNodePath.getActiveVersionPath()); - return false; - } - /** * Drop rule item. * diff --git a/mode/node/src/main/java/org/apache/shardingsphere/mode/node/path/type/metadata/storage/StorageNodeNodePath.java b/mode/node/src/main/java/org/apache/shardingsphere/mode/node/path/type/metadata/storage/StorageNodeNodePath.java index 4738f852ab7..8f379351b29 100644 --- a/mode/node/src/main/java/org/apache/shardingsphere/mode/node/path/type/metadata/storage/StorageNodeNodePath.java +++ b/mode/node/src/main/java/org/apache/shardingsphere/mode/node/path/type/metadata/storage/StorageNodeNodePath.java @@ -46,6 +46,6 @@ public final class StorageNodeNodePath implements NodePath { * @return created search criteria */ public static NodePathSearchCriteria createStorageNodeSearchCriteria() { - return new NodePathSearchCriteria(new StorageNodeNodePath(), false, false, 2); + return new NodePathSearchCriteria(new StorageNodeNodePath(), false, true, 2); } } diff --git a/mode/node/src/main/java/org/apache/shardingsphere/mode/node/path/type/metadata/storage/StorageUnitNodePath.java b/mode/node/src/main/java/org/apache/shardingsphere/mode/node/path/type/metadata/storage/StorageUnitNodePath.java index 7badcc4714a..b5b1466b441 100644 --- a/mode/node/src/main/java/org/apache/shardingsphere/mode/node/path/type/metadata/storage/StorageUnitNodePath.java +++ b/mode/node/src/main/java/org/apache/shardingsphere/mode/node/path/type/metadata/storage/StorageUnitNodePath.java @@ -56,6 +56,6 @@ public final class StorageUnitNodePath implements NodePath { * @return created search criteria */ public static NodePathSearchCriteria createStorageUnitSearchCriteria() { - return new NodePathSearchCriteria(new StorageUnitNodePath(), false, false, 2); + return new NodePathSearchCriteria(new StorageUnitNodePath(), false, true, 2); } } diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/checker/ActiveVersionChecker.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/checker/ActiveVersionChecker.java deleted file mode 100644 index 8df8f227773..00000000000 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/checker/ActiveVersionChecker.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.mode.manager.cluster.dispatch.checker; - -import lombok.AccessLevel; -import lombok.NoArgsConstructor; -import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; -import org.apache.shardingsphere.mode.event.DataChangedEvent; -import org.apache.shardingsphere.mode.manager.ContextManager; - -/** - * Active version checker. - */ -@NoArgsConstructor(access = AccessLevel.PRIVATE) -public final class ActiveVersionChecker { - - /** - * Check active version. - * - * @param contextManager context manager - * @param event data changed event - */ - public static void checkActiveVersion(final ContextManager contextManager, final DataChangedEvent event) { - ShardingSpherePreconditions.checkState(event.getValue().equals(contextManager.getPersistServiceFacade().getRepository().query(event.getKey())), - () -> new IllegalStateException(String.format("Invalid active version: %s of key: %s", event.getValue(), event.getKey()))); - } -} diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/DataSourceChangedHandler.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/DataSourceChangedHandler.java deleted file mode 100644 index 3bd16b27d12..00000000000 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/DataSourceChangedHandler.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.database.datasource; - -import org.apache.shardingsphere.mode.event.DataChangedEvent; -import org.apache.shardingsphere.mode.event.DataChangedEvent.Type; -import org.apache.shardingsphere.mode.manager.ContextManager; -import org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.database.DatabaseChangedHandler; -import org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.database.datasource.type.StorageNodeChangedHandler; -import org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.database.datasource.type.StorageUnitChangedHandler; -import org.apache.shardingsphere.mode.node.path.engine.searcher.NodePathSearcher; -import org.apache.shardingsphere.mode.node.path.type.metadata.storage.StorageNodeNodePath; -import org.apache.shardingsphere.mode.node.path.type.metadata.storage.StorageUnitNodePath; -import org.apache.shardingsphere.mode.node.path.type.version.VersionNodePathParser; - -import java.util.Optional; - -/** - * Data source changed handler. - */ -public final class DataSourceChangedHandler implements DatabaseChangedHandler { - - private final StorageUnitChangedHandler storageUnitChangedHandler; - - private final StorageNodeChangedHandler storageNodeChangedHandler; - - public DataSourceChangedHandler(final ContextManager contextManager) { - storageUnitChangedHandler = new StorageUnitChangedHandler(contextManager); - storageNodeChangedHandler = new StorageNodeChangedHandler(contextManager); - } - - @Override - public boolean isSubscribed(final String databaseName, final DataChangedEvent event) { - return NodePathSearcher.isMatchedPath(event.getKey(), StorageUnitNodePath.createDataSourceSearchCriteria(databaseName)); - } - - @Override - public void handle(final String databaseName, final DataChangedEvent event) { - Optional<String> storageUnitName = new VersionNodePathParser(new StorageUnitNodePath()).findIdentifierByActiveVersionPath(event.getKey(), 2); - boolean isActiveVersion = true; - if (!storageUnitName.isPresent()) { - storageUnitName = NodePathSearcher.find(event.getKey(), StorageUnitNodePath.createStorageUnitSearchCriteria()); - isActiveVersion = false; - } - if (storageUnitName.isPresent()) { - handleStorageUnitChanged(databaseName, event, storageUnitName.get(), isActiveVersion); - return; - } - Optional<String> storageNodeName = new VersionNodePathParser(new StorageNodeNodePath()).findIdentifierByActiveVersionPath(event.getKey(), 2); - isActiveVersion = true; - if (!storageNodeName.isPresent()) { - storageNodeName = NodePathSearcher.find(event.getKey(), StorageNodeNodePath.createStorageNodeSearchCriteria()); - isActiveVersion = false; - } - if (storageNodeName.isPresent()) { - handleStorageNodeChanged(databaseName, event, storageNodeName.get(), isActiveVersion); - } - } - - private void handleStorageUnitChanged(final String databaseName, final DataChangedEvent event, final String storageUnitName, final boolean isActiveVersion) { - if (isActiveVersion) { - if (Type.ADDED == event.getType()) { - storageUnitChangedHandler.handleRegistered(databaseName, storageUnitName, event); - } else if (Type.UPDATED == event.getType()) { - storageUnitChangedHandler.handleAltered(databaseName, storageUnitName, event); - } - return; - } - if (Type.DELETED == event.getType()) { - storageUnitChangedHandler.handleUnregistered(databaseName, storageUnitName); - } - } - - private void handleStorageNodeChanged(final String databaseName, final DataChangedEvent event, final String storageNodeName, final boolean isActiveVersion) { - if (isActiveVersion) { - if (Type.ADDED == event.getType()) { - storageNodeChangedHandler.handleRegistered(databaseName, storageNodeName, event); - } else if (Type.UPDATED == event.getType()) { - storageNodeChangedHandler.handleAltered(databaseName, storageNodeName, event); - } - return; - } - if (Type.DELETED == event.getType()) { - storageNodeChangedHandler.handleUnregistered(databaseName, storageNodeName); - } - } -} diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/StorageNodeChangedHandler.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/StorageNodeChangedHandler.java new file mode 100644 index 00000000000..ea5d96fe644 --- /dev/null +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/StorageNodeChangedHandler.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.database.datasource; + +import lombok.RequiredArgsConstructor; +import org.apache.shardingsphere.mode.event.DataChangedEvent; +import org.apache.shardingsphere.mode.manager.ContextManager; +import org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.database.DatabaseChangedHandler; +import org.apache.shardingsphere.mode.node.path.engine.searcher.NodePathPattern; +import org.apache.shardingsphere.mode.node.path.engine.searcher.NodePathSearcher; +import org.apache.shardingsphere.mode.node.path.type.metadata.storage.StorageNodeNodePath; +import org.apache.shardingsphere.mode.node.path.type.version.VersionNodePathParser; + +import java.util.Optional; + +/** + * Storage node changed handler. + */ +@RequiredArgsConstructor +public final class StorageNodeChangedHandler implements DatabaseChangedHandler { + + private final ContextManager contextManager; + + @Override + public boolean isSubscribed(final String databaseName, final DataChangedEvent event) { + return new VersionNodePathParser(new StorageNodeNodePath(databaseName, NodePathPattern.IDENTIFIER)).isActiveVersionPath(event.getKey()); + } + + @Override + public void handle(final String databaseName, final DataChangedEvent event) { + Optional<String> storageNodeName = NodePathSearcher.find(event.getKey(), StorageNodeNodePath.createStorageNodeSearchCriteria()); + if (!storageNodeName.isPresent()) { + return; + } + switch (event.getType()) { + case ADDED: + handleRegistered(databaseName, storageNodeName.get(), event); + break; + case UPDATED: + handleAltered(databaseName, storageNodeName.get(), event); + break; + case DELETED: + handleUnregistered(databaseName, storageNodeName.get()); + break; + default: + break; + } + } + + private void handleRegistered(final String databaseName, final String storageNodeName, final DataChangedEvent event) { + // TODO + } + + private void handleAltered(final String databaseName, final String storageNodeName, final DataChangedEvent event) { + // TODO + } + + private void handleUnregistered(final String databaseName, final String storageNodeName) { + // TODO + } +} diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/StorageUnitChangedHandler.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/StorageUnitChangedHandler.java new file mode 100644 index 00000000000..f14b02e98fd --- /dev/null +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/StorageUnitChangedHandler.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.database.datasource; + +import com.google.common.base.Preconditions; +import lombok.RequiredArgsConstructor; +import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties; +import org.apache.shardingsphere.mode.event.DataChangedEvent; +import org.apache.shardingsphere.mode.manager.ContextManager; +import org.apache.shardingsphere.mode.metadata.manager.ActiveVersionChecker; +import org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.database.DatabaseChangedHandler; +import org.apache.shardingsphere.mode.node.path.engine.searcher.NodePathPattern; +import org.apache.shardingsphere.mode.node.path.engine.searcher.NodePathSearcher; +import org.apache.shardingsphere.mode.node.path.type.metadata.storage.StorageUnitNodePath; +import org.apache.shardingsphere.mode.node.path.type.version.VersionNodePathParser; + +import java.util.Collections; +import java.util.Optional; + +/** + * Storage unit changed handler. + */ +@RequiredArgsConstructor +public final class StorageUnitChangedHandler implements DatabaseChangedHandler { + + private final ContextManager contextManager; + + @Override + public boolean isSubscribed(final String databaseName, final DataChangedEvent event) { + return new VersionNodePathParser(new StorageUnitNodePath(databaseName, NodePathPattern.IDENTIFIER)).isActiveVersionPath(event.getKey()); + } + + @Override + public void handle(final String databaseName, final DataChangedEvent event) { + Optional<String> storageUnitName = NodePathSearcher.find(event.getKey(), StorageUnitNodePath.createStorageUnitSearchCriteria()); + if (!storageUnitName.isPresent()) { + return; + } + switch (event.getType()) { + case ADDED: + handleRegistered(databaseName, storageUnitName.get(), event); + break; + case UPDATED: + handleAltered(databaseName, storageUnitName.get(), event); + break; + case DELETED: + handleUnregistered(databaseName, storageUnitName.get()); + break; + default: + break; + } + } + + private void handleRegistered(final String databaseName, final String storageUnitName, final DataChangedEvent event) { + if (!ActiveVersionChecker.checkSame(contextManager.getPersistServiceFacade().getRepository(), event)) { + return; + } + DataSourcePoolProperties dataSourcePoolProps = contextManager.getPersistServiceFacade().getMetaDataPersistFacade().getDataSourceUnitService().load(databaseName, storageUnitName); + contextManager.getMetaDataContextManager().getStorageUnitManager().register(databaseName, Collections.singletonMap(storageUnitName, dataSourcePoolProps)); + } + + private void handleAltered(final String databaseName, final String storageUnitName, final DataChangedEvent event) { + if (!ActiveVersionChecker.checkSame(contextManager.getPersistServiceFacade().getRepository(), event)) { + return; + } + DataSourcePoolProperties dataSourcePoolProps = contextManager.getPersistServiceFacade().getMetaDataPersistFacade().getDataSourceUnitService().load(databaseName, storageUnitName); + contextManager.getMetaDataContextManager().getStorageUnitManager().alter(databaseName, Collections.singletonMap(storageUnitName, dataSourcePoolProps)); + } + + private void handleUnregistered(final String databaseName, final String storageUnitName) { + Preconditions.checkState(contextManager.getMetaDataContexts().getMetaData().containsDatabase(databaseName), "No database '%s' exists.", databaseName); + contextManager.getMetaDataContextManager().getStorageUnitManager().unregister(databaseName, storageUnitName); + } +} diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/type/StorageNodeChangedHandler.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/type/StorageNodeChangedHandler.java deleted file mode 100644 index a49d48f882e..00000000000 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/type/StorageNodeChangedHandler.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.database.datasource.type; - -import lombok.RequiredArgsConstructor; -import org.apache.shardingsphere.mode.event.DataChangedEvent; -import org.apache.shardingsphere.mode.manager.ContextManager; - -/** - * Storage node changed handler. - */ -@RequiredArgsConstructor -public final class StorageNodeChangedHandler { - - private final ContextManager contextManager; - - /** - * Handle storage node registered. - * - * @param databaseName database name - * @param dataSourceUnitName data source unit name - * @param event data changed event - */ - public void handleRegistered(final String databaseName, final String dataSourceUnitName, final DataChangedEvent event) { - // TODO - } - - /** - * Handle storage node altered. - * - * @param databaseName database name - * @param dataSourceUnitName data source unit name - * @param event data changed event - */ - public void handleAltered(final String databaseName, final String dataSourceUnitName, final DataChangedEvent event) { - // TODO - } - - /** - * Handle storage node unregistered. - * - * @param databaseName database name - * @param dataSourceUnitName data source unit name - */ - public void handleUnregistered(final String databaseName, final String dataSourceUnitName) { - // TODO - } -} diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/type/StorageUnitChangedHandler.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/type/StorageUnitChangedHandler.java deleted file mode 100644 index 7c48981db1d..00000000000 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/type/StorageUnitChangedHandler.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.database.datasource.type; - -import com.google.common.base.Preconditions; -import lombok.RequiredArgsConstructor; -import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties; -import org.apache.shardingsphere.mode.event.DataChangedEvent; -import org.apache.shardingsphere.mode.manager.ContextManager; -import org.apache.shardingsphere.mode.manager.cluster.dispatch.checker.ActiveVersionChecker; - -import java.util.Collections; - -/** - * Storage unit changed handler. - */ -@RequiredArgsConstructor -public final class StorageUnitChangedHandler { - - private final ContextManager contextManager; - - /** - * Handle storage unit registered. - * - * @param databaseName database name - * @param dataSourceUnitName data source unit name - * @param event data changed event - */ - public void handleRegistered(final String databaseName, final String dataSourceUnitName, final DataChangedEvent event) { - ActiveVersionChecker.checkActiveVersion(contextManager, event); - DataSourcePoolProperties dataSourcePoolProps = contextManager.getPersistServiceFacade().getMetaDataPersistFacade().getDataSourceUnitService().load(databaseName, dataSourceUnitName); - contextManager.getMetaDataContextManager().getStorageUnitManager().register(databaseName, Collections.singletonMap(dataSourceUnitName, dataSourcePoolProps)); - } - - /** - * Handle storage unit altered. - * - * @param databaseName database name - * @param dataSourceUnitName data source unit name - * @param event data changed event - */ - public void handleAltered(final String databaseName, final String dataSourceUnitName, final DataChangedEvent event) { - ActiveVersionChecker.checkActiveVersion(contextManager, event); - DataSourcePoolProperties dataSourcePoolProps = contextManager.getPersistServiceFacade().getMetaDataPersistFacade().getDataSourceUnitService().load(databaseName, dataSourceUnitName); - contextManager.getMetaDataContextManager().getStorageUnitManager().alter(databaseName, Collections.singletonMap(dataSourceUnitName, dataSourcePoolProps)); - } - - /** - * Handle storage unit unregistered. - * - * @param databaseName database name - * @param dataSourceUnitName data source unit name - */ - public void handleUnregistered(final String databaseName, final String dataSourceUnitName) { - Preconditions.checkState(contextManager.getMetaDataContexts().getMetaData().containsDatabase(databaseName), "No database '%s' exists.", databaseName); - contextManager.getMetaDataContextManager().getStorageUnitManager().unregister(databaseName, dataSourceUnitName); - } -} diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/metadata/type/TableChangedHandler.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/metadata/type/TableChangedHandler.java index e1ea166f335..655558fcc33 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/metadata/type/TableChangedHandler.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/metadata/type/TableChangedHandler.java @@ -20,7 +20,7 @@ package org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.database import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable; import org.apache.shardingsphere.mode.event.DataChangedEvent; import org.apache.shardingsphere.mode.manager.ContextManager; -import org.apache.shardingsphere.mode.manager.cluster.dispatch.checker.ActiveVersionChecker; +import org.apache.shardingsphere.mode.metadata.manager.ActiveVersionChecker; import org.apache.shardingsphere.mode.metadata.refresher.statistics.StatisticsRefreshEngine; import org.apache.shardingsphere.mode.node.path.engine.searcher.NodePathSearcher; import org.apache.shardingsphere.mode.node.path.type.metadata.database.TableMetadataNodePath; @@ -50,7 +50,9 @@ public final class TableChangedHandler { public void handleCreatedOrAltered(final String databaseName, final String schemaName, final DataChangedEvent event) { String tableName = new VersionNodePathParser(new TableMetadataNodePath()) .findIdentifierByActiveVersionPath(event.getKey(), 3).orElseThrow(() -> new IllegalStateException("Table name not found.")); - ActiveVersionChecker.checkActiveVersion(contextManager, event); + if (!ActiveVersionChecker.checkSame(contextManager.getPersistServiceFacade().getRepository(), event)) { + return; + } ShardingSphereTable table = contextManager.getPersistServiceFacade().getMetaDataPersistFacade().getDatabaseMetaDataFacade().getTable().load(databaseName, schemaName, tableName); contextManager.getMetaDataContextManager().getDatabaseMetaDataManager().alterTable(databaseName, schemaName, table); statisticsRefreshEngine.asyncRefresh(); diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/metadata/type/ViewChangedHandler.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/metadata/type/ViewChangedHandler.java index a4d57906a82..2f01727d455 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/metadata/type/ViewChangedHandler.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/metadata/type/ViewChangedHandler.java @@ -20,7 +20,7 @@ package org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.database import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereView; import org.apache.shardingsphere.mode.event.DataChangedEvent; import org.apache.shardingsphere.mode.manager.ContextManager; -import org.apache.shardingsphere.mode.manager.cluster.dispatch.checker.ActiveVersionChecker; +import org.apache.shardingsphere.mode.metadata.manager.ActiveVersionChecker; import org.apache.shardingsphere.mode.metadata.refresher.statistics.StatisticsRefreshEngine; import org.apache.shardingsphere.mode.node.path.engine.searcher.NodePathSearcher; import org.apache.shardingsphere.mode.node.path.type.metadata.database.ViewMetadataNodePath; @@ -50,7 +50,9 @@ public final class ViewChangedHandler { public void handleCreatedOrAltered(final String databaseName, final String schemaName, final DataChangedEvent event) { String viewName = new VersionNodePathParser(new ViewMetadataNodePath()).findIdentifierByActiveVersionPath(event.getKey(), 3) .orElseThrow(() -> new IllegalStateException("View name not found.")); - ActiveVersionChecker.checkActiveVersion(contextManager, event); + if (!ActiveVersionChecker.checkSame(contextManager.getPersistServiceFacade().getRepository(), event)) { + return; + } ShardingSphereView view = contextManager.getPersistServiceFacade().getMetaDataPersistFacade().getDatabaseMetaDataFacade().getView().load(databaseName, schemaName, viewName); contextManager.getMetaDataContextManager().getDatabaseMetaDataManager().alterView(databaseName, schemaName, view); statisticsRefreshEngine.asyncRefresh(); diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/global/type/GlobalRuleChangedHandler.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/global/type/GlobalRuleChangedHandler.java index c5da0c53035..8ae66290624 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/global/type/GlobalRuleChangedHandler.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/global/type/GlobalRuleChangedHandler.java @@ -21,7 +21,7 @@ import org.apache.shardingsphere.infra.config.rule.RuleConfiguration; import org.apache.shardingsphere.mode.event.DataChangedEvent; import org.apache.shardingsphere.mode.event.DataChangedEvent.Type; import org.apache.shardingsphere.mode.manager.ContextManager; -import org.apache.shardingsphere.mode.manager.cluster.dispatch.checker.ActiveVersionChecker; +import org.apache.shardingsphere.mode.metadata.manager.ActiveVersionChecker; import org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.global.GlobalDataChangedEventHandler; import org.apache.shardingsphere.mode.node.path.engine.generator.NodePathGenerator; import org.apache.shardingsphere.mode.node.path.engine.searcher.NodePathPattern; @@ -53,7 +53,9 @@ public final class GlobalRuleChangedHandler implements GlobalDataChangedEventHan if (!ruleType.isPresent()) { return; } - ActiveVersionChecker.checkActiveVersion(contextManager, event); + if (!ActiveVersionChecker.checkSame(contextManager.getPersistServiceFacade().getRepository(), event)) { + return; + } RuleConfiguration ruleConfig = contextManager.getPersistServiceFacade().getMetaDataPersistFacade().getGlobalRuleService().load(ruleType.get()); contextManager.getMetaDataContextManager().getGlobalConfigurationManager().alterGlobalRuleConfiguration(ruleConfig); } diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/global/type/PropertiesChangedHandler.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/global/type/PropertiesChangedHandler.java index 36b412dd716..3d714800ac0 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/global/type/PropertiesChangedHandler.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/global/type/PropertiesChangedHandler.java @@ -20,7 +20,7 @@ package org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.global.t import org.apache.shardingsphere.mode.event.DataChangedEvent; import org.apache.shardingsphere.mode.event.DataChangedEvent.Type; import org.apache.shardingsphere.mode.manager.ContextManager; -import org.apache.shardingsphere.mode.manager.cluster.dispatch.checker.ActiveVersionChecker; +import org.apache.shardingsphere.mode.metadata.manager.ActiveVersionChecker; import org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.global.GlobalDataChangedEventHandler; import org.apache.shardingsphere.mode.node.path.engine.generator.NodePathGenerator; import org.apache.shardingsphere.mode.node.path.type.global.GlobalPropertiesNodePath; @@ -49,7 +49,9 @@ public final class PropertiesChangedHandler implements GlobalDataChangedEventHan if (!new VersionNodePathParser(new GlobalPropertiesNodePath()).isActiveVersionPath(event.getKey())) { return; } - ActiveVersionChecker.checkActiveVersion(contextManager, event); + if (!ActiveVersionChecker.checkSame(contextManager.getPersistServiceFacade().getRepository(), event)) { + return; + } contextManager.getMetaDataContextManager().getGlobalConfigurationManager().alterProperties(contextManager.getPersistServiceFacade().getMetaDataPersistFacade().getPropsService().load()); } } diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/listener/type/DatabaseMetaDataChangedListener.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/listener/type/DatabaseMetaDataChangedListener.java index 6b937dfece0..24eda761ee6 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/listener/type/DatabaseMetaDataChangedListener.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/listener/type/DatabaseMetaDataChangedListener.java @@ -22,7 +22,8 @@ import org.apache.shardingsphere.infra.spi.type.ordered.cache.OrderedServicesCac import org.apache.shardingsphere.mode.event.DataChangedEvent; import org.apache.shardingsphere.mode.manager.ContextManager; import org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.database.DatabaseChangedHandler; -import org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.database.datasource.DataSourceChangedHandler; +import org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.database.datasource.StorageNodeChangedHandler; +import org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.database.datasource.StorageUnitChangedHandler; import org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.database.metadata.MetaDataChangedHandler; import org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.database.rule.RuleConfigurationChangedHandler; import org.apache.shardingsphere.mode.node.path.engine.searcher.NodePathSearcher; @@ -42,7 +43,11 @@ public final class DatabaseMetaDataChangedListener implements DataChangedEventLi private final Collection<DatabaseChangedHandler> handlers; public DatabaseMetaDataChangedListener(final ContextManager contextManager) { - handlers = Arrays.asList(new MetaDataChangedHandler(contextManager), new DataSourceChangedHandler(contextManager), new RuleConfigurationChangedHandler(contextManager)); + handlers = Arrays.asList( + new MetaDataChangedHandler(contextManager), + new StorageUnitChangedHandler(contextManager), + new StorageNodeChangedHandler(contextManager), + new RuleConfigurationChangedHandler(contextManager)); } @Override diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/DataSourceChangedHandlerTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/StorageUnitChangedHandlerTest.java similarity index 82% rename from mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/DataSourceChangedHandlerTest.java rename to mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/StorageUnitChangedHandlerTest.java index 2b489577685..fce68712323 100644 --- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/DataSourceChangedHandlerTest.java +++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/database/datasource/StorageUnitChangedHandlerTest.java @@ -17,7 +17,6 @@ package org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.database.datasource; -import org.apache.shardingsphere.infra.config.props.temporary.TemporaryConfigurationPropertyKey; import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties; import org.apache.shardingsphere.mode.event.DataChangedEvent; import org.apache.shardingsphere.mode.event.DataChangedEvent.Type; @@ -39,24 +38,21 @@ import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) -class DataSourceChangedHandlerTest { +class StorageUnitChangedHandlerTest { - private DataSourceChangedHandler handler; + private StorageUnitChangedHandler handler; @Mock(answer = Answers.RETURNS_DEEP_STUBS) private ContextManager contextManager; @BeforeEach void setUp() { - when(contextManager.getMetaDataContexts().getMetaData().getTemporaryProps().getValue(TemporaryConfigurationPropertyKey.PROXY_META_DATA_COLLECTOR_ENABLED)).thenReturn(false); - when(contextManager.getComputeNodeInstanceContext().getModeConfiguration().isCluster()).thenReturn(true); when(contextManager.getPersistServiceFacade().getRepository().query(any())).thenReturn("0"); - handler = new DataSourceChangedHandler(contextManager); + handler = new StorageUnitChangedHandler(contextManager); } @Test void assertHandleStorageUnitRegistered() { - when(contextManager.getPersistServiceFacade().getRepository().query("key")).thenReturn("value"); when(contextManager.getPersistServiceFacade().getMetaDataPersistFacade().getDataSourceUnitService().load("foo_db", "foo_unit")).thenReturn(mock(DataSourcePoolProperties.class)); handler.handle("foo_db", new DataChangedEvent("/metadata/foo_db/data_sources/units/foo_unit/active_version", "0", Type.ADDED)); verify(contextManager.getMetaDataContextManager().getStorageUnitManager()).register(eq("foo_db"), any()); @@ -64,7 +60,6 @@ class DataSourceChangedHandlerTest { @Test void assertHandleStorageUnitAltered() { - when(contextManager.getPersistServiceFacade().getRepository().query("key")).thenReturn("value"); when(contextManager.getPersistServiceFacade().getMetaDataPersistFacade().getDataSourceUnitService().load("foo_db", "foo_unit")).thenReturn(mock(DataSourcePoolProperties.class)); handler.handle("foo_db", new DataChangedEvent("/metadata/foo_db/data_sources/units/foo_unit/active_version", "0", Type.UPDATED)); verify(contextManager.getMetaDataContextManager().getStorageUnitManager()).alter(eq("foo_db"), any());