This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 4ee9c4154d7 Extract PipelineJobPreparerUtils from
RuleAlteredJobPreparer for common usage (#20051)
4ee9c4154d7 is described below
commit 4ee9c4154d7379f1dd37849a326b5f1d8a15e79a
Author: Xinze Guo <[email protected]>
AuthorDate: Wed Aug 10 17:29:16 2022 +0800
Extract PipelineJobPreparerUtils from RuleAlteredJobPreparer for common
usage (#20051)
---
.../core/util/PipelineJobPreparerUtils.java | 167 +++++++++++++++++++++
.../rulealtered/RuleAlteredJobPreparer.java | 86 ++---------
.../pipeline/framework/watcher/ScalingWatcher.java | 3 +
3 files changed, 184 insertions(+), 72 deletions(-)
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJobPreparerUtils.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJobPreparerUtils.java
new file mode 100644
index 00000000000..25e0c10471e
--- /dev/null
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJobPreparerUtils.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.util;
+
+import lombok.extern.slf4j.Slf4j;
+import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
+import
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
+import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.PositionInitializerFactory;
+import
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparer;
+import
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparerFactory;
+import
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetSchemasParameter;
+import
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
+import
org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
+import
org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceCheckerFactory;
+import
org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import
org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCreator;
+import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
+import
org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
+
+import javax.sql.DataSource;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Optional;
+
+/**
+ * Pipeline job preparer utils.
+ */
+@Slf4j
+public final class PipelineJobPreparerUtils {
+
+ /**
+ * Prepare target schema.
+ *
+ * @param databaseType database type
+ * @param prepareTargetSchemasParameter prepare target schemas parameter
+ */
+ public static void prepareTargetSchema(final String databaseType, final
PrepareTargetSchemasParameter prepareTargetSchemasParameter) {
+ Optional<DataSourcePreparer> dataSourcePreparer =
DataSourcePreparerFactory.getInstance(databaseType);
+ if (!dataSourcePreparer.isPresent()) {
+ log.info("dataSourcePreparer null, ignore prepare target");
+ return;
+ }
+
dataSourcePreparer.get().prepareTargetSchemas(prepareTargetSchemasParameter);
+ }
+
+ /**
+ * Prepare target tables.
+ *
+ * @param databaseType database type
+ * @param prepareTargetTablesParameter prepare target tables parameter
+ */
+ public static void prepareTargetTables(final String databaseType, final
PrepareTargetTablesParameter prepareTargetTablesParameter) {
+ Optional<DataSourcePreparer> dataSourcePreparer =
DataSourcePreparerFactory.getInstance(databaseType);
+ if (!dataSourcePreparer.isPresent()) {
+ log.info("dataSourcePreparer null, ignore prepare target");
+ return;
+ }
+
dataSourcePreparer.get().prepareTargetTables(prepareTargetTablesParameter);
+ }
+
+ /**
+ * Get incremental position.
+ *
+ * @param initIncremental init incremental
+ * @param dumperConfig dumper config
+ * @param dataSourceManager data source manager
+ * @return ingest position
+ * @throws SQLException sql exception
+ */
+ public static IngestPosition<?> getIncrementalPosition(final
JobItemIncrementalTasksProgress initIncremental, final DumperConfiguration
dumperConfig,
+ final
PipelineDataSourceManager dataSourceManager) throws SQLException {
+ if (null != initIncremental) {
+ Optional<IngestPosition<?>> position =
initIncremental.getIncrementalPosition(dumperConfig.getDataSourceName());
+ if (position.isPresent()) {
+ return position.get();
+ }
+ }
+ String databaseType =
dumperConfig.getDataSourceConfig().getDatabaseType().getType();
+ DataSource dataSource =
dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
+ return
PositionInitializerFactory.getInstance(databaseType).init(dataSource);
+ }
+
+ /**
+ * Check data source.
+ *
+ * @param databaseType database type
+ * @param dataSources data source
+ */
+ public static void checkSourceDataSource(final String databaseType, final
Collection<? extends DataSource> dataSources) {
+ if (null == dataSources || dataSources.isEmpty()) {
+ log.info("source data source is empty, skip check");
+ return;
+ }
+ DataSourceChecker dataSourceChecker =
DataSourceCheckerFactory.getInstance(databaseType);
+ dataSourceChecker.checkConnection(dataSources);
+ dataSourceChecker.checkPrivilege(dataSources);
+ dataSourceChecker.checkVariable(dataSources);
+ }
+
+ /**
+ * Check target data source.
+ *
+ * @param databaseType database type
+ * @param importerConfig importer config
+ * @param targetDataSources target data sources
+ */
+ public static void checkTargetDataSource(final String databaseType, final
ImporterConfiguration importerConfig, final Collection<? extends DataSource>
targetDataSources) {
+ DataSourceChecker dataSourceChecker =
DataSourceCheckerFactory.getInstance(databaseType);
+ if (null == targetDataSources || targetDataSources.isEmpty()) {
+ log.info("target data source is empty, skip check");
+ return;
+ }
+ dataSourceChecker.checkConnection(targetDataSources);
+ dataSourceChecker.checkTargetTable(targetDataSources,
importerConfig.getTableNameSchemaNameMapping(),
importerConfig.getLogicTableNames());
+ }
+
+
+ /**
+ * Cleanup job preparer.
+ *
+ * @param pipelineDataSourceConfig pipeline data source config
+ * @throws SQLException sql exception
+ */
+ public static void destroyPosition(final PipelineDataSourceConfiguration
pipelineDataSourceConfig) throws SQLException {
+ DatabaseType databaseType = pipelineDataSourceConfig.getDatabaseType();
+ PositionInitializer positionInitializer =
PositionInitializerFactory.getInstance(databaseType.getType());
+ log.info("Cleanup database type:{}, data source type:{}",
databaseType.getType(), pipelineDataSourceConfig.getType());
+ if (pipelineDataSourceConfig instanceof
ShardingSpherePipelineDataSourceConfiguration) {
+ ShardingSpherePipelineDataSourceConfiguration dataSourceConfig =
(ShardingSpherePipelineDataSourceConfiguration) pipelineDataSourceConfig;
+ for (DataSourceProperties each : new
YamlDataSourceConfigurationSwapper().getDataSourcePropertiesMap(dataSourceConfig.getRootConfig()).values())
{
+ try (PipelineDataSourceWrapper dataSource = new
PipelineDataSourceWrapper(DataSourcePoolCreator.create(each), databaseType)) {
+ positionInitializer.destroy(dataSource);
+ }
+ }
+ }
+ if (pipelineDataSourceConfig instanceof
StandardPipelineDataSourceConfiguration) {
+ StandardPipelineDataSourceConfiguration dataSourceConfig =
(StandardPipelineDataSourceConfiguration) pipelineDataSourceConfig;
+ try (PipelineDataSourceWrapper dataSource = new
PipelineDataSourceWrapper(
+ DataSourcePoolCreator.create((DataSourceProperties)
dataSourceConfig.getDataSourceConfiguration()), databaseType)) {
+ positionInitializer.destroy(dataSource);
+ }
+ }
+ }
+}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index 8f16493af12..b9acabdbcca 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -19,48 +19,34 @@ package
org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
-import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
-import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
-import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
-import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineIgnoredException;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.PositionInitializerFactory;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.DefaultPipelineJobProgressListener;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparer;
-import
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparerFactory;
import
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetSchemasParameter;
import
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
+import
org.apache.shardingsphere.data.pipeline.core.util.PipelineJobPreparerUtils;
import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.prepare.InventoryTaskSplitter;
-import
org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
-import
org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceCheckerFactory;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
-import
org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
-import
org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCreator;
-import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import org.apache.shardingsphere.infra.lock.LockContext;
import org.apache.shardingsphere.infra.lock.LockDefinition;
-import
org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
import org.apache.shardingsphere.mode.lock.ExclusiveLockDefinition;
-import javax.sql.DataSource;
import java.sql.SQLException;
-import java.util.Collection;
import java.util.Collections;
-import java.util.Optional;
import java.util.concurrent.TimeUnit;
/**
@@ -75,7 +61,7 @@ public final class RuleAlteredJobPreparer {
* @param jobContext job context
*/
public void prepare(final RuleAlteredJobContext jobContext) {
- checkSourceDataSource(jobContext);
+
PipelineJobPreparerUtils.checkSourceDataSource(jobContext.getJobConfig().getSourceDatabaseType(),
Collections.singleton(jobContext.getSourceDataSource()));
if (jobContext.isStopping()) {
throw new PipelineIgnoredException("Job stopping, jobId=" +
jobContext.getJobId());
}
@@ -133,27 +119,24 @@ public final class RuleAlteredJobPreparer {
JobProgress initProgress = jobContext.getInitProgress();
if (null == initProgress || initProgress.getStatus() ==
JobStatus.PREPARING_FAILURE) {
PipelineDataSourceWrapper targetDataSource =
jobContext.getDataSourceManager().getDataSource(jobContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
- checkTargetDataSource(jobContext, targetDataSource);
+
PipelineJobPreparerUtils.checkTargetDataSource(jobContext.getJobConfig().getTargetDatabaseType(),
jobContext.getTaskConfig().getImporterConfig(),
+ Collections.singletonList(targetDataSource));
}
}
private void prepareTarget(final RuleAlteredJobContext jobContext) {
RuleAlteredJobConfiguration jobConfig = jobContext.getJobConfig();
- Optional<DataSourcePreparer> dataSourcePreparer =
DataSourcePreparerFactory.getInstance(jobConfig.getTargetDatabaseType());
- if (!dataSourcePreparer.isPresent()) {
- log.info("dataSourcePreparer null, ignore prepare target");
- return;
- }
TableNameSchemaNameMapping tableNameSchemaNameMapping =
jobContext.getTaskConfig().getDumperConfig().getTableNameSchemaNameMapping();
- PrepareTargetSchemasParameter prepareTargetSchemasParameter = new
PrepareTargetSchemasParameter(jobConfig.splitLogicTableNames(),
-
DatabaseTypeFactory.getInstance(jobConfig.getTargetDatabaseType()),
- jobConfig.getDatabaseName(),
jobContext.getTaskConfig().getImporterConfig().getDataSourceConfig(),
jobContext.getDataSourceManager(), tableNameSchemaNameMapping);
+ String targetDatabaseType = jobConfig.getTargetDatabaseType();
if (isSourceAndTargetSchemaAvailable(jobConfig)) {
-
dataSourcePreparer.get().prepareTargetSchemas(prepareTargetSchemasParameter);
+ PrepareTargetSchemasParameter prepareTargetSchemasParameter = new
PrepareTargetSchemasParameter(jobConfig.splitLogicTableNames(),
+ DatabaseTypeFactory.getInstance(targetDatabaseType),
+ jobConfig.getDatabaseName(),
jobContext.getTaskConfig().getImporterConfig().getDataSourceConfig(),
jobContext.getDataSourceManager(), tableNameSchemaNameMapping);
+ PipelineJobPreparerUtils.prepareTargetSchema(targetDatabaseType,
prepareTargetSchemasParameter);
}
PrepareTargetTablesParameter prepareTargetTablesParameter = new
PrepareTargetTablesParameter(jobConfig.getDatabaseName(),
jobContext.getTaskConfig().getImporterConfig().getDataSourceConfig(),
jobContext.getDataSourceManager(),
jobConfig.getTablesFirstDataNodes(), tableNameSchemaNameMapping);
-
dataSourcePreparer.get().prepareTargetTables(prepareTargetTablesParameter);
+ PipelineJobPreparerUtils.prepareTargetTables(targetDatabaseType,
prepareTargetTablesParameter);
}
private boolean isSourceAndTargetSchemaAvailable(final
RuleAlteredJobConfiguration jobConfig) {
@@ -166,22 +149,6 @@ public final class RuleAlteredJobPreparer {
return true;
}
- private void checkSourceDataSource(final RuleAlteredJobContext jobContext)
{
- DataSourceChecker dataSourceChecker =
DataSourceCheckerFactory.getInstance(jobContext.getJobConfig().getSourceDatabaseType());
- Collection<PipelineDataSourceWrapper> sourceDataSources =
Collections.singleton(jobContext.getSourceDataSource());
- dataSourceChecker.checkConnection(sourceDataSources);
- dataSourceChecker.checkPrivilege(sourceDataSources);
- dataSourceChecker.checkVariable(sourceDataSources);
- }
-
- private void checkTargetDataSource(final RuleAlteredJobContext jobContext,
final PipelineDataSourceWrapper targetDataSource) {
- DataSourceChecker dataSourceChecker =
DataSourceCheckerFactory.getInstance(jobContext.getJobConfig().getTargetDatabaseType());
- Collection<PipelineDataSourceWrapper> targetDataSources =
Collections.singletonList(targetDataSource);
- dataSourceChecker.checkConnection(targetDataSources);
- ImporterConfiguration importerConfig =
jobContext.getTaskConfig().getImporterConfig();
- dataSourceChecker.checkTargetTable(targetDataSources,
importerConfig.getTableNameSchemaNameMapping(),
importerConfig.getLogicTableNames());
- }
-
private void initInventoryTasks(final RuleAlteredJobContext jobContext) {
InventoryTaskSplitter inventoryTaskSplitter = new
InventoryTaskSplitter(jobContext.getSourceMetaDataLoader(),
jobContext.getDataSourceManager(),
jobContext.getJobProcessContext().getImporterExecuteEngine(),
jobContext.getSourceDataSource(), jobContext.getTaskConfig(),
jobContext.getInitProgress());
@@ -193,7 +160,8 @@ public final class RuleAlteredJobPreparer {
ExecuteEngine incrementalDumperExecuteEngine =
jobContext.getJobProcessContext().getIncrementalDumperExecuteEngine();
TaskConfiguration taskConfig = jobContext.getTaskConfig();
PipelineDataSourceManager dataSourceManager =
jobContext.getDataSourceManager();
-
taskConfig.getDumperConfig().setPosition(getIncrementalPosition(jobContext,
taskConfig, dataSourceManager));
+ JobItemIncrementalTasksProgress incrementalTasksProgress =
jobContext.getInitProgress() == null ? null :
jobContext.getInitProgress().getIncremental();
+
taskConfig.getDumperConfig().setPosition(PipelineJobPreparerUtils.getIncrementalPosition(incrementalTasksProgress,
taskConfig.getDumperConfig(), dataSourceManager));
PipelineTableMetaDataLoader sourceMetaDataLoader =
jobContext.getSourceMetaDataLoader();
DefaultPipelineJobProgressListener jobProgressListener = new
DefaultPipelineJobProgressListener(jobContext.getJobId(),
jobContext.getShardingItem());
IncrementalTask incrementalTask = new
IncrementalTask(taskConfig.getImporterConfig().getConcurrency(),
@@ -201,20 +169,6 @@ public final class RuleAlteredJobPreparer {
jobContext.getIncrementalTasks().add(incrementalTask);
}
- private IngestPosition<?> getIncrementalPosition(final
RuleAlteredJobContext jobContext, final TaskConfiguration taskConfig,
- final
PipelineDataSourceManager dataSourceManager) throws SQLException {
- JobProgress initProgress = jobContext.getInitProgress();
- if (null != initProgress) {
- Optional<IngestPosition<?>> position =
initProgress.getIncremental().getIncrementalPosition(taskConfig.getDumperConfig().getDataSourceName());
- if (position.isPresent()) {
- return position.get();
- }
- }
- String databaseType =
taskConfig.getDumperConfig().getDataSourceConfig().getDatabaseType().getType();
- DataSource dataSource =
dataSourceManager.getDataSource(taskConfig.getDumperConfig().getDataSourceConfig());
- return
PositionInitializerFactory.getInstance(databaseType).init(dataSource);
- }
-
/**
* Do cleanup work for scaling job.
*
@@ -222,21 +176,9 @@ public final class RuleAlteredJobPreparer {
*/
public void cleanup(final RuleAlteredJobConfiguration jobConfig) {
try {
- cleanup0(jobConfig);
+ PipelineJobPreparerUtils.destroyPosition(jobConfig.getSource());
} catch (final SQLException ex) {
log.warn("Scaling job destroying failed", ex);
}
}
-
- private void cleanup0(final RuleAlteredJobConfiguration jobConfig) throws
SQLException {
- DatabaseType databaseType =
DatabaseTypeFactory.getInstance(jobConfig.getSourceDatabaseType());
- PositionInitializer positionInitializer =
PositionInitializerFactory.getInstance(databaseType.getType());
- ShardingSpherePipelineDataSourceConfiguration sourceDataSourceConfig =
(ShardingSpherePipelineDataSourceConfiguration)
PipelineDataSourceConfigurationFactory
- .newInstance(jobConfig.getSource().getType(),
jobConfig.getSource().getParameter());
- for (DataSourceProperties each : new
YamlDataSourceConfigurationSwapper().getDataSourcePropertiesMap(sourceDataSourceConfig.getRootConfig()).values())
{
- try (PipelineDataSourceWrapper dataSource = new
PipelineDataSourceWrapper(DataSourcePoolCreator.create(each), databaseType)) {
- positionInitializer.destroy(dataSource);
- }
- }
- }
}
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/watcher/ScalingWatcher.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/watcher/ScalingWatcher.java
index 59e7f49af11..5d2c200c9e7 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/watcher/ScalingWatcher.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/watcher/ScalingWatcher.java
@@ -76,6 +76,9 @@ public class ScalingWatcher extends TestWatcher {
private void addZookeeperData(final String key, final String parentPath,
final ClusterPersistRepository zookeeperRepository, final Map<String, String>
nodeMap) {
String path = String.join("/", parentPath, key);
+ if (path.endsWith("/config")) {
+ return;
+ }
String data = zookeeperRepository.get(path);
nodeMap.put(path, data);
List<String> childrenKeys = zookeeperRepository.getChildrenKeys(path);