This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 41db48e6bb9 Refactor MigrationJobAPI (#29213)
41db48e6bb9 is described below

commit 41db48e6bb9323858a975a3003db3e3d7ef7e4f9
Author: Liang Zhang <zhangli...@apache.org>
AuthorDate: Mon Nov 27 03:29:19 2023 +0800

    Refactor MigrationJobAPI (#29213)
---
 .../ShowMigrationSourceStorageUnitsExecutor.java   |   5 +-
 .../handler/update/MigrateTableUpdater.java        |   5 +-
 .../RegisterMigrationSourceStorageUnitUpdater.java |   5 +-
 ...nregisterMigrationSourceStorageUnitUpdater.java |   5 +-
 .../api/impl/ConsistencyCheckJobAPI.java           |   4 +-
 ...tionJobOption.java => MigrationJobManager.java} | 165 +++-------------
 .../migration/api/impl/MigrationJobOption.java     | 209 ---------------------
 .../migration/api/impl/MigrationJobAPITest.java    |  17 +-
 8 files changed, 48 insertions(+), 367 deletions(-)

diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationSourceStorageUnitsExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationSourceStorageUnitsExecutor.java
index 2b5a4b5db72..bab525e6822 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationSourceStorageUnitsExecutor.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationSourceStorageUnitsExecutor.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.migration.distsql.handler.query;
 
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobManager;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption;
 import 
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
@@ -35,11 +36,11 @@ import java.util.List;
  */
 public final class ShowMigrationSourceStorageUnitsExecutor implements 
QueryableRALExecutor<ShowMigrationSourceStorageUnitsStatement> {
     
-    private final MigrationJobOption jobOption = new MigrationJobOption();
+    private final MigrationJobManager jobManager = new MigrationJobManager(new 
MigrationJobOption());
     
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final 
ShowMigrationSourceStorageUnitsStatement sqlStatement) {
-        Iterator<Collection<Object>> data = 
jobOption.listMigrationSourceResources(new 
PipelineContextKey(InstanceType.PROXY)).iterator();
+        Iterator<Collection<Object>> data = 
jobManager.listMigrationSourceResources(new 
PipelineContextKey(InstanceType.PROXY)).iterator();
         Collection<LocalDataQueryResultRow> result = new LinkedList<>();
         while (data.hasNext()) {
             result.add(new LocalDataQueryResultRow((List<Object>) 
data.next()));
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
index aeb49d9005f..23f220d73fb 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
@@ -20,6 +20,7 @@ package 
org.apache.shardingsphere.migration.distsql.handler.update;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.MissingRequiredTargetDatabaseException;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobManager;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption;
 import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
@@ -32,13 +33,13 @@ import 
org.apache.shardingsphere.migration.distsql.statement.MigrateTableStateme
 @Slf4j
 public final class MigrateTableUpdater implements 
RALUpdater<MigrateTableStatement> {
     
-    private final MigrationJobOption jobOption = new MigrationJobOption();
+    private final MigrationJobManager jobManager = new MigrationJobManager(new 
MigrationJobOption());
     
     @Override
     public void executeUpdate(final String databaseName, final 
MigrateTableStatement sqlStatement) {
         String targetDatabaseName = null == 
sqlStatement.getTargetDatabaseName() ? databaseName : 
sqlStatement.getTargetDatabaseName();
         ShardingSpherePreconditions.checkNotNull(targetDatabaseName, 
MissingRequiredTargetDatabaseException::new);
-        jobOption.createJobAndStart(new 
PipelineContextKey(InstanceType.PROXY), new 
MigrateTableStatement(sqlStatement.getSourceTargetEntries(), 
targetDatabaseName));
+        jobManager.start(new PipelineContextKey(InstanceType.PROXY), new 
MigrateTableStatement(sqlStatement.getSourceTargetEntries(), 
targetDatabaseName));
     }
     
     @Override
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RegisterMigrationSourceStorageUnitUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RegisterMigrationSourceStorageUnitUpdater.java
index b89d63262ca..d26fee0d0a1 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RegisterMigrationSourceStorageUnitUpdater.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RegisterMigrationSourceStorageUnitUpdater.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.migration.distsql.handler.update;
 
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobManager;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption;
 import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
 import 
org.apache.shardingsphere.distsql.handler.validate.DataSourcePoolPropertiesValidateHandler;
@@ -42,7 +43,7 @@ import java.util.Map;
  */
 public final class RegisterMigrationSourceStorageUnitUpdater implements 
RALUpdater<RegisterMigrationSourceStorageUnitStatement> {
     
-    private final MigrationJobOption jobOption = new MigrationJobOption();
+    private final MigrationJobManager jobManager = new MigrationJobManager(new 
MigrationJobOption());
     
     private final DataSourcePoolPropertiesValidateHandler validateHandler = 
new DataSourcePoolPropertiesValidateHandler();
     
@@ -55,7 +56,7 @@ public final class RegisterMigrationSourceStorageUnitUpdater 
implements RALUpdat
         DatabaseType databaseType = 
DatabaseTypeFactory.get(urlBasedDataSourceSegment.getUrl());
         Map<String, DataSourcePoolProperties> propsMap = 
DataSourceSegmentsConverter.convert(databaseType, dataSources);
         validateHandler.validate(propsMap);
-        jobOption.addMigrationSourceResources(new 
PipelineContextKey(InstanceType.PROXY), propsMap);
+        jobManager.addMigrationSourceResources(new 
PipelineContextKey(InstanceType.PROXY), propsMap);
     }
     
     @Override
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/UnregisterMigrationSourceStorageUnitUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/UnregisterMigrationSourceStorageUnitUpdater.java
index 37d7e8cc00f..3f4cdfe20d2 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/UnregisterMigrationSourceStorageUnitUpdater.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/UnregisterMigrationSourceStorageUnitUpdater.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.migration.distsql.handler.update;
 
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobManager;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption;
 import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
@@ -28,11 +29,11 @@ import 
org.apache.shardingsphere.migration.distsql.statement.UnregisterMigration
  */
 public final class UnregisterMigrationSourceStorageUnitUpdater implements 
RALUpdater<UnregisterMigrationSourceStorageUnitStatement> {
     
-    private final MigrationJobOption jobOption = new MigrationJobOption();
+    private final MigrationJobManager jobManager = new MigrationJobManager(new 
MigrationJobOption());
     
     @Override
     public void executeUpdate(final String databaseName, final 
UnregisterMigrationSourceStorageUnitStatement sqlStatement) {
-        jobOption.dropMigrationSourceResources(new 
PipelineContextKey(InstanceType.PROXY), sqlStatement.getNames());
+        jobManager.dropMigrationSourceResources(new 
PipelineContextKey(InstanceType.PROXY), sqlStatement.getNames());
     }
     
     @Override
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index 472648bfdae..7d0c0135b74 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -147,11 +147,11 @@ public final class ConsistencyCheckJobAPI {
      * @param parentJobId parent job id
      */
     public void drop(final String parentJobId) {
-        jobManager.stop(parentJobId);
+        String latestCheckJobId = 
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(parentJobId)).getJobFacade().getCheck().getLatestCheckJobId(parentJobId);
+        jobManager.stop(latestCheckJobId);
         PipelineContextKey contextKey = 
PipelineJobIdUtils.parseContextKey(parentJobId);
         PipelineGovernanceFacade governanceFacade = 
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey);
         Collection<String> checkJobIds = 
governanceFacade.getJobFacade().getCheck().listCheckJobIds(parentJobId);
-        String latestCheckJobId = 
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(parentJobId)).getJobFacade().getCheck().getLatestCheckJobId(parentJobId);
         Optional<Integer> previousSequence = 
ConsistencyCheckSequence.getPreviousSequence(
                 
checkJobIds.stream().map(ConsistencyCheckJobId::parseSequence).collect(Collectors.toList()),
 ConsistencyCheckJobId.parseSequence(latestCheckJobId));
         if (previousSequence.isPresent()) {
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobOption.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobManager.java
similarity index 61%
copy from kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobOption.java
copy to kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobManager.java
index c3b8886940d..f356ed53733 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobOption.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobManager.java
@@ -21,47 +21,21 @@ import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.config.CreateTableConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
-import 
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.common.datanode.DataNodeUtils;
 import 
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
 import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
 import 
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
-import 
org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable;
 import 
org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineSchemaUtils;
-import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
-import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
-import 
org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
-import 
org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.connection.RegisterMigrationSourceStorageUnitException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.connection.UnregisterMigrationSourceStorageUnitException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.metadata.NoAnyRuleExistsException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import 
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest.MigrationIncrementalDumperContextCreator;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
 import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
@@ -94,28 +68,36 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
- * Migration job option.
+ * Migration job manager.
  */
 @Slf4j
-public final class MigrationJobOption implements TransmissionJobOption {
+public final class MigrationJobManager {
     
-    private final PipelineDataSourcePersistService dataSourcePersistService = 
new PipelineDataSourcePersistService();
+    private final MigrationJobOption jobOption;
+    
+    private final PipelineJobManager jobManager;
+    
+    private final PipelineDataSourcePersistService dataSourcePersistService;
+    
+    public MigrationJobManager(final MigrationJobOption jobOption) {
+        this.jobOption = jobOption;
+        jobManager = new PipelineJobManager(jobOption);
+        dataSourcePersistService = new PipelineDataSourcePersistService();
+    }
     
     /**
-     * Create job migration config and start.
+     * Start migration job.
      *
      * @param contextKey context key
      * @param param create migration job parameter
      * @return job id
      */
-    public String createJobAndStart(final PipelineContextKey contextKey, final 
MigrateTableStatement param) {
+    public String start(final PipelineContextKey contextKey, final 
MigrateTableStatement param) {
         MigrationJobConfiguration jobConfig = new 
YamlMigrationJobConfigurationSwapper().swapToObject(buildYamlJobConfiguration(contextKey,
 param));
-        new PipelineJobManager(this).start(jobConfig);
+        jobManager.start(jobConfig);
         return jobConfig.getJobId();
     }
     
@@ -163,7 +145,7 @@ public final class MigrationJobOption implements 
TransmissionJobOption {
         
result.setTargetTableSchemaMap(buildTargetTableSchemaMap(sourceDataNodes));
         result.setTablesFirstDataNodes(new 
JobDataNodeLine(tablesFirstDataNodes).marshal());
         
result.setJobShardingDataNodes(JobDataNodeLineConvertUtils.convertDataNodesToLines(sourceDataNodes).stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
-        extendYamlJobConfiguration(contextKey, result);
+        jobOption.extendYamlJobConfiguration(contextKey, result);
         return result;
     }
     
@@ -185,9 +167,7 @@ public final class MigrationJobOption implements 
TransmissionJobOption {
     }
     
     private YamlRootConfiguration buildYamlRootConfiguration(final String 
databaseName, final Map<String, Map<String, Object>> yamlDataSources, final 
Collection<RuleConfiguration> rules) {
-        if (rules.isEmpty()) {
-            throw new NoAnyRuleExistsException(databaseName);
-        }
+        ShardingSpherePreconditions.checkState(!rules.isEmpty(), () -> new 
NoAnyRuleExistsException(databaseName));
         YamlRootConfiguration result = new YamlRootConfiguration();
         result.setDatabaseName(databaseName);
         result.setDataSources(yamlDataSources);
@@ -201,95 +181,6 @@ public final class MigrationJobOption implements 
TransmissionJobOption {
         return result;
     }
     
-    @Override
-    public PipelineJobInfo getJobInfo(final String jobId) {
-        PipelineJobMetaData jobMetaData = new 
PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
-        List<String> sourceTables = new LinkedList<>();
-        new 
PipelineJobConfigurationManager(this).<MigrationJobConfiguration>getJobConfiguration(jobId).getJobShardingDataNodes()
-                .forEach(each -> each.getEntries().forEach(entry -> 
entry.getDataNodes().forEach(dataNode -> 
sourceTables.add(DataNodeUtils.formatWithSchema(dataNode)))));
-        return new PipelineJobInfo(jobMetaData, null, String.join(",", 
sourceTables));
-    }
-    
-    @Override
-    public void extendYamlJobConfiguration(final PipelineContextKey 
contextKey, final YamlPipelineJobConfiguration yamlJobConfig) {
-        YamlMigrationJobConfiguration config = (YamlMigrationJobConfiguration) 
yamlJobConfig;
-        if (null == yamlJobConfig.getJobId()) {
-            config.setJobId(new MigrationJobId(contextKey, 
config.getJobShardingDataNodes()).marshal());
-        }
-    }
-    
-    @SuppressWarnings("unchecked")
-    @Override
-    public YamlMigrationJobConfigurationSwapper 
getYamlJobConfigurationSwapper() {
-        return new YamlMigrationJobConfigurationSwapper();
-    }
-    
-    @Override
-    public MigrationTaskConfiguration buildTaskConfiguration(final 
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final 
PipelineProcessConfiguration processConfig) {
-        MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) 
pipelineJobConfig;
-        IncrementalDumperContext incrementalDumperContext = new 
MigrationIncrementalDumperContextCreator(
-                
jobConfig).createDumperContext(jobConfig.getJobShardingDataNodes().get(jobShardingItem));
-        Collection<CreateTableConfiguration> createTableConfigs = 
buildCreateTableConfigurations(jobConfig, 
incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
-        Set<CaseInsensitiveIdentifier> targetTableNames = 
jobConfig.getTargetTableNames().stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet());
-        Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new 
ShardingColumnsExtractor().getShardingColumnsMap(
-                ((ShardingSpherePipelineDataSourceConfiguration) 
jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames);
-        ImporterConfiguration importerConfig = buildImporterConfiguration(
-                jobConfig, processConfig, shardingColumnsMap, 
incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
-        MigrationTaskConfiguration result = new MigrationTaskConfiguration(
-                
incrementalDumperContext.getCommonContext().getDataSourceName(), 
createTableConfigs, incrementalDumperContext, importerConfig);
-        log.info("buildTaskConfiguration, result={}", result);
-        return result;
-    }
-    
-    private Collection<CreateTableConfiguration> 
buildCreateTableConfigurations(final MigrationJobConfiguration jobConfig, final 
TableAndSchemaNameMapper tableAndSchemaNameMapper) {
-        Collection<CreateTableConfiguration> result = new LinkedList<>();
-        for (JobDataNodeEntry each : 
jobConfig.getTablesFirstDataNodes().getEntries()) {
-            String sourceSchemaName = 
tableAndSchemaNameMapper.getSchemaName(each.getLogicTableName());
-            DialectDatabaseMetaData dialectDatabaseMetaData = new 
DatabaseTypeRegistry(jobConfig.getTargetDatabaseType()).getDialectDatabaseMetaData();
-            String targetSchemaName = 
dialectDatabaseMetaData.isSchemaAvailable() ? sourceSchemaName : null;
-            DataNode dataNode = each.getDataNodes().get(0);
-            PipelineDataSourceConfiguration sourceDataSourceConfig = 
jobConfig.getSources().get(dataNode.getDataSourceName());
-            CreateTableConfiguration createTableConfig = new 
CreateTableConfiguration(
-                    sourceDataSourceConfig, new 
CaseInsensitiveQualifiedTable(sourceSchemaName, dataNode.getTableName()),
-                    jobConfig.getTarget(), new 
CaseInsensitiveQualifiedTable(targetSchemaName, each.getLogicTableName()));
-            result.add(createTableConfig);
-        }
-        log.info("buildCreateTableConfigurations, result={}", result);
-        return result;
-    }
-    
-    private ImporterConfiguration buildImporterConfiguration(final 
MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration 
pipelineProcessConfig,
-                                                             final 
Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap, final 
TableAndSchemaNameMapper tableAndSchemaNameMapper) {
-        MigrationProcessContext processContext = new 
MigrationProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
-        JobRateLimitAlgorithm writeRateLimitAlgorithm = 
processContext.getWriteRateLimitAlgorithm();
-        int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
-        int retryTimes = jobConfig.getRetryTimes();
-        int concurrency = jobConfig.getConcurrency();
-        return new ImporterConfiguration(jobConfig.getTarget(), 
shardingColumnsMap, tableAndSchemaNameMapper, batchSize, 
writeRateLimitAlgorithm, retryTimes, concurrency);
-    }
-    
-    @Override
-    public MigrationProcessContext buildProcessContext(final 
PipelineJobConfiguration jobConfig) {
-        PipelineProcessConfiguration processConfig = new 
TransmissionJobManager(this).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
-        return new MigrationProcessContext(jobConfig.getJobId(), 
processConfig);
-    }
-    
-    @Override
-    public PipelineDataConsistencyChecker buildDataConsistencyChecker(final 
PipelineJobConfiguration jobConfig, final TransmissionProcessContext 
processContext,
-                                                                      final 
ConsistencyCheckJobItemProgressContext progressContext) {
-        return new MigrationDataConsistencyChecker((MigrationJobConfiguration) 
jobConfig, processContext, progressContext);
-    }
-    
-    @Override
-    public Optional<String> getToBeStartDisabledNextJobType() {
-        return Optional.of("CONSISTENCY_CHECK");
-    }
-    
-    @Override
-    public Optional<String> getToBeStoppedPreviousJobType() {
-        return Optional.of("CONSISTENCY_CHECK");
-    }
-    
     /**
      * Add migration source resources.
      *
@@ -297,7 +188,7 @@ public final class MigrationJobOption implements 
TransmissionJobOption {
      * @param propsMap data source pool properties map
      */
     public void addMigrationSourceResources(final PipelineContextKey 
contextKey, final Map<String, DataSourcePoolProperties> propsMap) {
-        Map<String, DataSourcePoolProperties> existDataSources = 
dataSourcePersistService.load(contextKey, getType());
+        Map<String, DataSourcePoolProperties> existDataSources = 
dataSourcePersistService.load(contextKey, jobOption.getType());
         Collection<String> duplicateDataSourceNames = new 
HashSet<>(propsMap.size(), 1F);
         for (Entry<String, DataSourcePoolProperties> entry : 
propsMap.entrySet()) {
             if (existDataSources.containsKey(entry.getKey())) {
@@ -307,7 +198,7 @@ public final class MigrationJobOption implements 
TransmissionJobOption {
         
ShardingSpherePreconditions.checkState(duplicateDataSourceNames.isEmpty(), () 
-> new RegisterMigrationSourceStorageUnitException(duplicateDataSourceNames));
         Map<String, DataSourcePoolProperties> result = new 
LinkedHashMap<>(existDataSources);
         result.putAll(propsMap);
-        dataSourcePersistService.persist(contextKey, getType(), result);
+        dataSourcePersistService.persist(contextKey, jobOption.getType(), 
result);
     }
     
     /**
@@ -317,13 +208,13 @@ public final class MigrationJobOption implements 
TransmissionJobOption {
      * @param resourceNames resource names
      */
     public void dropMigrationSourceResources(final PipelineContextKey 
contextKey, final Collection<String> resourceNames) {
-        Map<String, DataSourcePoolProperties> metaDataDataSource = 
dataSourcePersistService.load(contextKey, getType());
+        Map<String, DataSourcePoolProperties> metaDataDataSource = 
dataSourcePersistService.load(contextKey, jobOption.getType());
         List<String> noExistResources = resourceNames.stream().filter(each -> 
!metaDataDataSource.containsKey(each)).collect(Collectors.toList());
         ShardingSpherePreconditions.checkState(noExistResources.isEmpty(), () 
-> new UnregisterMigrationSourceStorageUnitException(noExistResources));
         for (String each : resourceNames) {
             metaDataDataSource.remove(each);
         }
-        dataSourcePersistService.persist(contextKey, getType(), 
metaDataDataSource);
+        dataSourcePersistService.persist(contextKey, jobOption.getType(), 
metaDataDataSource);
     }
     
     /**
@@ -333,7 +224,7 @@ public final class MigrationJobOption implements 
TransmissionJobOption {
      * @return migration source resources
      */
     public Collection<Collection<Object>> listMigrationSourceResources(final 
PipelineContextKey contextKey) {
-        Map<String, DataSourcePoolProperties> propsMap = 
dataSourcePersistService.load(contextKey, getType());
+        Map<String, DataSourcePoolProperties> propsMap = 
dataSourcePersistService.load(contextKey, jobOption.getType());
         Collection<Collection<Object>> result = new 
ArrayList<>(propsMap.size());
         for (Entry<String, DataSourcePoolProperties> entry : 
propsMap.entrySet()) {
             String dataSourceName = entry.getKey();
@@ -367,14 +258,4 @@ public final class MigrationJobOption implements 
TransmissionJobOption {
         }
         return "";
     }
-    
-    @Override
-    public Class<MigrationJob> getJobClass() {
-        return MigrationJob.class;
-    }
-    
-    @Override
-    public String getType() {
-        return "MIGRATION";
-    }
 }
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobOption.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobOption.java
index c3b8886940d..c62eb313f46 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobOption.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobOption.java
@@ -20,41 +20,29 @@ package 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.config.CreateTableConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
 import 
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.common.datanode.DataNodeUtils;
 import 
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
-import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
-import 
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils;
-import 
org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
 import 
org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable;
-import 
org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineSchemaUtils;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
 import 
org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
-import 
org.apache.shardingsphere.data.pipeline.core.exception.connection.RegisterMigrationSourceStorageUnitException;
-import 
org.apache.shardingsphere.data.pipeline.core.exception.connection.UnregisterMigrationSourceStorageUnitException;
-import 
org.apache.shardingsphere.data.pipeline.core.exception.metadata.NoAnyRuleExistsException;
-import 
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker;
@@ -64,36 +52,14 @@ import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest.
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
-import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
-import 
org.apache.shardingsphere.infra.database.core.connector.ConnectionProperties;
-import 
org.apache.shardingsphere.infra.database.core.connector.ConnectionPropertiesParser;
 import 
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
-import 
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeFactory;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
 import org.apache.shardingsphere.infra.datanode.DataNode;
-import 
org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
-import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
-import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import 
org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
-import org.apache.shardingsphere.infra.util.json.JsonUtils;
-import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
-import 
org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
-import 
org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
-import 
org.apache.shardingsphere.migration.distsql.statement.MigrateTableStatement;
-import 
org.apache.shardingsphere.migration.distsql.statement.pojo.SourceTargetEntry;
 
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -104,103 +70,6 @@ import java.util.stream.Collectors;
 @Slf4j
 public final class MigrationJobOption implements TransmissionJobOption {
     
-    private final PipelineDataSourcePersistService dataSourcePersistService = 
new PipelineDataSourcePersistService();
-    
-    /**
-     * Create job migration config and start.
-     *
-     * @param contextKey context key
-     * @param param create migration job parameter
-     * @return job id
-     */
-    public String createJobAndStart(final PipelineContextKey contextKey, final 
MigrateTableStatement param) {
-        MigrationJobConfiguration jobConfig = new 
YamlMigrationJobConfigurationSwapper().swapToObject(buildYamlJobConfiguration(contextKey,
 param));
-        new PipelineJobManager(this).start(jobConfig);
-        return jobConfig.getJobId();
-    }
-    
-    private YamlMigrationJobConfiguration buildYamlJobConfiguration(final 
PipelineContextKey contextKey, final MigrateTableStatement param) {
-        YamlMigrationJobConfiguration result = new 
YamlMigrationJobConfiguration();
-        result.setTargetDatabaseName(param.getTargetDatabaseName());
-        Map<String, DataSourcePoolProperties> metaDataDataSource = 
dataSourcePersistService.load(contextKey, "MIGRATION");
-        Map<String, List<DataNode>> sourceDataNodes = new LinkedHashMap<>();
-        Map<String, YamlPipelineDataSourceConfiguration> configSources = new 
LinkedHashMap<>();
-        List<SourceTargetEntry> sourceTargetEntries = new ArrayList<>(new 
HashSet<>(param.getSourceTargetEntries())).stream().sorted(Comparator.comparing(SourceTargetEntry::getTargetTableName)
-                .thenComparing(each -> 
DataNodeUtils.formatWithSchema(each.getSource()))).collect(Collectors.toList());
-        YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new 
YamlDataSourceConfigurationSwapper();
-        for (SourceTargetEntry each : sourceTargetEntries) {
-            sourceDataNodes.computeIfAbsent(each.getTargetTableName(), key -> 
new LinkedList<>()).add(each.getSource());
-            ShardingSpherePreconditions.checkState(1 == 
sourceDataNodes.get(each.getTargetTableName()).size(),
-                    () -> new PipelineInvalidParameterException("more than one 
source table for " + each.getTargetTableName()));
-            String dataSourceName = each.getSource().getDataSourceName();
-            if (configSources.containsKey(dataSourceName)) {
-                continue;
-            }
-            
ShardingSpherePreconditions.checkState(metaDataDataSource.containsKey(dataSourceName),
-                    () -> new PipelineInvalidParameterException(dataSourceName 
+ " doesn't exist. Run `SHOW MIGRATION SOURCE STORAGE UNITS;` to verify it."));
-            Map<String, Object> sourceDataSourcePoolProps = 
dataSourceConfigSwapper.swapToMap(metaDataDataSource.get(dataSourceName));
-            StandardPipelineDataSourceConfiguration sourceDataSourceConfig = 
new StandardPipelineDataSourceConfiguration(sourceDataSourcePoolProps);
-            configSources.put(dataSourceName, 
buildYamlPipelineDataSourceConfiguration(sourceDataSourceConfig.getType(), 
sourceDataSourceConfig.getParameter()));
-            DialectDatabaseMetaData dialectDatabaseMetaData = new 
DatabaseTypeRegistry(sourceDataSourceConfig.getDatabaseType()).getDialectDatabaseMetaData();
-            if (null == each.getSource().getSchemaName() && 
dialectDatabaseMetaData.isSchemaAvailable()) {
-                
each.getSource().setSchemaName(PipelineSchemaUtils.getDefaultSchema(sourceDataSourceConfig));
-            }
-            DatabaseType sourceDatabaseType = 
sourceDataSourceConfig.getDatabaseType();
-            if (null == result.getSourceDatabaseType()) {
-                result.setSourceDatabaseType(sourceDatabaseType.getType());
-            } else if 
(!result.getSourceDatabaseType().equals(sourceDatabaseType.getType())) {
-                throw new PipelineInvalidParameterException("Source storage 
units have different database types");
-            }
-        }
-        result.setSources(configSources);
-        ShardingSphereDatabase targetDatabase = 
PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(param.getTargetDatabaseName());
-        PipelineDataSourceConfiguration targetPipelineDataSourceConfig = 
buildTargetPipelineDataSourceConfiguration(targetDatabase);
-        
result.setTarget(buildYamlPipelineDataSourceConfiguration(targetPipelineDataSourceConfig.getType(),
 targetPipelineDataSourceConfig.getParameter()));
-        
result.setTargetDatabaseType(targetPipelineDataSourceConfig.getDatabaseType().getType());
-        List<JobDataNodeEntry> tablesFirstDataNodes = 
sourceDataNodes.entrySet().stream()
-                .map(entry -> new JobDataNodeEntry(entry.getKey(), 
entry.getValue().subList(0, 1))).collect(Collectors.toList());
-        result.setTargetTableNames(new 
ArrayList<>(sourceDataNodes.keySet()).stream().sorted().collect(Collectors.toList()));
-        
result.setTargetTableSchemaMap(buildTargetTableSchemaMap(sourceDataNodes));
-        result.setTablesFirstDataNodes(new 
JobDataNodeLine(tablesFirstDataNodes).marshal());
-        
result.setJobShardingDataNodes(JobDataNodeLineConvertUtils.convertDataNodesToLines(sourceDataNodes).stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
-        extendYamlJobConfiguration(contextKey, result);
-        return result;
-    }
-    
-    private YamlPipelineDataSourceConfiguration 
buildYamlPipelineDataSourceConfiguration(final String type, final String param) 
{
-        YamlPipelineDataSourceConfiguration result = new 
YamlPipelineDataSourceConfiguration();
-        result.setType(type);
-        result.setParameter(param);
-        return result;
-    }
-    
-    private PipelineDataSourceConfiguration 
buildTargetPipelineDataSourceConfiguration(final ShardingSphereDatabase 
targetDatabase) {
-        Map<String, Map<String, Object>> targetPoolProps = new HashMap<>();
-        YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new 
YamlDataSourceConfigurationSwapper();
-        for (Entry<String, StorageUnit> entry : 
targetDatabase.getResourceMetaData().getStorageUnits().entrySet()) {
-            targetPoolProps.put(entry.getKey(), 
dataSourceConfigSwapper.swapToMap(entry.getValue().getDataSourcePoolProperties()));
-        }
-        YamlRootConfiguration targetRootConfig = 
buildYamlRootConfiguration(targetDatabase.getName(), targetPoolProps, 
targetDatabase.getRuleMetaData().getConfigurations());
-        return new 
ShardingSpherePipelineDataSourceConfiguration(targetRootConfig);
-    }
-    
-    private YamlRootConfiguration buildYamlRootConfiguration(final String 
databaseName, final Map<String, Map<String, Object>> yamlDataSources, final 
Collection<RuleConfiguration> rules) {
-        if (rules.isEmpty()) {
-            throw new NoAnyRuleExistsException(databaseName);
-        }
-        YamlRootConfiguration result = new YamlRootConfiguration();
-        result.setDatabaseName(databaseName);
-        result.setDataSources(yamlDataSources);
-        result.setRules(new 
YamlRuleConfigurationSwapperEngine().swapToYamlRuleConfigurations(rules));
-        return result;
-    }
-    
-    private Map<String, String> buildTargetTableSchemaMap(final Map<String, 
List<DataNode>> sourceDataNodes) {
-        Map<String, String> result = new LinkedHashMap<>();
-        sourceDataNodes.forEach((tableName, dataNodes) -> 
result.put(tableName, dataNodes.get(0).getSchemaName()));
-        return result;
-    }
-    
     @Override
     public PipelineJobInfo getJobInfo(final String jobId) {
         PipelineJobMetaData jobMetaData = new 
PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
@@ -290,84 +159,6 @@ public final class MigrationJobOption implements 
TransmissionJobOption {
         return Optional.of("CONSISTENCY_CHECK");
     }
     
-    /**
-     * Add migration source resources.
-     *
-     * @param contextKey context key
-     * @param propsMap data source pool properties map
-     */
-    public void addMigrationSourceResources(final PipelineContextKey 
contextKey, final Map<String, DataSourcePoolProperties> propsMap) {
-        Map<String, DataSourcePoolProperties> existDataSources = 
dataSourcePersistService.load(contextKey, getType());
-        Collection<String> duplicateDataSourceNames = new 
HashSet<>(propsMap.size(), 1F);
-        for (Entry<String, DataSourcePoolProperties> entry : 
propsMap.entrySet()) {
-            if (existDataSources.containsKey(entry.getKey())) {
-                duplicateDataSourceNames.add(entry.getKey());
-            }
-        }
-        
ShardingSpherePreconditions.checkState(duplicateDataSourceNames.isEmpty(), () 
-> new RegisterMigrationSourceStorageUnitException(duplicateDataSourceNames));
-        Map<String, DataSourcePoolProperties> result = new 
LinkedHashMap<>(existDataSources);
-        result.putAll(propsMap);
-        dataSourcePersistService.persist(contextKey, getType(), result);
-    }
-    
-    /**
-     * Drop migration source resources.
-     *
-     * @param contextKey context key
-     * @param resourceNames resource names
-     */
-    public void dropMigrationSourceResources(final PipelineContextKey 
contextKey, final Collection<String> resourceNames) {
-        Map<String, DataSourcePoolProperties> metaDataDataSource = 
dataSourcePersistService.load(contextKey, getType());
-        List<String> noExistResources = resourceNames.stream().filter(each -> 
!metaDataDataSource.containsKey(each)).collect(Collectors.toList());
-        ShardingSpherePreconditions.checkState(noExistResources.isEmpty(), () 
-> new UnregisterMigrationSourceStorageUnitException(noExistResources));
-        for (String each : resourceNames) {
-            metaDataDataSource.remove(each);
-        }
-        dataSourcePersistService.persist(contextKey, getType(), 
metaDataDataSource);
-    }
-    
-    /**
-     * Query migration source resources list.
-     *
-     * @param contextKey context key
-     * @return migration source resources
-     */
-    public Collection<Collection<Object>> listMigrationSourceResources(final 
PipelineContextKey contextKey) {
-        Map<String, DataSourcePoolProperties> propsMap = 
dataSourcePersistService.load(contextKey, getType());
-        Collection<Collection<Object>> result = new 
ArrayList<>(propsMap.size());
-        for (Entry<String, DataSourcePoolProperties> entry : 
propsMap.entrySet()) {
-            String dataSourceName = entry.getKey();
-            DataSourcePoolProperties value = entry.getValue();
-            Collection<Object> props = new LinkedList<>();
-            props.add(dataSourceName);
-            String url = 
String.valueOf(value.getConnectionPropertySynonyms().getStandardProperties().get("url"));
-            DatabaseType databaseType = DatabaseTypeFactory.get(url);
-            props.add(databaseType.getType());
-            ConnectionProperties connectionProps = 
DatabaseTypedSPILoader.getService(ConnectionPropertiesParser.class, 
databaseType).parse(url, "", null);
-            props.add(connectionProps.getHostname());
-            props.add(connectionProps.getPort());
-            props.add(connectionProps.getCatalog());
-            Map<String, Object> standardProps = 
value.getPoolPropertySynonyms().getStandardProperties();
-            props.add(getStandardProperty(standardProps, 
"connectionTimeoutMilliseconds"));
-            props.add(getStandardProperty(standardProps, 
"idleTimeoutMilliseconds"));
-            props.add(getStandardProperty(standardProps, 
"maxLifetimeMilliseconds"));
-            props.add(getStandardProperty(standardProps, "maxPoolSize"));
-            props.add(getStandardProperty(standardProps, "minPoolSize"));
-            props.add(getStandardProperty(standardProps, "readOnly"));
-            Map<String, Object> otherProps = 
value.getCustomProperties().getProperties();
-            props.add(otherProps.isEmpty() ? "" : 
JsonUtils.toJsonString(otherProps));
-            result.add(props);
-        }
-        return result;
-    }
-    
-    private String getStandardProperty(final Map<String, Object> 
standardProps, final String key) {
-        if (standardProps.containsKey(key) && null != standardProps.get(key)) {
-            return standardProps.get(key).toString();
-        }
-        return "";
-    }
-    
     @Override
     public Class<MigrationJob> getJobClass() {
         return MigrationJob.class;
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 4cb3930a110..32e144ab645 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -40,6 +40,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManag
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobManager;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
@@ -95,6 +96,8 @@ class MigrationJobAPITest {
     
     private static MigrationJobAPI jobAPI;
     
+    private static MigrationJobManager migrationJobManager;
+    
     private static PipelineJobConfigurationManager jobConfigManager;
     
     private static PipelineJobManager jobManager;
@@ -110,6 +113,7 @@ class MigrationJobAPITest {
         PipelineContextUtils.mockModeConfigAndContextManager();
         jobOption = new MigrationJobOption();
         jobAPI = new MigrationJobAPI();
+        migrationJobManager = new MigrationJobManager(jobOption);
         jobConfigManager = new PipelineJobConfigurationManager(jobOption);
         jobManager = new PipelineJobManager(jobOption);
         transmissionJobManager = new TransmissionJobManager(jobOption);
@@ -120,12 +124,13 @@ class MigrationJobAPITest {
         props.put("jdbcUrl", jdbcUrl);
         props.put("username", "root");
         props.put("password", "root");
-        
jobOption.addMigrationSourceResources(PipelineContextUtils.getContextKey(), 
Collections.singletonMap("ds_0", new 
DataSourcePoolProperties("com.zaxxer.hikari.HikariDataSource", props)));
+        migrationJobManager.addMigrationSourceResources(
+                PipelineContextUtils.getContextKey(), 
Collections.singletonMap("ds_0", new 
DataSourcePoolProperties("com.zaxxer.hikari.HikariDataSource", props)));
     }
     
     @AfterAll
     static void afterClass() {
-        
jobOption.dropMigrationSourceResources(PipelineContextUtils.getContextKey(), 
Collections.singletonList("ds_0"));
+        
migrationJobManager.dropMigrationSourceResources(PipelineContextUtils.getContextKey(),
 Collections.singletonList("ds_0"));
     }
     
     @Test
@@ -253,20 +258,20 @@ class MigrationJobAPITest {
     void assertCreateJobConfigFailedOnMoreThanOneSourceTable() {
         List<SourceTargetEntry> sourceTargetEntries = Stream.of("t_order_0", 
"t_order_1")
                 .map(each -> new SourceTargetEntry("logic_db", new 
DataNode("ds_0", each), "t_order")).collect(Collectors.toList());
-        assertThrows(PipelineInvalidParameterException.class, () -> 
jobOption.createJobAndStart(PipelineContextUtils.getContextKey(), new 
MigrateTableStatement(sourceTargetEntries, "logic_db")));
+        assertThrows(PipelineInvalidParameterException.class, () -> 
migrationJobManager.start(PipelineContextUtils.getContextKey(), new 
MigrateTableStatement(sourceTargetEntries, "logic_db")));
     }
     
     @Test
     void assertCreateJobConfigFailedOnDataSourceNotExist() {
         List<SourceTargetEntry> sourceTargetEntries = 
Collections.singletonList(new SourceTargetEntry("logic_db", new 
DataNode("ds_not_exists", "t_order"), "t_order"));
-        assertThrows(PipelineInvalidParameterException.class, () -> 
jobOption.createJobAndStart(PipelineContextUtils.getContextKey(), new 
MigrateTableStatement(sourceTargetEntries, "logic_db")));
+        assertThrows(PipelineInvalidParameterException.class, () -> 
migrationJobManager.start(PipelineContextUtils.getContextKey(), new 
MigrateTableStatement(sourceTargetEntries, "logic_db")));
     }
     
     @Test
     void assertCreateJobConfig() throws SQLException {
         initIntPrimaryEnvironment();
         SourceTargetEntry sourceTargetEntry = new 
SourceTargetEntry("logic_db", new DataNode("ds_0", "t_order"), "t_order");
-        String jobId = 
jobOption.createJobAndStart(PipelineContextUtils.getContextKey(), new 
MigrateTableStatement(Collections.singletonList(sourceTargetEntry), 
"logic_db"));
+        String jobId = 
migrationJobManager.start(PipelineContextUtils.getContextKey(), new 
MigrateTableStatement(Collections.singletonList(sourceTargetEntry), 
"logic_db"));
         MigrationJobConfiguration actual = 
jobConfigManager.getJobConfiguration(jobId);
         assertThat(actual.getTargetDatabaseName(), is("logic_db"));
         List<JobDataNodeLine> dataNodeLines = actual.getJobShardingDataNodes();
@@ -294,7 +299,7 @@ class MigrationJobAPITest {
     
     @Test
     void assertShowMigrationSourceResources() {
-        Collection<Collection<Object>> actual = 
jobOption.listMigrationSourceResources(PipelineContextUtils.getContextKey());
+        Collection<Collection<Object>> actual = 
migrationJobManager.listMigrationSourceResources(PipelineContextUtils.getContextKey());
         assertThat(actual.size(), is(1));
         Collection<Object> objects = actual.iterator().next();
         assertThat(objects.toArray()[0], is("ds_0"));

Reply via email to