This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 3139cb9129a Refactor 
PipelineJobConfiguration.getSourceDatabaseType()'s return type (#26857)
3139cb9129a is described below

commit 3139cb9129acdae502f1b884172eceec9588976b
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Jul 9 17:12:48 2023 +0800

    Refactor PipelineJobConfiguration.getSourceDatabaseType()'s return type 
(#26857)
---
 .../config/job/PipelineJobConfiguration.java       |  4 +++-
 .../AbstractInventoryIncrementalJobAPIImpl.java    |  2 +-
 .../core/preparer/InventoryTaskSplitter.java       |  5 +----
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java      |  2 +-
 .../cdc/config/job/CDCJobConfiguration.java        |  3 ++-
 .../swapper/YamlCDCJobConfigurationSwapper.java    |  7 +++++--
 .../job/YamlCDCJobConfigurationSwapperTest.java    |  1 +
 .../config/ConsistencyCheckJobConfiguration.java   |  3 ++-
 .../migration/api/impl/MigrationJobAPI.java        |  4 ++--
 .../config/MigrationJobConfiguration.java          |  5 +++--
 .../migration/context/MigrationJobItemContext.java |  2 +-
 .../migration/prepare/MigrationJobPreparer.java    | 22 +++++++++-------------
 .../job/YamlMigrationJobConfigurationSwapper.java  |  8 +++++---
 13 files changed, 36 insertions(+), 32 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/job/PipelineJobConfiguration.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/job/PipelineJobConfiguration.java
index 3240f0ed8de..cc9df2d6348 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/job/PipelineJobConfiguration.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/job/PipelineJobConfiguration.java
@@ -17,6 +17,8 @@
 
 package org.apache.shardingsphere.data.pipeline.common.config.job;
 
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+
 /**
  * Pipeline job configuration.
  */
@@ -41,5 +43,5 @@ public interface PipelineJobConfiguration {
      *
      * @return source database type
      */
-    String getSourceDatabaseType();
+    DatabaseType getSourceDatabaseType();
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
index 9267949059d..ff43cf6fa40 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -146,7 +146,7 @@ public abstract class 
AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
         InventoryIncrementalJobItemContext context = 
(InventoryIncrementalJobItemContext) jobItemContext;
         InventoryIncrementalJobItemProgress jobItemProgress = new 
InventoryIncrementalJobItemProgress();
         jobItemProgress.setStatus(context.getStatus());
-        
jobItemProgress.setSourceDatabaseType(context.getJobConfig().getSourceDatabaseType());
+        
jobItemProgress.setSourceDatabaseType(context.getJobConfig().getSourceDatabaseType().getType());
         jobItemProgress.setDataSourceName(context.getDataSourceName());
         
jobItemProgress.setIncremental(getIncrementalTasksProgress(context.getIncrementalTasks()));
         
jobItemProgress.setInventory(getInventoryTasksProgress(context.getInventoryTasks()));
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
index b659ba175ca..5a0dae236ef 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
@@ -48,9 +48,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -205,8 +203,7 @@ public final class InventoryTaskSplitter {
     
     private Range<Long> getUniqueKeyValuesRange(final 
InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource, 
final InventoryDumperConfiguration dumperConfig) {
         String uniqueKey = dumperConfig.getUniqueKeyColumns().get(0).getName();
-        DatabaseType sourceDatabaseType = 
TypedSPILoader.getService(DatabaseType.class, 
jobItemContext.getJobConfig().getSourceDatabaseType());
-        String sql = 
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class, sourceDatabaseType)
+        String sql = 
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class, 
jobItemContext.getJobConfig().getSourceDatabaseType())
                 .buildUniqueKeyMinMaxValuesSQL(dumperConfig.getSchemaName(new 
LogicTableName(dumperConfig.getLogicTableName())), 
dumperConfig.getActualTableName(), uniqueKey);
         try (
                 Connection connection = dataSource.getConnection();
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index a85e6c1696d..f351f866e85 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -194,7 +194,7 @@ public final class CDCJobAPI extends 
AbstractInventoryIncrementalJobAPIImpl {
                                                                                
               final PipelineDataSourceManager dataSourceManager,
                                                                                
               final DumperConfiguration dumperConfig) throws SQLException {
         InventoryIncrementalJobItemProgress result = new 
InventoryIncrementalJobItemProgress();
-        result.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
+        
result.setSourceDatabaseType(jobConfig.getSourceDatabaseType().getType());
         result.setDataSourceName(dumperConfig.getDataSourceName());
         IncrementalTaskProgress incrementalTaskProgress = new 
IncrementalTaskProgress(PipelineJobPreparerUtils.getIncrementalPosition(null, 
dumperConfig, dataSourceManager));
         result.setIncremental(new 
JobItemIncrementalTasksProgress(incrementalTaskProgress));
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
index 027af04acf8..28d6afca706 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
@@ -23,6 +23,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.Shardi
 import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
 import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
 
 import java.util.List;
 import java.util.Properties;
@@ -42,7 +43,7 @@ public final class CDCJobConfiguration implements 
PipelineJobConfiguration {
     
     private final boolean full;
     
-    private final String sourceDatabaseType;
+    private final DatabaseType sourceDatabaseType;
     
     private final ShardingSpherePipelineDataSourceConfiguration 
dataSourceConfig;
     
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/swapper/YamlCDCJobConfigurationSwapper.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/swapper/YamlCDCJobConfigurationSwapper.java
index f4cb413f470..5002c389bc8 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/swapper/YamlCDCJobConfigurationSwapper.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/swapper/YamlCDCJobConfigurationSwapper.java
@@ -25,6 +25,8 @@ import 
org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
 import 
org.apache.shardingsphere.data.pipeline.cdc.yaml.config.YamlCDCJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.cdc.yaml.config.YamlCDCJobConfiguration.YamlSinkConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import 
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
 
@@ -46,7 +48,7 @@ public final class YamlCDCJobConfigurationSwapper implements 
YamlConfigurationSw
         result.setDatabaseName(data.getDatabaseName());
         result.setSchemaTableNames(data.getSchemaTableNames());
         result.setFull(data.isFull());
-        result.setSourceDatabaseType(data.getSourceDatabaseType());
+        result.setSourceDatabaseType(data.getSourceDatabaseType().getType());
         
result.setDataSourceConfiguration(dataSourceConfigSwapper.swapToYamlConfiguration(data.getDataSourceConfig()));
         result.setTablesFirstDataNodes(null == data.getTablesFirstDataNodes() 
? null : data.getTablesFirstDataNodes().marshal());
         List<String> jobShardingDataNodes = null == 
data.getJobShardingDataNodes() ? null : 
data.getJobShardingDataNodes().stream().map(JobDataNodeLine::marshal).collect(Collectors.toList());
@@ -73,7 +75,8 @@ public final class YamlCDCJobConfigurationSwapper implements 
YamlConfigurationSw
         YamlSinkConfiguration yamlSinkConfig = yamlConfig.getSinkConfig();
         SinkConfiguration sinkConfig = new 
SinkConfiguration(CDCSinkType.valueOf(yamlSinkConfig.getSinkType()), 
yamlSinkConfig.getProps());
         JobDataNodeLine tablesFirstDataNodes = null == 
yamlConfig.getTablesFirstDataNodes() ? null : 
JobDataNodeLine.unmarshal(yamlConfig.getTablesFirstDataNodes());
-        return new CDCJobConfiguration(yamlConfig.getJobId(), 
yamlConfig.getDatabaseName(), yamlConfig.getSchemaTableNames(), 
yamlConfig.isFull(), yamlConfig.getSourceDatabaseType(),
+        return new CDCJobConfiguration(yamlConfig.getJobId(), 
yamlConfig.getDatabaseName(), yamlConfig.getSchemaTableNames(), 
yamlConfig.isFull(),
+                TypedSPILoader.getService(DatabaseType.class, 
yamlConfig.getSourceDatabaseType()),
                 (ShardingSpherePipelineDataSourceConfiguration) 
dataSourceConfigSwapper.swapToObject(yamlConfig.getDataSourceConfiguration()), 
tablesFirstDataNodes,
                 jobShardingDataNodes, yamlConfig.isDecodeWithTX(), sinkConfig, 
yamlConfig.getConcurrency(), yamlConfig.getRetryTimes());
     }
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java
index a4888939630..dbc58f60781 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java
@@ -39,6 +39,7 @@ class YamlCDCJobConfigurationSwapperTest {
         yamlJobConfig.setDatabaseName("test_db");
         yamlJobConfig.setSchemaTableNames(Arrays.asList("test.t_order", 
"t_order_item"));
         yamlJobConfig.setFull(true);
+        yamlJobConfig.setSourceDatabaseType("MySQL");
         YamlSinkConfiguration sinkConfig = new YamlSinkConfiguration();
         sinkConfig.setSinkType(CDCSinkType.SOCKET.name());
         yamlJobConfig.setSinkConfig(sinkConfig);
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/ConsistencyCheckJobConfiguration.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/ConsistencyCheckJobConfiguration.java
index 34cadbf952d..f6a5af93734 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/ConsistencyCheckJobConfiguration.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/ConsistencyCheckJobConfiguration.java
@@ -21,6 +21,7 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.ToString;
 import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
 
 import java.util.Properties;
 
@@ -41,7 +42,7 @@ public final class ConsistencyCheckJobConfiguration 
implements PipelineJobConfig
     private final Properties algorithmProps;
     
     @Override
-    public String getSourceDatabaseType() {
+    public DatabaseType getSourceDatabaseType() {
         throw new UnsupportedOperationException("");
     }
     
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 0402c5e6aa2..23e411f4fba 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -294,7 +294,7 @@ public final class MigrationJobAPI extends 
AbstractInventoryIncrementalJobAPIImp
         Collection<CreateTableEntry> createTableEntries = new LinkedList<>();
         for (JobDataNodeEntry each : 
jobConfig.getTablesFirstDataNodes().getEntries()) {
             String sourceSchemaName = 
tableNameSchemaNameMapping.getSchemaName(each.getLogicTableName());
-            String targetSchemaName = 
TypedSPILoader.getService(DatabaseType.class, 
jobConfig.getTargetDatabaseType()).isSchemaAvailable() ? sourceSchemaName : 
null;
+            String targetSchemaName = 
jobConfig.getTargetDatabaseType().isSchemaAvailable() ? sourceSchemaName : null;
             DataNode dataNode = each.getDataNodes().get(0);
             PipelineDataSourceConfiguration sourceDataSourceConfig = 
jobConfig.getSources().get(dataNode.getDataSourceName());
             CreateTableEntry createTableEntry = new CreateTableEntry(
@@ -396,7 +396,7 @@ public final class MigrationJobAPI extends 
AbstractInventoryIncrementalJobAPIImp
     
     private void cleanTempTableOnRollback(final String jobId) throws 
SQLException {
         MigrationJobConfiguration jobConfig = getJobConfiguration(jobId);
-        PipelineSQLBuilder pipelineSQLBuilder = 
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class, 
TypedSPILoader.getService(DatabaseType.class, 
jobConfig.getTargetDatabaseType()));
+        PipelineSQLBuilder pipelineSQLBuilder = 
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class, 
jobConfig.getTargetDatabaseType());
         TableNameSchemaNameMapping mapping = new 
TableNameSchemaNameMapping(jobConfig.getTargetTableSchemaMap());
         try (
                 PipelineDataSourceWrapper dataSource = 
PipelineDataSourceFactory.newInstance(jobConfig.getTarget());
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationJobConfiguration.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationJobConfiguration.java
index 79bd8d470bf..2e95afce23c 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationJobConfiguration.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationJobConfiguration.java
@@ -23,6 +23,7 @@ import lombok.ToString;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
 
 import java.util.List;
 import java.util.Map;
@@ -39,9 +40,9 @@ public final class MigrationJobConfiguration implements 
PipelineJobConfiguration
     
     private final String targetDatabaseName;
     
-    private final String sourceDatabaseType;
+    private final DatabaseType sourceDatabaseType;
     
-    private final String targetDatabaseType;
+    private final DatabaseType targetDatabaseType;
     
     private final Map<String, PipelineDataSourceConfiguration> sources;
     
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
index 4fa0e602447..fabd841a7e0 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
@@ -135,7 +135,7 @@ public final class MigrationJobItemContext implements 
InventoryIncrementalJobIte
      * @return true if source and target database the same, otherwise false
      */
     public boolean isSourceTargetDatabaseTheSame() {
-        return 
jobConfig.getSourceDatabaseType().equalsIgnoreCase(jobConfig.getTargetDatabaseType());
+        return jobConfig.getSourceDatabaseType() == 
jobConfig.getTargetDatabaseType();
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index 8fc99777aef..c7bf5a2187e 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -56,7 +56,6 @@ import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.Migrati
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
-import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.lock.GlobalLockNames;
 import org.apache.shardingsphere.infra.lock.LockContext;
 import org.apache.shardingsphere.infra.lock.LockDefinition;
@@ -65,7 +64,6 @@ import org.apache.shardingsphere.infra.parser.SQLParserEngine;
 import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
 import 
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 import 
org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.mode.lock.GlobalLockDefinition;
 
 import java.sql.SQLException;
@@ -92,8 +90,7 @@ public final class MigrationJobPreparer {
     public void prepare(final MigrationJobItemContext jobItemContext) throws 
SQLException {
         
ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(jobItemContext.getTaskConfig().getDumperConfig().getDataSourceConfig().getClass()),
                 () -> new UnsupportedSQLOperationException("Migration 
inventory dumper only support StandardPipelineDataSourceConfiguration"));
-        DatabaseType sourceDatabaseType = 
TypedSPILoader.getService(DatabaseType.class, 
jobItemContext.getJobConfig().getSourceDatabaseType());
-        PipelineJobPreparerUtils.checkSourceDataSource(sourceDatabaseType, 
Collections.singleton(jobItemContext.getSourceDataSource()));
+        
PipelineJobPreparerUtils.checkSourceDataSource(jobItemContext.getJobConfig().getSourceDatabaseType(),
 Collections.singleton(jobItemContext.getSourceDataSource()));
         if (jobItemContext.isStopping()) {
             PipelineJobCenter.stop(jobItemContext.getJobId());
             return;
@@ -105,7 +102,7 @@ public final class MigrationJobPreparer {
         }
         prepareIncremental(jobItemContext);
         initInventoryTasks(jobItemContext);
-        if 
(PipelineJobPreparerUtils.isIncrementalSupported(sourceDatabaseType)) {
+        if 
(PipelineJobPreparerUtils.isIncrementalSupported(jobItemContext.getJobConfig().getSourceDatabaseType()))
 {
             initIncrementalTasks(jobItemContext);
             if (jobItemContext.isStopping()) {
                 PipelineJobCenter.stop(jobItemContext.getJobId());
@@ -146,27 +143,26 @@ public final class MigrationJobPreparer {
     }
     
     private void prepareAndCheckTarget(final MigrationJobItemContext 
jobItemContext) throws SQLException {
-        DatabaseType targetDatabaseType = 
TypedSPILoader.getService(DatabaseType.class, 
jobItemContext.getJobConfig().getTargetDatabaseType());
         if (jobItemContext.isSourceTargetDatabaseTheSame()) {
-            prepareTarget(jobItemContext, targetDatabaseType);
+            prepareTarget(jobItemContext);
         }
         InventoryIncrementalJobItemProgress initProgress = 
jobItemContext.getInitProgress();
         if (null == initProgress) {
             PipelineDataSourceWrapper targetDataSource = 
jobItemContext.getDataSourceManager().getDataSource(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
-            PipelineJobPreparerUtils.checkTargetDataSource(targetDatabaseType, 
jobItemContext.getTaskConfig().getImporterConfig(), 
Collections.singleton(targetDataSource));
+            PipelineJobPreparerUtils.checkTargetDataSource(
+                    jobItemContext.getJobConfig().getTargetDatabaseType(), 
jobItemContext.getTaskConfig().getImporterConfig(), 
Collections.singleton(targetDataSource));
         }
     }
     
-    private void prepareTarget(final MigrationJobItemContext jobItemContext, 
final DatabaseType targetDatabaseType) throws SQLException {
+    private void prepareTarget(final MigrationJobItemContext jobItemContext) 
throws SQLException {
         MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
         CreateTableConfiguration createTableConfig = 
jobItemContext.getTaskConfig().getCreateTableConfig();
         PipelineDataSourceManager dataSourceManager = 
jobItemContext.getDataSourceManager();
-        PrepareTargetSchemasParameter prepareTargetSchemasParam = new 
PrepareTargetSchemasParameter(
-                DatabaseTypedSPILoader.getService(DatabaseType.class, 
targetDatabaseType), createTableConfig, dataSourceManager);
-        PipelineJobPreparerUtils.prepareTargetSchema(targetDatabaseType, 
prepareTargetSchemasParam);
+        PrepareTargetSchemasParameter prepareTargetSchemasParam = new 
PrepareTargetSchemasParameter(jobItemContext.getJobConfig().getTargetDatabaseType(),
 createTableConfig, dataSourceManager);
+        
PipelineJobPreparerUtils.prepareTargetSchema(jobItemContext.getJobConfig().getTargetDatabaseType(),
 prepareTargetSchemasParam);
         ShardingSphereMetaData metaData = 
PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())).getContextManager().getMetaDataContexts().getMetaData();
         SQLParserEngine sqlParserEngine = 
PipelineJobPreparerUtils.getSQLParserEngine(metaData, 
jobConfig.getTargetDatabaseName());
-        PipelineJobPreparerUtils.prepareTargetTables(targetDatabaseType, new 
PrepareTargetTablesParameter(createTableConfig, dataSourceManager, 
sqlParserEngine));
+        
PipelineJobPreparerUtils.prepareTargetTables(jobItemContext.getJobConfig().getTargetDatabaseType(),
 new PrepareTargetTablesParameter(createTableConfig, dataSourceManager, 
sqlParserEngine));
     }
     
     private void prepareIncremental(final MigrationJobItemContext 
jobItemContext) {
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/yaml/job/YamlMigrationJobConfigurationSwapper.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/yaml/job/YamlMigrationJobConfigurationSwapper.java
index 0320b1c8c68..52db3c7858f 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/yaml/job/YamlMigrationJobConfigurationSwapper.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/yaml/job/YamlMigrationJobConfigurationSwapper.java
@@ -20,6 +20,8 @@ package org.apache.shardingsphere.data.pipeline.yaml.job;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import 
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
 
@@ -39,8 +41,8 @@ public final class YamlMigrationJobConfigurationSwapper 
implements YamlConfigura
         YamlMigrationJobConfiguration result = new 
YamlMigrationJobConfiguration();
         result.setJobId(data.getJobId());
         result.setTargetDatabaseName(data.getTargetDatabaseName());
-        result.setSourceDatabaseType(data.getSourceDatabaseType());
-        result.setTargetDatabaseType(data.getTargetDatabaseType());
+        result.setSourceDatabaseType(data.getSourceDatabaseType().getType());
+        result.setTargetDatabaseType(data.getTargetDatabaseType().getType());
         
result.setSources(data.getSources().entrySet().stream().collect(Collectors.toMap(Entry::getKey,
                 entry -> 
dataSourceConfigSwapper.swapToYamlConfiguration(entry.getValue()), (key, value) 
-> value, LinkedHashMap::new)));
         
result.setTarget(dataSourceConfigSwapper.swapToYamlConfiguration(data.getTarget()));
@@ -56,7 +58,7 @@ public final class YamlMigrationJobConfigurationSwapper 
implements YamlConfigura
     @Override
     public MigrationJobConfiguration swapToObject(final 
YamlMigrationJobConfiguration yamlConfig) {
         return new MigrationJobConfiguration(yamlConfig.getJobId(), 
yamlConfig.getDatabaseName(),
-                yamlConfig.getSourceDatabaseType(), 
yamlConfig.getTargetDatabaseType(),
+                TypedSPILoader.getService(DatabaseType.class, 
yamlConfig.getSourceDatabaseType()), 
TypedSPILoader.getService(DatabaseType.class, 
yamlConfig.getTargetDatabaseType()),
                 
yamlConfig.getSources().entrySet().stream().collect(Collectors.toMap(Entry::getKey,
                         entry -> 
dataSourceConfigSwapper.swapToObject(entry.getValue()), (key, value) -> value, 
LinkedHashMap::new)),
                 dataSourceConfigSwapper.swapToObject(yamlConfig.getTarget()),

Reply via email to