This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new fa4a9d61b73 Refactor and make RuleAlteredJobConfiguration immutable 
(#17682)
fa4a9d61b73 is described below

commit fa4a9d61b730c9b242a2f3c6d4d8ebcb40f443bc
Author: Hongsheng Zhong <sand...@126.com>
AuthorDate: Mon May 16 11:11:18 2022 +0800

    Refactor and make RuleAlteredJobConfiguration immutable (#17682)
---
 ...hardingRuleAlteredJobConfigurationPreparer.java |  14 +--
 .../rulealtered/RuleAlteredJobConfiguration.java   | 115 ++++-----------------
 .../yaml/RuleAlteredJobConfigurationSwapper.java   |  76 ++++++++++++++
 .../YamlRuleAlteredJobConfiguration.java}          |  61 ++++-------
 .../PipelineDataSourceConfigurationSwapper.java    |  47 +++++++++
 .../RuleAlteredJobConfigurationPreparer.java       |   5 +-
 .../RuleAlteredJobConfigurationTest.java           |  18 +++-
 .../core/api/impl/RuleAlteredJobAPIImpl.java       |  14 +--
 .../check/consistency/DataConsistencyChecker.java  |  15 ++-
 .../pipeline/core/execute/PipelineJobExecutor.java |   9 +-
 .../data/pipeline/core/job/FinishedCheckJob.java   |   6 +-
 .../scenario/rulealtered/RuleAlteredJob.java       |   4 +-
 .../rulealtered/RuleAlteredJobContext.java         |   1 -
 .../scenario/rulealtered/RuleAlteredJobWorker.java |  15 ++-
 .../job/environment/ScalingEnvironmentManager.java |   4 +-
 .../datasource/MySQLDataSourcePreparerTest.java    |  10 +-
 .../data/pipeline/env/ITEnvironmentContext.java    |   4 +-
 .../core/util/JobConfigurationBuilder.java         |   8 +-
 .../scenario/rulealtered/RuleAlteredJobTest.java   |  74 -------------
 .../rulealtered/RuleAlteredJobWorkerTest.java      |   8 +-
 20 files changed, 238 insertions(+), 270 deletions(-)

diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
index 9121aec9cf1..aa13d53a231 100644
--- 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
@@ -24,6 +24,8 @@ import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
+import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
@@ -73,14 +75,14 @@ import java.util.stream.Collectors;
 public final class ShardingRuleAlteredJobConfigurationPreparer implements 
RuleAlteredJobConfigurationPreparer {
     
     @Override
-    public void extendJobConfiguration(final RuleAlteredJobConfiguration 
jobConfig) {
-        Map<String, List<DataNode>> shouldScalingActualDataNodes = 
getShouldScalingActualDataNodes(jobConfig);
-        
jobConfig.setJobShardingDataNodes(getJobShardingDataNodes(shouldScalingActualDataNodes));
-        
jobConfig.setLogicTables(getLogicTables(shouldScalingActualDataNodes.keySet()));
-        
jobConfig.setTablesFirstDataNodes(getTablesFirstDataNodes(shouldScalingActualDataNodes));
+    public void extendJobConfiguration(final YamlRuleAlteredJobConfiguration 
yamlJobConfig) {
+        Map<String, List<DataNode>> actualDataNodes = getActualDataNodes(new 
RuleAlteredJobConfigurationSwapper().swapToObject(yamlJobConfig));
+        
yamlJobConfig.setJobShardingDataNodes(getJobShardingDataNodes(actualDataNodes));
+        yamlJobConfig.setLogicTables(getLogicTables(actualDataNodes.keySet()));
+        
yamlJobConfig.setTablesFirstDataNodes(getTablesFirstDataNodes(actualDataNodes));
     }
     
-    private static Map<String, List<DataNode>> 
getShouldScalingActualDataNodes(final RuleAlteredJobConfiguration jobConfig) {
+    private static Map<String, List<DataNode>> getActualDataNodes(final 
RuleAlteredJobConfiguration jobConfig) {
         PipelineDataSourceConfiguration sourceDataSourceConfig = 
PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(),
 jobConfig.getSource().getParameter());
         ShardingSpherePipelineDataSourceConfiguration source = 
(ShardingSpherePipelineDataSourceConfiguration) sourceDataSourceConfig;
         ShardingRuleConfiguration sourceRuleConfig = 
ShardingRuleConfigurationConverter.findAndConvertShardingRuleConfiguration(source.getRootConfig().getRules());
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java
index 9565b9539c8..b32411d9d19 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java
@@ -17,59 +17,46 @@
 
 package org.apache.shardingsphere.data.pipeline.api.config.rulealtered;
 
-import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
-import lombok.AllArgsConstructor;
 import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.Setter;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
-import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.job.JobSubType;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
-import org.apache.shardingsphere.data.pipeline.api.job.RuleAlteredJobId;
-import 
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparerFactory;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
 /**
- * Scaling job configuration.
+ * Rule altered job configuration.
  */
-@NoArgsConstructor
-@AllArgsConstructor
+@RequiredArgsConstructor
 @Getter
-@Setter
 @Slf4j
-// TODO share for totally new scenario
-// TODO rename to Yaml, add config class
 public final class RuleAlteredJobConfiguration implements 
PipelineJobConfiguration {
     
-    private String jobId;
+    private final String jobId;
     
-    private String databaseName;
+    private final String databaseName;
     
-    /**
-     * Map{altered rule yaml class name, re-shard needed table names}.
-     */
-    private Map<String, List<String>> alteredRuleYamlClassNameTablesMap;
+    private final Integer activeVersion;
+    
+    private final Integer newVersion;
     
-    private Integer activeVersion;
+    private final String sourceDatabaseType;
     
-    private Integer newVersion;
+    private final String targetDatabaseType;
     
-    private YamlPipelineDataSourceConfiguration source;
+    private final PipelineDataSourceConfiguration source;
     
-    private YamlPipelineDataSourceConfiguration target;
+    private final PipelineDataSourceConfiguration target;
     
-    private int concurrency = 3;
+    /**
+     * Map{altered rule yaml class name, re-shard needed table names}.
+     */
+    private final Map<String, List<String>> alteredRuleYamlClassNameTablesMap;
     
-    private int retryTimes = 3;
+    private final String logicTables;
     
     /**
      * Collection of each logic table's first data node.
@@ -78,41 +65,13 @@ public final class RuleAlteredJobConfiguration implements 
PipelineJobConfigurati
      * then value may be: {@code 
t_order:ds_0.t_order_0|t_order_item:ds_0.t_order_item_0}.
      * </p>
      */
-    private String tablesFirstDataNodes;
+    private final String tablesFirstDataNodes;
     
-    private List<String> jobShardingDataNodes;
-    
-    private String logicTables;
-    
-    private String sourceDatabaseType;
-    
-    private String targetDatabaseType;
-    
-    /**
-     * Set source.
-     *
-     * @param source source configuration
-     */
-    public void setSource(final YamlPipelineDataSourceConfiguration source) {
-        checkParameters(source);
-        this.source = source;
-    }
+    private final List<String> jobShardingDataNodes;
     
-    /**
-     * Set target.
-     *
-     * @param target target configuration
-     */
-    public void setTarget(final YamlPipelineDataSourceConfiguration target) {
-        checkParameters(target);
-        this.target = target;
-    }
+    private final int concurrency;
     
-    private void checkParameters(final YamlPipelineDataSourceConfiguration 
yamlConfig) {
-        Preconditions.checkNotNull(yamlConfig);
-        Preconditions.checkNotNull(yamlConfig.getType());
-        Preconditions.checkNotNull(yamlConfig.getParameter());
-    }
+    private final int retryTimes;
     
     /**
      * Get job sharding count.
@@ -132,38 +91,6 @@ public final class RuleAlteredJobConfiguration implements 
PipelineJobConfigurati
         return Splitter.on(',').splitToList(logicTables);
     }
     
-    /**
-     * Build handle configuration.
-     */
-    public void buildHandleConfig() {
-        if (null == getJobShardingDataNodes()) {
-            
RuleAlteredJobConfigurationPreparerFactory.getInstance().extendJobConfiguration(this);
-        }
-        if (null == jobId) {
-            jobId = generateJobId();
-        }
-        if (Strings.isNullOrEmpty(getSourceDatabaseType())) {
-            PipelineDataSourceConfiguration sourceDataSourceConfig = 
PipelineDataSourceConfigurationFactory.newInstance(source.getType(), 
source.getParameter());
-            
setSourceDatabaseType(sourceDataSourceConfig.getDatabaseType().getType());
-        }
-        if (Strings.isNullOrEmpty(getTargetDatabaseType())) {
-            PipelineDataSourceConfiguration targetDataSourceConfig = 
PipelineDataSourceConfigurationFactory.newInstance(target.getType(), 
target.getParameter());
-            
setTargetDatabaseType(targetDataSourceConfig.getDatabaseType().getType());
-        }
-    }
-    
-    private String generateJobId() {
-        RuleAlteredJobId jobId = new RuleAlteredJobId();
-        // TODO type, subTypes
-        jobId.setType(JobType.RULE_ALTERED.getValue());
-        jobId.setFormatVersion(RuleAlteredJobId.CURRENT_VERSION);
-        
jobId.setSubTypes(Collections.singletonList(JobSubType.SCALING.getValue()));
-        jobId.setCurrentMetadataVersion(activeVersion);
-        jobId.setNewMetadataVersion(newVersion);
-        jobId.setDatabaseName(databaseName);
-        return jobId.marshal();
-    }
-    
     @Override
     public String toString() {
         return "RuleAlteredJobConfiguration{"
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/RuleAlteredJobConfigurationSwapper.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/RuleAlteredJobConfigurationSwapper.java
new file mode 100644
index 00000000000..df11b1661e9
--- /dev/null
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/RuleAlteredJobConfigurationSwapper.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml;
+
+import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.PipelineDataSourceConfigurationSwapper;
+import 
org.apache.shardingsphere.infra.yaml.config.swapper.YamlConfigurationSwapper;
+import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
+
+/**
+ * Rule altered job configuration swapper.
+ */
+// TODO add RuleAlteredJobConfigurationSwapper test
+public final class RuleAlteredJobConfigurationSwapper implements 
YamlConfigurationSwapper<YamlRuleAlteredJobConfiguration, 
RuleAlteredJobConfiguration> {
+    
+    private static final RuleAlteredJobConfigurationSwapper JOB_CONFIG_SWAPPER 
= new RuleAlteredJobConfigurationSwapper();
+    
+    private final PipelineDataSourceConfigurationSwapper 
dataSourceConfigSwapper = new PipelineDataSourceConfigurationSwapper();
+    
+    @Override
+    public YamlRuleAlteredJobConfiguration swapToYamlConfiguration(final 
RuleAlteredJobConfiguration data) {
+        YamlRuleAlteredJobConfiguration result = new 
YamlRuleAlteredJobConfiguration();
+        result.setJobId(data.getJobId());
+        result.setDatabaseName(data.getDatabaseName());
+        result.setActiveVersion(data.getActiveVersion());
+        result.setNewVersion(data.getNewVersion());
+        result.setSourceDatabaseType(data.getSourceDatabaseType());
+        result.setTargetDatabaseType(data.getTargetDatabaseType());
+        
result.setSource(dataSourceConfigSwapper.swapToYamlConfiguration(data.getSource()));
+        
result.setTarget(dataSourceConfigSwapper.swapToYamlConfiguration(data.getTarget()));
+        
result.setAlteredRuleYamlClassNameTablesMap(data.getAlteredRuleYamlClassNameTablesMap());
+        result.setLogicTables(data.getLogicTables());
+        result.setTablesFirstDataNodes(data.getTablesFirstDataNodes());
+        result.setJobShardingDataNodes(data.getJobShardingDataNodes());
+        result.setConcurrency(data.getConcurrency());
+        result.setRetryTimes(data.getRetryTimes());
+        return result;
+    }
+    
+    @Override
+    public RuleAlteredJobConfiguration swapToObject(final 
YamlRuleAlteredJobConfiguration yamlConfig) {
+        return new RuleAlteredJobConfiguration(yamlConfig.getJobId(), 
yamlConfig.getDatabaseName(),
+                yamlConfig.getActiveVersion(), yamlConfig.getNewVersion(),
+                yamlConfig.getSourceDatabaseType(), 
yamlConfig.getTargetDatabaseType(),
+                dataSourceConfigSwapper.swapToObject(yamlConfig.getSource()), 
dataSourceConfigSwapper.swapToObject(yamlConfig.getTarget()),
+                yamlConfig.getAlteredRuleYamlClassNameTablesMap(), 
yamlConfig.getLogicTables(),
+                yamlConfig.getTablesFirstDataNodes(), 
yamlConfig.getJobShardingDataNodes(),
+                yamlConfig.getConcurrency(), yamlConfig.getRetryTimes());
+    }
+    
+    /**
+     * Swap to job configuration from text.
+     *
+     * @param jobParameter job parameter
+     * @return job configuration
+     */
+    public static RuleAlteredJobConfiguration swapToObject(final String 
jobParameter) {
+        YamlRuleAlteredJobConfiguration yamlJobConfig = 
YamlEngine.unmarshal(jobParameter, YamlRuleAlteredJobConfiguration.class, true);
+        return JOB_CONFIG_SWAPPER.swapToObject(yamlJobConfig);
+    }
+}
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/YamlRuleAlteredJobConfiguration.java
similarity index 84%
copy from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/YamlRuleAlteredJobConfiguration.java
index 9565b9539c8..a7b0cd9c7aa 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/YamlRuleAlteredJobConfiguration.java
@@ -15,17 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.config.rulealtered;
+package org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
 import com.google.common.base.Strings;
-import lombok.AllArgsConstructor;
 import lombok.Getter;
-import lombok.NoArgsConstructor;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
@@ -33,43 +29,42 @@ import 
org.apache.shardingsphere.data.pipeline.api.job.JobSubType;
 import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.api.job.RuleAlteredJobId;
 import 
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparerFactory;
+import org.apache.shardingsphere.infra.yaml.config.pojo.YamlConfiguration;
 
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
 /**
- * Scaling job configuration.
+ * Rule altered job configuration for YAML.
  */
-@NoArgsConstructor
-@AllArgsConstructor
 @Getter
 @Setter
 @Slf4j
-// TODO share for totally new scenario
-// TODO rename to Yaml, add config class
-public final class RuleAlteredJobConfiguration implements 
PipelineJobConfiguration {
+public final class YamlRuleAlteredJobConfiguration implements 
YamlConfiguration {
     
     private String jobId;
     
     private String databaseName;
     
-    /**
-     * Map{altered rule yaml class name, re-shard needed table names}.
-     */
-    private Map<String, List<String>> alteredRuleYamlClassNameTablesMap;
-    
     private Integer activeVersion;
     
     private Integer newVersion;
     
+    private String sourceDatabaseType;
+    
+    private String targetDatabaseType;
+    
     private YamlPipelineDataSourceConfiguration source;
     
     private YamlPipelineDataSourceConfiguration target;
     
-    private int concurrency = 3;
+    /**
+     * Map{altered rule yaml class name, re-shard needed table names}.
+     */
+    private Map<String, List<String>> alteredRuleYamlClassNameTablesMap;
     
-    private int retryTimes = 3;
+    private String logicTables;
     
     /**
      * Collection of each logic table's first data node.
@@ -82,11 +77,9 @@ public final class RuleAlteredJobConfiguration implements 
PipelineJobConfigurati
     
     private List<String> jobShardingDataNodes;
     
-    private String logicTables;
-    
-    private String sourceDatabaseType;
+    private int concurrency = 3;
     
-    private String targetDatabaseType;
+    private int retryTimes = 3;
     
     /**
      * Set source.
@@ -115,27 +108,9 @@ public final class RuleAlteredJobConfiguration implements 
PipelineJobConfigurati
     }
     
     /**
-     * Get job sharding count.
-     *
-     * @return job sharding count
-     */
-    public int getJobShardingCount() {
-        return null == jobShardingDataNodes ? 0 : jobShardingDataNodes.size();
-    }
-    
-    /**
-     * Split {@linkplain #logicTables} to logic table names.
-     *
-     * @return logic table names
-     */
-    public List<String> splitLogicTableNames() {
-        return Splitter.on(',').splitToList(logicTables);
-    }
-    
-    /**
-     * Build handle configuration.
+     * Extend configuration.
      */
-    public void buildHandleConfig() {
+    public void extendConfiguration() {
         if (null == getJobShardingDataNodes()) {
             
RuleAlteredJobConfigurationPreparerFactory.getInstance().extendJobConfiguration(this);
         }
@@ -166,7 +141,7 @@ public final class RuleAlteredJobConfiguration implements 
PipelineJobConfigurati
     
     @Override
     public String toString() {
-        return "RuleAlteredJobConfiguration{"
+        return "YamlRuleAlteredJobConfiguration{"
                 + "jobId='" + jobId + '\'' + ", databaseName='" + databaseName 
+ '\''
                 + ", activeVersion=" + activeVersion + ", newVersion=" + 
newVersion
                 + ", sourceDatabaseType='" + sourceDatabaseType + '\'' + ", 
targetDatabaseType='" + targetDatabaseType + '\''
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/yaml/PipelineDataSourceConfigurationSwapper.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/yaml/PipelineDataSourceConfigurationSwapper.java
new file mode 100644
index 00000000000..b80203f0172
--- /dev/null
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/yaml/PipelineDataSourceConfigurationSwapper.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml;
+
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
+import 
org.apache.shardingsphere.infra.yaml.config.swapper.YamlConfigurationSwapper;
+
+/**
+ * Pipeline data source configuration YAML swapper.
+ */
+public final class PipelineDataSourceConfigurationSwapper implements 
YamlConfigurationSwapper<YamlPipelineDataSourceConfiguration, 
PipelineDataSourceConfiguration> {
+    
+    @Override
+    public YamlPipelineDataSourceConfiguration swapToYamlConfiguration(final 
PipelineDataSourceConfiguration data) {
+        if (null == data) {
+            return null;
+        }
+        YamlPipelineDataSourceConfiguration result = new 
YamlPipelineDataSourceConfiguration();
+        result.setType(data.getType());
+        result.setParameter(data.getParameter());
+        return result;
+    }
+    
+    @Override
+    public PipelineDataSourceConfiguration swapToObject(final 
YamlPipelineDataSourceConfiguration yamlConfig) {
+        if (null == yamlConfig) {
+            return null;
+        }
+        return 
PipelineDataSourceConfigurationFactory.newInstance(yamlConfig.getType(), 
yamlConfig.getParameter());
+    }
+}
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java
index d189d128da5..c947e3874bd 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java
@@ -19,6 +19,7 @@ package 
org.apache.shardingsphere.data.pipeline.spi.rulealtered;
 
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
 import 
org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration;
 import org.apache.shardingsphere.spi.type.required.RequiredSPI;
 
@@ -30,9 +31,9 @@ public interface RuleAlteredJobConfigurationPreparer extends 
RequiredSPI {
     /**
      * Extend job configuration.
      *
-     * @param jobConfig job configuration
+     * @param yamlJobConfig YAML job configuration
      */
-    void extendJobConfiguration(RuleAlteredJobConfiguration jobConfig);
+    void extendJobConfiguration(YamlRuleAlteredJobConfiguration yamlJobConfig);
     
     /**
      * Create task configuration, used by underlying scheduler.
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfigurationTest.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfigurationTest.java
index 22a227032a2..2d783b94831 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfigurationTest.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfigurationTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.shardingsphere.data.pipeline.api.config.rulealtered;
 
+import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
+import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -26,22 +28,28 @@ import static org.junit.Assert.assertThat;
 
 public final class RuleAlteredJobConfigurationTest {
     
+    private static final RuleAlteredJobConfigurationSwapper JOB_CONFIG_SWAPPER 
= new RuleAlteredJobConfigurationSwapper();
+    
     @Test
     public void assertGetJobShardingCountByNull() {
-        assertThat(new RuleAlteredJobConfiguration().getJobShardingCount(), 
is(0));
+        YamlRuleAlteredJobConfiguration yamlJobConfig = new 
YamlRuleAlteredJobConfiguration();
+        RuleAlteredJobConfiguration jobConfig = 
JOB_CONFIG_SWAPPER.swapToObject(yamlJobConfig);
+        assertThat(jobConfig.getJobShardingCount(), is(0));
     }
     
     @Test
     public void assertGetJobShardingCount() {
-        RuleAlteredJobConfiguration jobConfig = new 
RuleAlteredJobConfiguration();
-        jobConfig.setJobShardingDataNodes(Arrays.asList("node1", "node2"));
+        YamlRuleAlteredJobConfiguration yamlJobConfig = new 
YamlRuleAlteredJobConfiguration();
+        yamlJobConfig.setJobShardingDataNodes(Arrays.asList("node1", "node2"));
+        RuleAlteredJobConfiguration jobConfig = 
JOB_CONFIG_SWAPPER.swapToObject(yamlJobConfig);
         assertThat(jobConfig.getJobShardingCount(), is(2));
     }
     
     @Test
     public void assertSplitLogicTableNames() {
-        RuleAlteredJobConfiguration jobConfig = new 
RuleAlteredJobConfiguration();
-        jobConfig.setLogicTables("foo_tbl,bar_tbl");
+        YamlRuleAlteredJobConfiguration yamlJobConfig = new 
YamlRuleAlteredJobConfiguration();
+        yamlJobConfig.setLogicTables("foo_tbl,bar_tbl");
+        RuleAlteredJobConfiguration jobConfig = 
JOB_CONFIG_SWAPPER.swapToObject(yamlJobConfig);
         assertThat(jobConfig.splitLogicTableNames(), 
is(Arrays.asList("foo_tbl", "bar_tbl")));
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
index 8a5fd5cb12b..56426ee3ab1 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
@@ -22,6 +22,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
 import 
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
@@ -102,8 +103,7 @@ public final class RuleAlteredJobAPIImpl extends 
AbstractPipelineJobAPIImpl impl
     
     @Override
     public Optional<String> start(final RuleAlteredJobConfiguration jobConfig) 
{
-        jobConfig.buildHandleConfig();
-        if (jobConfig.getJobShardingCount() == 0) {
+        if (0 == jobConfig.getJobShardingCount()) {
             log.warn("Invalid scaling job config!");
             throw new PipelineJobCreationException("handleConfig 
shardingTotalCount is 0");
         }
@@ -116,15 +116,15 @@ public final class RuleAlteredJobAPIImpl extends 
AbstractPipelineJobAPIImpl impl
             return Optional.of(jobId);
         }
         repositoryAPI.persist(String.format("%s/%s", 
DataPipelineConstants.DATA_PIPELINE_ROOT, jobId), 
RuleAlteredJob.class.getName());
-        repositoryAPI.persist(jobConfigKey, createJobConfig(jobConfig));
+        repositoryAPI.persist(jobConfigKey, createJobConfigText(jobConfig));
         return Optional.of(jobId);
     }
     
-    private String createJobConfig(final RuleAlteredJobConfiguration 
jobConfig) {
+    private String createJobConfigText(final RuleAlteredJobConfiguration 
jobConfig) {
         JobConfigurationPOJO jobConfigPOJO = new JobConfigurationPOJO();
         jobConfigPOJO.setJobName(jobConfig.getJobId());
         jobConfigPOJO.setShardingTotalCount(jobConfig.getJobShardingCount());
-        jobConfigPOJO.setJobParameter(YamlEngine.marshal(jobConfig));
+        jobConfigPOJO.setJobParameter(YamlEngine.marshal(new 
RuleAlteredJobConfigurationSwapper().swapToYamlConfiguration(jobConfig)));
         jobConfigPOJO.getProps().setProperty("create_time", 
LocalDateTime.now().format(DATE_TIME_FORMATTER));
         return YamlEngine.marshal(jobConfigPOJO);
     }
@@ -365,7 +365,7 @@ public final class RuleAlteredJobAPIImpl extends 
AbstractPipelineJobAPIImpl impl
         return getJobConfig(getElasticJobConfigPOJO(jobId));
     }
     
-    private RuleAlteredJobConfiguration getJobConfig(final 
JobConfigurationPOJO elasticJobConfigPOJO) {
-        return YamlEngine.unmarshal(elasticJobConfigPOJO.getJobParameter(), 
RuleAlteredJobConfiguration.class, true);
+    private RuleAlteredJobConfiguration getJobConfig(final 
JobConfigurationPOJO jobConfigPOJO) {
+        return 
RuleAlteredJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
index 67c712cc391..eda9c1f476c 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
@@ -27,7 +27,6 @@ import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAltere
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
-import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
@@ -155,8 +154,10 @@ public final class DataConsistencyChecker {
     }
     
     private Map<String, DataConsistencyContentCheckResult> checkData(final 
DataConsistencyCalculateAlgorithm calculator) {
-        PipelineDataSourceConfiguration sourceDataSourceConfig = 
getPipelineDataSourceConfiguration(calculator, jobConfig.getSource());
-        PipelineDataSourceConfiguration targetDataSourceConfig = 
getPipelineDataSourceConfiguration(calculator, jobConfig.getTarget());
+        decoratePipelineDataSourceConfiguration(calculator, 
jobConfig.getSource());
+        PipelineDataSourceConfiguration sourceDataSourceConfig = 
jobConfig.getSource();
+        decoratePipelineDataSourceConfiguration(calculator, 
jobConfig.getTarget());
+        PipelineDataSourceConfiguration targetDataSourceConfig = 
jobConfig.getTarget();
         ThreadFactory threadFactory = 
ExecutorThreadFactoryBuilder.build("job-" + 
getJobIdDigest(jobConfig.getJobId()) + "-data-check-%d");
         ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, 
TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
         JobRateLimitAlgorithm inputRateLimitAlgorithm = 
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig).getInputRateLimitAlgorithm();
@@ -202,11 +203,9 @@ public final class DataConsistencyChecker {
         return result;
     }
     
-    private PipelineDataSourceConfiguration 
getPipelineDataSourceConfiguration(final DataConsistencyCalculateAlgorithm 
calculator, final YamlPipelineDataSourceConfiguration dataSourceConfig) {
-        PipelineDataSourceConfiguration result = 
PipelineDataSourceConfigurationFactory.newInstance(dataSourceConfig.getType(), 
dataSourceConfig.getParameter());
-        checkDatabaseTypeSupported(calculator.getSupportedDatabaseTypes(), 
result.getDatabaseType().getType());
-        addMySQLDataSourceConfig(result);
-        return result;
+    private void decoratePipelineDataSourceConfiguration(final 
DataConsistencyCalculateAlgorithm calculator, final 
PipelineDataSourceConfiguration dataSourceConfig) {
+        checkDatabaseTypeSupported(calculator.getSupportedDatabaseTypes(), 
dataSourceConfig.getDatabaseType().getType());
+        addMySQLDataSourceConfig(dataSourceConfig);
     }
     
     private void checkDatabaseTypeSupported(final Collection<String> 
supportedDatabaseTypes, final String databaseType) {
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
index a3d988f01a8..51321db57f5 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
@@ -18,9 +18,10 @@
 package org.apache.shardingsphere.data.pipeline.core.execute;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
 import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
+import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
 import 
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
@@ -47,6 +48,8 @@ public final class PipelineJobExecutor extends 
AbstractLifecycleExecutor {
     
     private static final Pattern CONFIG_PATTERN = 
Pattern.compile(DataPipelineConstants.DATA_PIPELINE_ROOT + 
"/(\\d{2}[0-9a-f]+)/config");
     
+    private static final RuleAlteredJobConfigurationSwapper JOB_CONFIG_SWAPPER 
= new RuleAlteredJobConfigurationSwapper();
+    
     @Override
     protected void doStart() {
         watchGovernanceRepositoryConfiguration();
@@ -66,7 +69,7 @@ public final class PipelineJobExecutor extends 
AbstractLifecycleExecutor {
                 log.info("jobId={}, deleted={}, disabled={}", 
jobConfigPOJO.getJobName(), deleted, disabled);
                 RuleAlteredJobSchedulerCenter.stop(jobConfigPOJO.getJobName());
                 // TODO refactor: dispatch to different job types
-                RuleAlteredJobConfiguration jobConfig = 
YamlEngine.unmarshal(jobConfigPOJO.getJobParameter(), 
RuleAlteredJobConfiguration.class, true);
+                RuleAlteredJobConfiguration jobConfig = 
RuleAlteredJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
                 if (deleted) {
                     new RuleAlteredJobPreparer().cleanup(jobConfig);
                 } else if 
(RuleAlteredJobProgressDetector.isJobSuccessful(jobConfig.getJobShardingCount(),
 ruleAlteredJobAPI.getProgress(jobConfig).values())) {
@@ -80,7 +83,7 @@ public final class PipelineJobExecutor extends 
AbstractLifecycleExecutor {
             switch (event.getType()) {
                 case ADDED:
                 case UPDATED:
-                    RuleAlteredJobConfiguration jobConfig = 
YamlEngine.unmarshal(jobConfigPOJO.getJobParameter(), 
RuleAlteredJobConfiguration.class, true);
+                    RuleAlteredJobConfiguration jobConfig = 
RuleAlteredJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
                     String databaseName = jobConfig.getDatabaseName();
                     if (PipelineSimpleLock.getInstance().tryLock(databaseName, 
1000)) {
                         execute(jobConfigPOJO);
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
index 870a0339573..724c70fca0a 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
@@ -19,9 +19,10 @@ package org.apache.shardingsphere.data.pipeline.core.job;
 
 import io.vertx.core.impl.ConcurrentHashSet;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
 import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
+import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
 import 
org.apache.shardingsphere.data.pipeline.api.detect.RuleAlteredJobAlmostCompletedParameter;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
@@ -32,7 +33,6 @@ import 
org.apache.shardingsphere.data.pipeline.spi.lock.RowBasedJobLock;
 import org.apache.shardingsphere.data.pipeline.spi.lock.RuleBasedJobLock;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
-import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
 
 import java.util.List;
 import java.util.Map;
@@ -64,7 +64,7 @@ public final class FinishedCheckJob implements SimpleJob {
             onCheckJobIds.add(jobId);
             try {
                 // TODO refactor: dispatch to different job types
-                RuleAlteredJobConfiguration jobConfig = 
YamlEngine.unmarshal(jobInfo.getJobParameter(), 
RuleAlteredJobConfiguration.class, true);
+                RuleAlteredJobConfiguration jobConfig = 
RuleAlteredJobConfigurationSwapper.swapToObject(jobInfo.getJobParameter());
                 RuleAlteredContext ruleAlteredContext = 
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
                 if (null == ruleAlteredContext.getCompletionDetectAlgorithm()) 
{
                     log.info("completionDetector not configured, auto switch 
will not be enabled. You could query job progress and switch config manually 
with DistSQL.");
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
index bb368119b3f..cbfa0940ef2 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
@@ -19,13 +19,13 @@ package 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
 
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
 import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
-import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingReleaseDatabaseLevelLockEvent;
 
 /**
@@ -42,7 +42,7 @@ public final class RuleAlteredJob implements SimpleJob {
     @Override
     public void execute(final ShardingContext shardingContext) {
         log.info("Execute job {}-{}", shardingContext.getJobName(), 
shardingContext.getShardingItem());
-        RuleAlteredJobConfiguration jobConfig = 
YamlEngine.unmarshal(shardingContext.getJobParameter(), 
RuleAlteredJobConfiguration.class, true);
+        RuleAlteredJobConfiguration jobConfig = 
RuleAlteredJobConfigurationSwapper.swapToObject(shardingContext.getJobParameter());
         RuleAlteredJobContext jobContext = new 
RuleAlteredJobContext(jobConfig, shardingContext.getShardingItem());
         
jobContext.setInitProgress(governanceRepositoryAPI.getJobProgress(jobContext.getJobId(),
 jobContext.getShardingItem()));
         jobContext.setJobPreparer(jobPreparer);
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
index 6f1550d7bab..35059e626c7 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
@@ -86,7 +86,6 @@ public final class RuleAlteredJobContext {
     public RuleAlteredJobContext(final RuleAlteredJobConfiguration jobConfig, 
final int jobShardingItem) {
         ruleAlteredContext = 
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
         this.jobConfig = jobConfig;
-        jobConfig.buildHandleConfig();
         jobId = jobConfig.getJobId();
         this.shardingItem = jobShardingItem;
         taskConfig = RuleAlteredJobWorker.buildTaskConfig(jobConfig, 
jobShardingItem, ruleAlteredContext.getOnRuleAlteredActionConfig());
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
index a9ea71faba6..694dc084a3c 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
@@ -24,6 +24,8 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
+import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
@@ -204,14 +206,15 @@ public final class RuleAlteredJobWorker {
             log.error("more than 1 rule altered");
             throw new PipelineJobCreationException("more than 1 rule altered");
         }
-        RuleAlteredJobConfiguration result = new RuleAlteredJobConfiguration();
+        YamlRuleAlteredJobConfiguration result = new 
YamlRuleAlteredJobConfiguration();
         result.setDatabaseName(event.getDatabaseName());
         
result.setAlteredRuleYamlClassNameTablesMap(alteredRuleYamlClassNameTablesMap);
         result.setActiveVersion(event.getActiveVersion());
         result.setNewVersion(event.getNewVersion());
         
result.setSource(createYamlPipelineDataSourceConfiguration(sourceRootConfig));
         
result.setTarget(createYamlPipelineDataSourceConfiguration(targetRootConfig));
-        return Optional.of(result);
+        result.extendConfiguration();
+        return Optional.of(new 
RuleAlteredJobConfigurationSwapper().swapToObject(result));
     }
     
     private Collection<Pair<YamlRuleConfiguration, YamlRuleConfiguration>> 
groupSourceTargetRuleConfigsByType(final Collection<YamlRuleConfiguration> 
sourceRules,
@@ -289,8 +292,8 @@ public final class RuleAlteredJobWorker {
                     .allMatch(progress -> null != progress && 
progress.getStatus().equals(JobStatus.FINISHED))) {
                 continue;
             }
-            RuleAlteredJobConfiguration jobConfig = 
YamlEngine.unmarshal(each.getJobParameter(), RuleAlteredJobConfiguration.class, 
true);
-            if (hasUncompletedJobOfSameDatabaseName(jobConfig, databaseName)) {
+            RuleAlteredJobConfiguration jobConfig = 
RuleAlteredJobConfigurationSwapper.swapToObject(each.getJobParameter());
+            if (databaseName.equals(jobConfig.getDatabaseName())) {
                 result = true;
                 break;
             }
@@ -298,10 +301,6 @@ public final class RuleAlteredJobWorker {
         return result;
     }
     
-    private boolean hasUncompletedJobOfSameDatabaseName(final 
RuleAlteredJobConfiguration jobConfig, final String currentDatabaseName) {
-        return currentDatabaseName.equals(jobConfig.getDatabaseName());
-    }
-    
     /**
      * scaling release database level lock.
      *
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
index 61c31562e35..1e963116474 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
@@ -21,8 +21,8 @@ import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
-import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
@@ -50,7 +50,7 @@ public final class ScalingEnvironmentManager {
     public void cleanupTargetTables(final RuleAlteredJobConfiguration 
jobConfig) throws SQLException {
         Collection<String> tables = jobConfig.splitLogicTableNames();
         log.info("cleanupTargetTables, tables={}", tables);
-        YamlPipelineDataSourceConfiguration target = jobConfig.getTarget();
+        PipelineDataSourceConfiguration target = jobConfig.getTarget();
         PipelineSQLBuilder pipelineSQLBuilder = 
PipelineSQLBuilderFactory.getInstance(jobConfig.getTargetDatabaseType());
         ShardingSphereMetaData metaData = 
PipelineContext.getContextManager().getMetaDataContexts().getMetaData(jobConfig.getDatabaseName());
         TableNameSchemaNameMapping tableNameSchemaNameMapping = new 
TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(metaData.getSchemas()));
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
index 96db014402d..140528e89bd 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
@@ -20,9 +20,9 @@ package 
org.apache.shardingsphere.data.pipeline.mysql.prepare.datasource;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
 import 
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
@@ -56,10 +56,10 @@ public final class MySQLDataSourcePreparerTest {
     private RuleAlteredJobConfiguration jobConfig;
     
     @Mock
-    private YamlPipelineDataSourceConfiguration 
sourceYamlPipelineDataSourceConfig;
+    private PipelineDataSourceConfiguration sourcePipelineDataSourceConfig;
     
     @Mock
-    private YamlPipelineDataSourceConfiguration 
targetYamlPipelineDataSourceConfig;
+    private PipelineDataSourceConfiguration targetPipelineDataSourceConfig;
     
     @Mock
     private ShardingSpherePipelineDataSourceConfiguration 
sourceScalingDataSourceConfig;
@@ -79,10 +79,10 @@ public final class MySQLDataSourcePreparerTest {
         
when(mockPipelineDataSourceManager.getDataSource(same(sourceScalingDataSourceConfig))).thenReturn(sourceDataSourceWrapper);
         
when(mockPipelineDataSourceManager.getDataSource(same(targetScalingDataSourceConfig))).thenReturn(targetDataSourceWrapper);
         
when(prepareTargetTablesParameter.getDataSourceManager()).thenReturn(mockPipelineDataSourceManager);
-        
when(jobConfig.getSource()).thenReturn(sourceYamlPipelineDataSourceConfig);
+        when(jobConfig.getSource()).thenReturn(sourcePipelineDataSourceConfig);
         when(jobConfig.getSource().getType()).thenReturn("ShardingSphereJDBC");
         when(jobConfig.getSource().getParameter()).thenReturn("source");
-        
when(jobConfig.getTarget()).thenReturn(targetYamlPipelineDataSourceConfig);
+        when(jobConfig.getTarget()).thenReturn(targetPipelineDataSourceConfig);
         when(jobConfig.getTarget().getType()).thenReturn("ShardingSphereJDBC");
         when(jobConfig.getTarget().getParameter()).thenReturn("target");
         
when(prepareTargetTablesParameter.getJobConfig()).thenReturn(jobConfig);
diff --git 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/ITEnvironmentContext.java
 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/ITEnvironmentContext.java
index 30a8a7ff919..4de500bd417 100644
--- 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/ITEnvironmentContext.java
+++ 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/ITEnvironmentContext.java
@@ -20,7 +20,7 @@ package 
org.apache.shardingsphere.integration.data.pipeline.env;
 import com.google.gson.Gson;
 import lombok.Getter;
 import lombok.SneakyThrows;
-import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
@@ -81,7 +81,7 @@ public final class ITEnvironmentContext {
     }
     
     private static String createScalingConfiguration(final Map<String, 
YamlTableRuleConfiguration> tableRules) {
-        RuleAlteredJobConfiguration jobConfig = new 
RuleAlteredJobConfiguration();
+        YamlRuleAlteredJobConfiguration jobConfig = new 
YamlRuleAlteredJobConfiguration();
         
jobConfig.setSource(createYamlPipelineDataSourceConfiguration(SourceConfiguration.getDockerConfiguration(tableRules)));
         
jobConfig.setTarget(createYamlPipelineDataSourceConfiguration(TargetConfiguration.getDockerConfiguration()));
         return new Gson().toJson(jobConfig);
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
index 4b3fe083426..1900339730a 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
@@ -20,6 +20,8 @@ package org.apache.shardingsphere.data.pipeline.core.util;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
+import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
@@ -44,7 +46,7 @@ public final class JobConfigurationBuilder {
      * @return created job configuration
      */
     public static RuleAlteredJobConfiguration createJobConfiguration() {
-        RuleAlteredJobConfiguration result = new RuleAlteredJobConfiguration();
+        YamlRuleAlteredJobConfiguration result = new 
YamlRuleAlteredJobConfiguration();
         result.setDatabaseName("logic_db");
         
result.setAlteredRuleYamlClassNameTablesMap(Collections.singletonMap(YamlShardingRuleConfiguration.class.getName(),
 Collections.singletonList("t_order")));
         result.setActiveVersion(0);
@@ -53,10 +55,10 @@ public final class JobConfigurationBuilder {
         result.setSource(createYamlPipelineDataSourceConfiguration(
                 new 
ShardingSpherePipelineDataSourceConfiguration(ConfigurationFileUtil.readFile("config_sharding_sphere_jdbc_source.yaml"))));
         result.setTarget(createYamlPipelineDataSourceConfiguration(new 
StandardPipelineDataSourceConfiguration(ConfigurationFileUtil.readFile("config_standard_jdbc_target.yaml"))));
-        result.buildHandleConfig();
+        result.extendConfiguration();
         int activeVersion = 
ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE - 10) + 1;
         result.setJobId(generateJobId(activeVersion, "logic_db"));
-        return result;
+        return new RuleAlteredJobConfigurationSwapper().swapToObject(result);
     }
     
     private static YamlPipelineDataSourceConfiguration 
createYamlPipelineDataSourceConfiguration(final PipelineDataSourceConfiguration 
config) {
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobTest.java
deleted file mode 100644
index 1600b0127f1..00000000000
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
-
-import lombok.SneakyThrows;
-import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
-import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
-import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
-import 
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
-import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
-import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
-import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Map;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-
-public final class RuleAlteredJobTest {
-    
-    @BeforeClass
-    public static void beforeClass() {
-        PipelineContextUtil.mockModeConfigAndContextManager();
-    }
-    
-    @Test
-    @SuppressWarnings("unchecked")
-    @SneakyThrows(ReflectiveOperationException.class)
-    public void assertExecute() {
-        RuleAlteredJobConfiguration jobConfig = 
JobConfigurationBuilder.createJobConfiguration();
-        initTableData(jobConfig);
-        ShardingContext shardingContext = new ShardingContext("1", null, 2, 
YamlEngine.marshal(jobConfig), 0, null);
-        new RuleAlteredJob().execute(shardingContext);
-        Map<String, RuleAlteredJobScheduler> jobSchedulerMap = 
ReflectionUtil.getStaticFieldValue(RuleAlteredJobSchedulerCenter.class, 
"JOB_SCHEDULER_MAP", Map.class);
-        assertNotNull(jobSchedulerMap);
-        assertFalse(jobSchedulerMap.isEmpty());
-    }
-    
-    @SneakyThrows(SQLException.class)
-    private void initTableData(final RuleAlteredJobConfiguration jobConfig) {
-        YamlPipelineDataSourceConfiguration source = jobConfig.getSource();
-        try (
-                PipelineDataSourceWrapper dataSource = 
PipelineDataSourceFactory.newInstance(PipelineDataSourceConfigurationFactory.newInstance(source.getType(),
 source.getParameter()));
-                Connection connection = dataSource.getConnection();
-                Statement statement = connection.createStatement()) {
-            statement.execute("DROP TABLE IF EXISTS t_order");
-            statement.execute("CREATE TABLE t_order (order_id INT PRIMARY KEY, 
user_id VARCHAR(12))");
-            statement.execute("INSERT INTO t_order (order_id, user_id) VALUES 
(1, 'xxx'), (999, 'yyy')");
-        }
-    }
-}
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
index 577dee68b64..2ed00596652 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
@@ -19,6 +19,8 @@ package 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
 
 import org.apache.commons.io.FileUtils;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
+import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
@@ -56,8 +58,10 @@ public final class RuleAlteredJobWorkerTest {
     @Test(expected = PipelineJobCreationException.class)
     public void assertCreateRuleAlteredContextNoAlteredRule() {
         RuleAlteredJobConfiguration jobConfig = 
JobConfigurationBuilder.createJobConfiguration();
-        jobConfig.setAlteredRuleYamlClassNameTablesMap(Collections.emptyMap());
-        RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
+        RuleAlteredJobConfigurationSwapper swapper = new 
RuleAlteredJobConfigurationSwapper();
+        YamlRuleAlteredJobConfiguration yamlJobConfig = 
swapper.swapToYamlConfiguration(jobConfig);
+        
yamlJobConfig.setAlteredRuleYamlClassNameTablesMap(Collections.emptyMap());
+        
RuleAlteredJobWorker.createRuleAlteredContext(swapper.swapToObject(yamlJobConfig));
     }
     
     @Test

Reply via email to