This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ee9c4154d7 Extract PipelineJobPreparerUtils from RuleAlteredJobPreparer for common usage (#20051)
4ee9c4154d7 is described below

commit 4ee9c4154d7379f1dd37849a326b5f1d8a15e79a
Author: Xinze Guo <[email protected]>
AuthorDate: Wed Aug 10 17:29:16 2022 +0800

    Extract PipelineJobPreparerUtils from RuleAlteredJobPreparer for common usage (#20051)
---
 .../core/util/PipelineJobPreparerUtils.java        | 167 +++++++++++++++++++++
 .../rulealtered/RuleAlteredJobPreparer.java        |  86 ++---------
 .../pipeline/framework/watcher/ScalingWatcher.java |   3 +
 3 files changed, 184 insertions(+), 72 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJobPreparerUtils.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJobPreparerUtils.java
new file mode 100644
index 00000000000..25e0c10471e
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJobPreparerUtils.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.util;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.ingest.position.PositionInitializerFactory;
+import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparer;
+import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparerFactory;
+import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetSchemasParameter;
+import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
+import org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
+import org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceCheckerFactory;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCreator;
+import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
+import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
+
+import javax.sql.DataSource;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Optional;
+
+/**
+ * Pipeline job preparer utils.
+ */
+@Slf4j
+public final class PipelineJobPreparerUtils {
+    
+    /**
+     * Prepare target schema.
+     *
+     * @param databaseType database type
+     * @param prepareTargetSchemasParameter prepare target schemas parameter
+     */
+    public static void prepareTargetSchema(final String databaseType, final PrepareTargetSchemasParameter prepareTargetSchemasParameter) {
+        Optional<DataSourcePreparer> dataSourcePreparer = DataSourcePreparerFactory.getInstance(databaseType);
+        if (!dataSourcePreparer.isPresent()) {
+            log.info("dataSourcePreparer null, ignore prepare target");
+            return;
+        }
+        dataSourcePreparer.get().prepareTargetSchemas(prepareTargetSchemasParameter);
+    }
+    
+    /**
+     * Prepare target tables.
+     *
+     * @param databaseType database type
+     * @param prepareTargetTablesParameter prepare target tables parameter
+     */
+    public static void prepareTargetTables(final String databaseType, final PrepareTargetTablesParameter prepareTargetTablesParameter) {
+        Optional<DataSourcePreparer> dataSourcePreparer = DataSourcePreparerFactory.getInstance(databaseType);
+        if (!dataSourcePreparer.isPresent()) {
+            log.info("dataSourcePreparer null, ignore prepare target");
+            return;
+        }
+        dataSourcePreparer.get().prepareTargetTables(prepareTargetTablesParameter);
+    }
+    
+    /**
+     * Get incremental position.
+     *
+     * @param initIncremental init incremental
+     * @param dumperConfig dumper config
+     * @param dataSourceManager data source manager
+     * @return ingest position
+     * @throws SQLException sql exception
+     */
+    public static IngestPosition<?> getIncrementalPosition(final JobItemIncrementalTasksProgress initIncremental, final DumperConfiguration dumperConfig,
+                                                           final PipelineDataSourceManager dataSourceManager) throws SQLException {
+        if (null != initIncremental) {
+            Optional<IngestPosition<?>> position = initIncremental.getIncrementalPosition(dumperConfig.getDataSourceName());
+            if (position.isPresent()) {
+                return position.get();
+            }
+        }
+        String databaseType = dumperConfig.getDataSourceConfig().getDatabaseType().getType();
+        DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
+        return PositionInitializerFactory.getInstance(databaseType).init(dataSource);
+    }
+    
+    /**
+     * Check source data source.
+     *
+     * @param databaseType database type
+     * @param dataSources data sources
+     */
+    public static void checkSourceDataSource(final String databaseType, final Collection<? extends DataSource> dataSources) {
+        if (null == dataSources || dataSources.isEmpty()) {
+            log.info("source data source is empty, skip check");
+            return;
+        }
+        DataSourceChecker dataSourceChecker = DataSourceCheckerFactory.getInstance(databaseType);
+        dataSourceChecker.checkConnection(dataSources);
+        dataSourceChecker.checkPrivilege(dataSources);
+        dataSourceChecker.checkVariable(dataSources);
+    }
+    
+    /**
+     * Check target data source.
+     *
+     * @param databaseType database type
+     * @param importerConfig importer config
+     * @param targetDataSources target data sources
+     */
+    public static void checkTargetDataSource(final String databaseType, final ImporterConfiguration importerConfig, final Collection<? extends DataSource> targetDataSources) {
+        DataSourceChecker dataSourceChecker = DataSourceCheckerFactory.getInstance(databaseType);
+        if (null == targetDataSources || targetDataSources.isEmpty()) {
+            log.info("target data source is empty, skip check");
+            return;
+        }
+        dataSourceChecker.checkConnection(targetDataSources);
+        dataSourceChecker.checkTargetTable(targetDataSources, importerConfig.getTableNameSchemaNameMapping(), importerConfig.getLogicTableNames());
+    }
+    
+    
+    /**
+     * Cleanup job preparer.
+     *
+     * @param pipelineDataSourceConfig pipeline data source config
+     * @throws SQLException sql exception
+     */
+    public static void destroyPosition(final PipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException {
+        DatabaseType databaseType = pipelineDataSourceConfig.getDatabaseType();
+        PositionInitializer positionInitializer = PositionInitializerFactory.getInstance(databaseType.getType());
+        log.info("Cleanup database type:{}, data source type:{}", databaseType.getType(), pipelineDataSourceConfig.getType());
+        if (pipelineDataSourceConfig instanceof ShardingSpherePipelineDataSourceConfiguration) {
+            ShardingSpherePipelineDataSourceConfiguration dataSourceConfig = (ShardingSpherePipelineDataSourceConfiguration) pipelineDataSourceConfig;
+            for (DataSourceProperties each : new YamlDataSourceConfigurationSwapper().getDataSourcePropertiesMap(dataSourceConfig.getRootConfig()).values()) {
+                try (PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(DataSourcePoolCreator.create(each), databaseType)) {
+                    positionInitializer.destroy(dataSource);
+                }
+            }
+        }
+        if (pipelineDataSourceConfig instanceof StandardPipelineDataSourceConfiguration) {
+            StandardPipelineDataSourceConfiguration dataSourceConfig = (StandardPipelineDataSourceConfiguration) pipelineDataSourceConfig;
+            try (PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(
+                    DataSourcePoolCreator.create((DataSourceProperties) dataSourceConfig.getDataSourceConfiguration()), databaseType)) {
+                positionInitializer.destroy(dataSource);
+            }
+        }
+    }
+}
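
The new class exposes plain static helpers keyed by a database type string, so pipeline job types other than rule-altered scaling can reuse the same preparation steps. A minimal caller sketch follows; it is not part of this commit, the example class and method names are hypothetical, and a DataSourceChecker SPI implementation is assumed to be registered for the given database type:

    import org.apache.shardingsphere.data.pipeline.core.util.PipelineJobPreparerUtils;

    import javax.sql.DataSource;
    import java.util.Collection;

    public final class SourcePreparationExample {
        
        /**
         * Run the standard source-side checks (connection, privilege, variable).
         *
         * @param databaseType database type, e.g. "MySQL" (placeholder value)
         * @param dataSources source data sources to check
         */
        public static void checkSource(final String databaseType, final Collection<? extends DataSource> dataSources) {
            // Delegates to the extracted utility; a null or empty collection is skipped inside it.
            PipelineJobPreparerUtils.checkSourceDataSource(databaseType, dataSources);
        }
    }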
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index 8f16493af12..b9acabdbcca 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -19,48 +19,34 @@ package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineIgnoredException;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
-import org.apache.shardingsphere.data.pipeline.core.ingest.position.PositionInitializerFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.DefaultPipelineJobProgressListener;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparer;
-import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparerFactory;
 import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetSchemasParameter;
 import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineJobPreparerUtils;
 import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.prepare.InventoryTaskSplitter;
-import org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
-import org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceCheckerFactory;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
-import org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCreator;
-import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
 import org.apache.shardingsphere.infra.lock.LockContext;
 import org.apache.shardingsphere.infra.lock.LockDefinition;
-import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
 import org.apache.shardingsphere.mode.lock.ExclusiveLockDefinition;
 
-import javax.sql.DataSource;
 import java.sql.SQLException;
-import java.util.Collection;
 import java.util.Collections;
-import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -75,7 +61,7 @@ public final class RuleAlteredJobPreparer {
      * @param jobContext job context
      */
     public void prepare(final RuleAlteredJobContext jobContext) {
-        checkSourceDataSource(jobContext);
+        PipelineJobPreparerUtils.checkSourceDataSource(jobContext.getJobConfig().getSourceDatabaseType(), Collections.singleton(jobContext.getSourceDataSource()));
         if (jobContext.isStopping()) {
             throw new PipelineIgnoredException("Job stopping, jobId=" + jobContext.getJobId());
         }
@@ -133,27 +119,24 @@ public final class RuleAlteredJobPreparer {
         JobProgress initProgress = jobContext.getInitProgress();
         if (null == initProgress || initProgress.getStatus() == JobStatus.PREPARING_FAILURE) {
             PipelineDataSourceWrapper targetDataSource = jobContext.getDataSourceManager().getDataSource(jobContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
-            checkTargetDataSource(jobContext, targetDataSource);
+            PipelineJobPreparerUtils.checkTargetDataSource(jobContext.getJobConfig().getTargetDatabaseType(), jobContext.getTaskConfig().getImporterConfig(),
+                    Collections.singletonList(targetDataSource));
         }
     }
     
     private void prepareTarget(final RuleAlteredJobContext jobContext) {
         RuleAlteredJobConfiguration jobConfig = jobContext.getJobConfig();
-        Optional<DataSourcePreparer> dataSourcePreparer = DataSourcePreparerFactory.getInstance(jobConfig.getTargetDatabaseType());
-        if (!dataSourcePreparer.isPresent()) {
-            log.info("dataSourcePreparer null, ignore prepare target");
-            return;
-        }
         TableNameSchemaNameMapping tableNameSchemaNameMapping = jobContext.getTaskConfig().getDumperConfig().getTableNameSchemaNameMapping();
-        PrepareTargetSchemasParameter prepareTargetSchemasParameter = new PrepareTargetSchemasParameter(jobConfig.splitLogicTableNames(),
-                DatabaseTypeFactory.getInstance(jobConfig.getTargetDatabaseType()),
-                jobConfig.getDatabaseName(), jobContext.getTaskConfig().getImporterConfig().getDataSourceConfig(), jobContext.getDataSourceManager(), tableNameSchemaNameMapping);
+        String targetDatabaseType = jobConfig.getTargetDatabaseType();
         if (isSourceAndTargetSchemaAvailable(jobConfig)) {
-            dataSourcePreparer.get().prepareTargetSchemas(prepareTargetSchemasParameter);
+            PrepareTargetSchemasParameter prepareTargetSchemasParameter = new PrepareTargetSchemasParameter(jobConfig.splitLogicTableNames(),
+                    DatabaseTypeFactory.getInstance(targetDatabaseType),
+                    jobConfig.getDatabaseName(), jobContext.getTaskConfig().getImporterConfig().getDataSourceConfig(), jobContext.getDataSourceManager(), tableNameSchemaNameMapping);
+            PipelineJobPreparerUtils.prepareTargetSchema(targetDatabaseType, prepareTargetSchemasParameter);
         }
         PrepareTargetTablesParameter prepareTargetTablesParameter = new PrepareTargetTablesParameter(jobConfig.getDatabaseName(), jobContext.getTaskConfig().getImporterConfig().getDataSourceConfig(),
                 jobContext.getDataSourceManager(), jobConfig.getTablesFirstDataNodes(), tableNameSchemaNameMapping);
-        dataSourcePreparer.get().prepareTargetTables(prepareTargetTablesParameter);
+        PipelineJobPreparerUtils.prepareTargetTables(targetDatabaseType, prepareTargetTablesParameter);
     }
     
     private boolean isSourceAndTargetSchemaAvailable(final RuleAlteredJobConfiguration jobConfig) {
@@ -166,22 +149,6 @@ public final class RuleAlteredJobPreparer {
         return true;
     }
     
-    private void checkSourceDataSource(final RuleAlteredJobContext jobContext) {
-        DataSourceChecker dataSourceChecker = DataSourceCheckerFactory.getInstance(jobContext.getJobConfig().getSourceDatabaseType());
-        Collection<PipelineDataSourceWrapper> sourceDataSources = Collections.singleton(jobContext.getSourceDataSource());
-        dataSourceChecker.checkConnection(sourceDataSources);
-        dataSourceChecker.checkPrivilege(sourceDataSources);
-        dataSourceChecker.checkVariable(sourceDataSources);
-    }
-    
-    private void checkTargetDataSource(final RuleAlteredJobContext jobContext, final PipelineDataSourceWrapper targetDataSource) {
-        DataSourceChecker dataSourceChecker = DataSourceCheckerFactory.getInstance(jobContext.getJobConfig().getTargetDatabaseType());
-        Collection<PipelineDataSourceWrapper> targetDataSources = Collections.singletonList(targetDataSource);
-        dataSourceChecker.checkConnection(targetDataSources);
-        ImporterConfiguration importerConfig = jobContext.getTaskConfig().getImporterConfig();
-        dataSourceChecker.checkTargetTable(targetDataSources, importerConfig.getTableNameSchemaNameMapping(), importerConfig.getLogicTableNames());
-    }
-    
     private void initInventoryTasks(final RuleAlteredJobContext jobContext) {
         InventoryTaskSplitter inventoryTaskSplitter = new InventoryTaskSplitter(jobContext.getSourceMetaDataLoader(), jobContext.getDataSourceManager(),
                 jobContext.getJobProcessContext().getImporterExecuteEngine(), jobContext.getSourceDataSource(), jobContext.getTaskConfig(), jobContext.getInitProgress());
@@ -193,7 +160,8 @@ public final class RuleAlteredJobPreparer {
         ExecuteEngine incrementalDumperExecuteEngine = jobContext.getJobProcessContext().getIncrementalDumperExecuteEngine();
         TaskConfiguration taskConfig = jobContext.getTaskConfig();
         PipelineDataSourceManager dataSourceManager = jobContext.getDataSourceManager();
-        taskConfig.getDumperConfig().setPosition(getIncrementalPosition(jobContext, taskConfig, dataSourceManager));
+        JobItemIncrementalTasksProgress incrementalTasksProgress = jobContext.getInitProgress() == null ? null : jobContext.getInitProgress().getIncremental();
+        taskConfig.getDumperConfig().setPosition(PipelineJobPreparerUtils.getIncrementalPosition(incrementalTasksProgress, taskConfig.getDumperConfig(), dataSourceManager));
         PipelineTableMetaDataLoader sourceMetaDataLoader = jobContext.getSourceMetaDataLoader();
         DefaultPipelineJobProgressListener jobProgressListener = new DefaultPipelineJobProgressListener(jobContext.getJobId(), jobContext.getShardingItem());
         IncrementalTask incrementalTask = new IncrementalTask(taskConfig.getImporterConfig().getConcurrency(),
@@ -201,20 +169,6 @@ public final class RuleAlteredJobPreparer {
         jobContext.getIncrementalTasks().add(incrementalTask);
     }
     
-    private IngestPosition<?> getIncrementalPosition(final RuleAlteredJobContext jobContext, final TaskConfiguration taskConfig,
-                                                     final PipelineDataSourceManager dataSourceManager) throws SQLException {
-        JobProgress initProgress = jobContext.getInitProgress();
-        if (null != initProgress) {
-            Optional<IngestPosition<?>> position = initProgress.getIncremental().getIncrementalPosition(taskConfig.getDumperConfig().getDataSourceName());
-            if (position.isPresent()) {
-                return position.get();
-            }
-        }
-        String databaseType = taskConfig.getDumperConfig().getDataSourceConfig().getDatabaseType().getType();
-        DataSource dataSource = dataSourceManager.getDataSource(taskConfig.getDumperConfig().getDataSourceConfig());
-        return PositionInitializerFactory.getInstance(databaseType).init(dataSource);
-    }
-    
     /**
      * Do cleanup work for scaling job.
      *
@@ -222,21 +176,9 @@ public final class RuleAlteredJobPreparer {
      */
     public void cleanup(final RuleAlteredJobConfiguration jobConfig) {
         try {
-            cleanup0(jobConfig);
+            PipelineJobPreparerUtils.destroyPosition(jobConfig.getSource());
         } catch (final SQLException ex) {
             log.warn("Scaling job destroying failed", ex);
         }
     }
-    
-    private void cleanup0(final RuleAlteredJobConfiguration jobConfig) throws SQLException {
-        DatabaseType databaseType = DatabaseTypeFactory.getInstance(jobConfig.getSourceDatabaseType());
-        PositionInitializer positionInitializer = PositionInitializerFactory.getInstance(databaseType.getType());
-        ShardingSpherePipelineDataSourceConfiguration sourceDataSourceConfig = (ShardingSpherePipelineDataSourceConfiguration) PipelineDataSourceConfigurationFactory
-                .newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter());
-        for (DataSourceProperties each : new YamlDataSourceConfigurationSwapper().getDataSourcePropertiesMap(sourceDataSourceConfig.getRootConfig()).values()) {
-            try (PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(DataSourcePoolCreator.create(each), databaseType)) {
-                positionInitializer.destroy(dataSource);
-            }
-        }
-    }
 }
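
One behavioral detail in initIncrementalTasks: the caller now flattens the possibly-null JobProgress into a JobItemIncrementalTasksProgress before the call, and PipelineJobPreparerUtils.getIncrementalPosition treats null as "no saved position", falling back to the database-type-specific PositionInitializer. A hedged sketch of the equivalent resume-or-initialize flow (the example class and method names are illustrative, not part of the commit):

    import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
    import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
    import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
    import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
    import org.apache.shardingsphere.data.pipeline.core.util.PipelineJobPreparerUtils;

    import java.sql.SQLException;

    public final class IncrementalPositionExample {
        
        /**
         * Resume from a persisted incremental position when one exists, otherwise
         * initialize a fresh position for the dumper's data source.
         */
        public static IngestPosition<?> resolve(final JobProgress initProgress, final DumperConfiguration dumperConfig,
                                                final PipelineDataSourceManager dataSourceManager) throws SQLException {
            // A new job has no progress yet; the utility handles the null itself.
            return PipelineJobPreparerUtils.getIncrementalPosition(
                    null == initProgress ? null : initProgress.getIncremental(), dumperConfig, dataSourceManager);
        }
    }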
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/watcher/ScalingWatcher.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/watcher/ScalingWatcher.java
index 59e7f49af11..5d2c200c9e7 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/watcher/ScalingWatcher.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/watcher/ScalingWatcher.java
@@ -76,6 +76,9 @@ public class ScalingWatcher extends TestWatcher {
     
     private void addZookeeperData(final String key, final String parentPath, final ClusterPersistRepository zookeeperRepository, final Map<String, String> nodeMap) {
         String path = String.join("/", parentPath, key);
+        if (path.endsWith("/config")) {
+            return;
+        }
         String data = zookeeperRepository.get(path);
         nodeMap.put(path, data);
         List<String> childrenKeys = zookeeperRepository.getChildrenKeys(path);
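
One last note on the refactoring itself: destroyPosition accepts any PipelineDataSourceConfiguration (both the ShardingSphere-rule and standard JDBC variants), which is what lets cleanup() in RuleAlteredJobPreparer shrink to a single call. A hedged usage sketch (the example class and method names are illustrative; the try/catch mirrors the warn-and-continue behavior of cleanup above):

    import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
    import org.apache.shardingsphere.data.pipeline.core.util.PipelineJobPreparerUtils;

    import java.sql.SQLException;

    public final class CleanupExample {
        
        /**
         * Best-effort cleanup of persisted ingest positions for a source configuration.
         */
        public static void destroyPositionQuietly(final PipelineDataSourceConfiguration sourceConfig) {
            try {
                // Destroys any database-side ingest position state for every data source
                // behind the configuration, via the matching PositionInitializer SPI.
                PipelineJobPreparerUtils.destroyPosition(sourceConfig);
            } catch (final SQLException ignored) {
                // Cleanup is best-effort, matching RuleAlteredJobPreparer.cleanup.
            }
        }
    }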
