This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git

The following commit(s) were added to refs/heads/master by this push:
     new 2c998a12772  Refactor ClusterMetaDataManagerPersistService (#34570)
2c998a12772 is described below

commit 2c998a1277225241357266e64e95698438c11db0
Author: Haoran Meng <menghaora...@gmail.com>
AuthorDate: Fri Feb 7 10:32:56 2025 +0800

    Refactor ClusterMetaDataManagerPersistService (#34570)
    
    * Refactor ClusterMetaDataManagerPersistService
    
    * Refactor ClusterMetaDataManagerPersistService
---
 .../ClusterMetaDataManagerPersistService.java      | 90 ++++++++--------------
 1 file changed, 33 insertions(+), 57 deletions(-)

diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterMetaDataManagerPersistService.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterMetaDataManagerPersistService.java
index 5ed3375d85a..7e7a398861d 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterMetaDataManagerPersistService.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterMetaDataManagerPersistService.java
@@ -19,10 +19,8 @@ package org.apache.shardingsphere.mode.manager.cluster.persist.service;
 
 import lombok.SneakyThrows;
 import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
-import org.apache.shardingsphere.infra.datasource.pool.destroyer.DataSourcePoolDestroyer;
 import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import org.apache.shardingsphere.infra.metadata.database.resource.node.StorageNode;
 import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
 import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
 import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
@@ -30,10 +28,8 @@ import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSp
 import org.apache.shardingsphere.infra.metadata.version.MetaDataVersion;
 import org.apache.shardingsphere.mode.manager.cluster.persist.coordinator.database.ClusterDatabaseListenerCoordinatorType;
 import org.apache.shardingsphere.mode.manager.cluster.persist.coordinator.database.ClusterDatabaseListenerPersistCoordinator;
-import org.apache.shardingsphere.mode.metadata.manager.MetaDataContextManager;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
-import org.apache.shardingsphere.mode.metadata.factory.MetaDataContextsFactory;
-import org.apache.shardingsphere.mode.metadata.manager.resource.SwitchingResource;
+import org.apache.shardingsphere.mode.metadata.manager.MetaDataContextManager;
 import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistFacade;
 import org.apache.shardingsphere.mode.metadata.persist.config.database.DataSourceUnitPersistService;
 import org.apache.shardingsphere.mode.metadata.persist.metadata.DatabaseMetaDataPersistFacade;
@@ -42,11 +38,9 @@ import org.apache.shardingsphere.mode.spi.repository.PersistRepository;
 import org.apache.shardingsphere.single.config.SingleRuleConfiguration;
 import org.apache.shardingsphere.single.rule.SingleRule;
 
-import javax.sql.DataSource;
 import java.sql.SQLException;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
@@ -131,60 +125,25 @@ public final class ClusterMetaDataManagerPersistService implements MetaDataManag
     @Override
     public void registerStorageUnits(final String databaseName, final Map<String, DataSourcePoolProperties> toBeRegisteredProps) throws SQLException {
         MetaDataContexts originalMetaDataContexts = new MetaDataContexts(metaDataContextManager.getMetaDataContexts().getMetaData(), metaDataContextManager.getMetaDataContexts().getStatistics());
-        Map<StorageNode, DataSource> newDataSources = new HashMap<>(toBeRegisteredProps.size());
-        try {
-            SwitchingResource switchingResource = metaDataContextManager.getResourceSwitchManager()
-                    .switchByRegisterStorageUnit(originalMetaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(), toBeRegisteredProps);
-            newDataSources.putAll(switchingResource.getNewDataSources());
-            MetaDataContexts reloadMetaDataContexts = new MetaDataContextsFactory(metaDataPersistFacade, metaDataContextManager.getComputeNodeInstanceContext()).createBySwitchResource(
-                    databaseName, false, switchingResource, originalMetaDataContexts);
-            metaDataPersistFacade.getDataSourceUnitService().persist(databaseName, toBeRegisteredProps);
-            afterStorageUnitsAltered(databaseName, originalMetaDataContexts, reloadMetaDataContexts);
-            reloadMetaDataContexts.getMetaData().close();
-        } finally {
-            closeNewDataSources(newDataSources);
-        }
+        metaDataPersistFacade.getDataSourceUnitService().persist(databaseName, toBeRegisteredProps);
+        afterStorageUnitsAltered(databaseName, originalMetaDataContexts);
     }
     
     @Override
     public void alterStorageUnits(final ShardingSphereDatabase database, final Map<String, DataSourcePoolProperties> toBeUpdatedProps) throws SQLException {
         MetaDataContexts originalMetaDataContexts = new MetaDataContexts(metaDataContextManager.getMetaDataContexts().getMetaData(), metaDataContextManager.getMetaDataContexts().getStatistics());
-        Map<StorageNode, DataSource> newDataSources = new HashMap<>(toBeUpdatedProps.size());
-        try {
-            SwitchingResource switchingResource = metaDataContextManager.getResourceSwitchManager()
-                    .switchByAlterStorageUnit(originalMetaDataContexts.getMetaData().getDatabase(database.getName()).getResourceMetaData(), toBeUpdatedProps);
-            newDataSources.putAll(switchingResource.getNewDataSources());
-            MetaDataContexts reloadMetaDataContexts = new MetaDataContextsFactory(metaDataPersistFacade, metaDataContextManager.getComputeNodeInstanceContext()).createBySwitchResource(
-                    database.getName(), false, switchingResource, originalMetaDataContexts);
-            DataSourceUnitPersistService dataSourceService = metaDataPersistFacade.getDataSourceUnitService();
-            metaDataPersistFacade.getMetaDataVersionService()
-                    .switchActiveVersion(dataSourceService.persist(database.getName(), toBeUpdatedProps));
-            afterStorageUnitsAltered(database.getName(), originalMetaDataContexts, reloadMetaDataContexts);
-            reloadMetaDataContexts.getMetaData().close();
-        } finally {
-            closeNewDataSources(newDataSources);
-        }
+        DataSourceUnitPersistService dataSourceService = metaDataPersistFacade.getDataSourceUnitService();
+        metaDataPersistFacade.getMetaDataVersionService()
+                .switchActiveVersion(dataSourceService.persist(database.getName(), toBeUpdatedProps));
+        afterStorageUnitsAltered(database.getName(), originalMetaDataContexts);
     }
     
     @Override
     public void unregisterStorageUnits(final ShardingSphereDatabase database, final Collection<String> toBeDroppedStorageUnitNames) throws SQLException {
         for (String each : getToBeDroppedResourceNames(database.getName(), toBeDroppedStorageUnitNames)) {
             MetaDataContexts originalMetaDataContexts = new MetaDataContexts(metaDataContextManager.getMetaDataContexts().getMetaData(), metaDataContextManager.getMetaDataContexts().getStatistics());
-            SwitchingResource switchingResource = metaDataContextManager.getResourceSwitchManager()
-                    .createByUnregisterStorageUnit(originalMetaDataContexts.getMetaData().getDatabase(database.getName()).getResourceMetaData(), Collections.singletonList(each));
-            MetaDataContexts reloadMetaDataContexts = new MetaDataContextsFactory(metaDataPersistFacade, metaDataContextManager.getComputeNodeInstanceContext()).createBySwitchResource(
-                    database.getName(), false, switchingResource, originalMetaDataContexts);
             metaDataPersistFacade.getDataSourceUnitService().delete(database.getName(), each);
-            afterStorageUnitsDropped(database.getName(), originalMetaDataContexts, reloadMetaDataContexts);
-            reloadMetaDataContexts.getMetaData().close();
-        }
-    }
-    
-    private void closeNewDataSources(final Map<StorageNode, DataSource> newDataSources) {
-        for (Map.Entry<StorageNode, DataSource> entry : newDataSources.entrySet()) {
-            if (null != entry.getValue()) {
-                new DataSourcePoolDestroyer(entry.getValue()).asyncDestroy();
-            }
+            afterStorageUnitsDropped(database.getName(), originalMetaDataContexts);
         }
     }
 
@@ -193,7 +152,8 @@ public final class ClusterMetaDataManagerPersistService implements MetaDataManag
         return toBeDroppedResourceNames.stream().filter(propsMap::containsKey).collect(Collectors.toList());
     }
     
-    private void afterStorageUnitsAltered(final String databaseName, final MetaDataContexts originalMetaDataContexts, final MetaDataContexts reloadMetaDataContexts) {
+    private void afterStorageUnitsAltered(final String databaseName, final MetaDataContexts originalMetaDataContexts) {
+        MetaDataContexts reloadMetaDataContexts = getReloadMetaDataContexts(originalMetaDataContexts);
         Optional.ofNullable(reloadMetaDataContexts.getStatistics().getDatabaseStatistics(databaseName))
                 .ifPresent(optional -> optional.getSchemaStatisticsMap().forEach((schemaName, schemaStatistics) -> metaDataPersistFacade.getStatisticsService()
                         .persist(originalMetaDataContexts.getMetaData().getDatabase(databaseName), schemaName, schemaStatistics)));
@@ -201,7 +161,8 @@ public final class ClusterMetaDataManagerPersistService implements MetaDataManag
                 originalMetaDataContexts.getMetaData().getDatabase(databaseName));
     }
     
-    private void afterStorageUnitsDropped(final String databaseName, final MetaDataContexts originalMetaDataContexts, final MetaDataContexts reloadMetaDataContexts) {
+    private void afterStorageUnitsDropped(final String databaseName, final MetaDataContexts originalMetaDataContexts) {
+        MetaDataContexts reloadMetaDataContexts = getReloadMetaDataContexts(originalMetaDataContexts);
         reloadMetaDataContexts.getMetaData().getDatabase(databaseName).getAllSchemas().forEach(each -> metaDataPersistFacade.getDatabaseMetaDataFacade()
                 .getSchema().alterByRuleDropped(reloadMetaDataContexts.getMetaData().getDatabase(databaseName).getName(), each));
         Optional.ofNullable(reloadMetaDataContexts.getStatistics().getDatabaseStatistics(databaseName))
@@ -229,10 +190,8 @@ public final class ClusterMetaDataManagerPersistService implements MetaDataManag
         afterRuleConfigurationAltered(database.getName(), originalMetaDataContexts);
     }
     
-    @SneakyThrows(InterruptedException.class)
     private void afterRuleConfigurationAltered(final String databaseName, final MetaDataContexts originalMetaDataContexts) {
-        Thread.sleep(3000L);
-        MetaDataContexts reloadMetaDataContexts = metaDataContextManager.getMetaDataContexts();
+        MetaDataContexts reloadMetaDataContexts = getReloadMetaDataContexts(originalMetaDataContexts);
         metaDataPersistFacade.persistReloadDatabaseByAlter(
                 databaseName, reloadMetaDataContexts.getMetaData().getDatabase(databaseName), originalMetaDataContexts.getMetaData().getDatabase(databaseName));
     }
@@ -251,10 +210,8 @@ public final class ClusterMetaDataManagerPersistService implements MetaDataManag
         afterRuleConfigurationDropped(database.getName(), originalMetaDataContexts);
     }
     
-    @SneakyThrows(InterruptedException.class)
     private void afterRuleConfigurationDropped(final String databaseName, final MetaDataContexts originalMetaDataContexts) {
-        Thread.sleep(3000L);
-        MetaDataContexts reloadMetaDataContexts = metaDataContextManager.getMetaDataContexts();
+        MetaDataContexts reloadMetaDataContexts = getReloadMetaDataContexts(originalMetaDataContexts);
         metaDataPersistFacade.persistReloadDatabaseByDrop(
                 databaseName, reloadMetaDataContexts.getMetaData().getDatabase(databaseName), originalMetaDataContexts.getMetaData().getDatabase(databaseName));
     }
@@ -268,4 +225,23 @@ public final class ClusterMetaDataManagerPersistService implements MetaDataManag
     public void alterProperties(final Properties props) {
         metaDataPersistFacade.getPropsService().persist(props);
     }
+    
+    @SneakyThrows(InterruptedException.class)
+    private MetaDataContexts getReloadMetaDataContexts(final MetaDataContexts originalMetaDataContexts) {
+        Thread.sleep(3000L);
+        MetaDataContexts reloadMetaDataContexts = metaDataContextManager.getMetaDataContexts();
+        if (reloadMetaDataContexts != originalMetaDataContexts) {
+            return reloadMetaDataContexts;
+        }
+        long startTime = System.currentTimeMillis();
+        long timeout = 30000;
+        while (System.currentTimeMillis() - startTime < timeout) {
+            reloadMetaDataContexts = metaDataContextManager.getMetaDataContexts();
+            if (reloadMetaDataContexts != originalMetaDataContexts) {
+                return reloadMetaDataContexts;
+            }
+            Thread.sleep(1000L);
+        }
+        throw new RuntimeException("Failed to detect metadata update within 30 seconds");
+    }
 }
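
The substantive change in this commit is that the fixed Thread.sleep(3000L) in the after-callbacks is replaced by the new getReloadMetaDataContexts(...) helper, which polls the MetaDataContextManager until the cluster listener has swapped in a reloaded MetaDataContexts instance and gives up after roughly 30 seconds; the storage-unit methods reuse the same helper instead of building a SwitchingResource locally. Below is a minimal standalone sketch of that wait-for-reference-swap pattern; the class and method names (ReferenceChangeWaiter, waitForChange) are illustrative only and are not part of the ShardingSphere codebase.

    // Sketch only: poll a supplier until it returns a different object reference than the
    // original, mirroring the retry loop in getReloadMetaDataContexts() above.
    import java.util.function.Supplier;
    
    public final class ReferenceChangeWaiter {
        
        public static <T> T waitForChange(final Supplier<T> supplier, final T original,
                                          final long timeoutMillis, final long pollMillis) throws InterruptedException {
            long startTime = System.currentTimeMillis();
            while (System.currentTimeMillis() - startTime < timeoutMillis) {
                T current = supplier.get();
                // Identity comparison: a reload replaces the whole contexts object, not its fields.
                if (current != original) {
                    return current;
                }
                Thread.sleep(pollMillis);
            }
            throw new IllegalStateException("No change detected within " + timeoutMillis + " ms");
        }
    }

A call shaped like waitForChange(manager::getMetaDataContexts, original, 30000L, 1000L) would reproduce the timeout and poll interval used in the committed helper.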