This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new cc2bd6e1d56 Extract  YamlJobProgress.YamlIncremental and 
YamlJobProgress.YamlInventory (#19875)
cc2bd6e1d56 is described below

commit cc2bd6e1d5671dd8bc73c698114d2d833f94a230
Author: Da Xiang Huang <localhos...@foxmail.com>
AuthorDate: Fri Aug 5 22:13:37 2022 +0800

    Extract  YamlJobProgress.YamlIncremental and YamlJobProgress.YamlInventory 
(#19875)
---
 .../job/progress/JobIncrementalTaskProgress.java   | 63 +++++++++++++++
 ...Progress.java => JobInventoryTaskProgress.java} | 65 +++-------------
 .../pipeline/api/job/progress/JobProgress.java     | 36 +++------
 .../api/task/progress/IncrementalTaskProgress.java |  6 +-
 .../core/api/impl/GovernanceRepositoryAPIImpl.java | 31 ++++----
 .../yaml/YamlIncrementalTaskProgress.java}         | 20 ++---
 .../yaml/YamlIncrementalTaskProgressSwapper.java   | 68 +++++++++++++++++
 .../progress/yaml/YamlInventoryTaskProgress.java}  | 19 ++---
 .../yaml/YamlInventoryTaskProgressSwapper.java     | 89 ++++++++++++++++++++++
 .../core/job/progress/yaml/YamlJobProgress.java    | 26 +------
 .../job/progress/yaml/YamlJobProgressSwapper.java  | 78 +------------------
 .../data/pipeline/core/task/IncrementalTask.java   |  9 ++-
 ...dleRuleAlteredJobCompletionDetectAlgorithm.java |  4 +-
 .../core/job/progress/JobProgressTest.java         |  4 +-
 .../progress/yaml/YamlJobProgressSwapperTest.java  |  8 +-
 ...uleAlteredJobCompletionDetectAlgorithmTest.java | 13 ++--
 .../prepare/InventoryTaskSplitterTest.java         | 10 ++-
 .../src/test/resources/governance-repository.yaml  | 10 +--
 .../test/resources/job-progress-all-finished.yaml  | 10 +--
 .../test/resources/job-progress-no-finished.yaml   | 12 +--
 .../test/resources/job-progress-no-inventory.yaml  | 16 ++--
 .../src/test/resources/job-progress.yaml           | 10 +--
 22 files changed, 329 insertions(+), 278 deletions(-)

diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobIncrementalTaskProgress.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobIncrementalTaskProgress.java
new file mode 100644
index 00000000000..5b6be6dd1cc
--- /dev/null
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobIncrementalTaskProgress.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.job.progress;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
+import 
org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
+
+/**
+ * Job incremental task progress.
+ */
+@RequiredArgsConstructor
+@Getter
+public class JobIncrementalTaskProgress {
+    
+    private final Map<String, IncrementalTaskProgress> 
incrementalTaskProgressMap;
+    
+    /**
+     * Get incremental position.
+     * 
+     * @param dataSourceName data source name
+     * @return incremental position
+     */
+    public Optional<IngestPosition<?>> getIncrementalPosition(final String 
dataSourceName) {
+        Optional<IncrementalTaskProgress> incrementalTaskProgressItem = 
incrementalTaskProgressMap.entrySet().stream()
+                .filter(entry -> dataSourceName.equals(entry.getKey()))
+                .map(Map.Entry::getValue)
+                .findAny();
+        return 
incrementalTaskProgressItem.map(IncrementalTaskProgress::getPosition);
+    }
+    
+    /**
+     * Get incremental latest active time milliseconds.
+     *
+     * @return latest active time, <code>0</code> if there is no activity
+     */
+    public long getIncrementalLatestActiveTimeMillis() {
+        List<Long> delays = incrementalTaskProgressMap.values().stream()
+                .map(each -> 
each.getIncrementalTaskDelay().getLatestActiveTimeMillis())
+                .collect(Collectors.toList());
+        return delays.stream().reduce(Long::max).orElse(0L);
+    }
+}
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobProgress.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobInventoryTaskProgress.java
similarity index 53%
copy from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobProgress.java
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobInventoryTaskProgress.java
index 2f0be9743b1..ff75c1bff5c 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobProgress.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobInventoryTaskProgress.java
@@ -17,49 +17,23 @@
 
 package org.apache.shardingsphere.data.pipeline.api.job.progress;
 
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 import lombok.Getter;
-import lombok.Setter;
+import lombok.RequiredArgsConstructor;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import 
org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
 import 
org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
 
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
 /**
- * Job progress.
+ * Job inventory task progress.
  */
+@RequiredArgsConstructor
 @Getter
-@Setter
-// TODO now rename
-public final class JobProgress implements PipelineJobProgress {
+public class JobInventoryTaskProgress {
     
-    private JobStatus status = JobStatus.RUNNING;
-    
-    private String sourceDatabaseType;
-    
-    private boolean active;
-    
-    private Map<String, InventoryTaskProgress> inventoryTaskProgressMap;
-    
-    private Map<String, IncrementalTaskProgress> incrementalTaskProgressMap;
-    
-    /**
-     * Get incremental position.
-     *
-     * @param dataSourceName data source name
-     * @return incremental position
-     */
-    public Optional<IngestPosition<?>> getIncrementalPosition(final String 
dataSourceName) {
-        IncrementalTaskProgress progress = 
incrementalTaskProgressMap.get(dataSourceName);
-        return Optional.ofNullable(null != progress ? progress.getPosition() : 
null);
-    }
+    private final Map<String, InventoryTaskProgress> inventoryTaskProgressMap;
     
     /**
      * Get inventory position.
@@ -71,16 +45,7 @@ public final class JobProgress implements 
PipelineJobProgress {
         Pattern pattern = Pattern.compile(String.format("%s(#\\d+)?", 
tableName));
         return inventoryTaskProgressMap.entrySet().stream()
                 .filter(entry -> pattern.matcher(entry.getKey()).find())
-                .collect(Collectors.toMap(Entry::getKey, entry -> 
entry.getValue().getPosition()));
-    }
-    
-    /**
-     * Get data source.
-     *
-     * @return data source
-     */
-    public String getDataSource() {
-        return 
incrementalTaskProgressMap.keySet().stream().findAny().orElse("");
+                .collect(Collectors.toMap(Map.Entry::getKey, entry -> 
entry.getValue().getPosition()));
     }
     
     /**
@@ -94,16 +59,4 @@ public final class JobProgress implements 
PipelineJobProgress {
                 .count();
         return inventoryTaskProgressMap.isEmpty() ? 0 : (int) (finished * 100 
/ inventoryTaskProgressMap.size());
     }
-    
-    /**
-     * Get incremental latest active time milliseconds.
-     *
-     * @return latest active time, <code>0</code> is there is no activity
-     */
-    public long getIncrementalLatestActiveTimeMillis() {
-        List<Long> delays = incrementalTaskProgressMap.values().stream()
-                .map(each -> 
each.getIncrementalTaskDelay().getLatestActiveTimeMillis())
-                .collect(Collectors.toList());
-        return delays.stream().reduce(Long::max).orElse(0L);
-    }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobProgress.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobProgress.java
index 2f0be9743b1..a902fa37884 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobProgress.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobProgress.java
@@ -19,18 +19,11 @@ package 
org.apache.shardingsphere.data.pipeline.api.job.progress;
 
 import lombok.Getter;
 import lombok.Setter;
-import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import 
org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
-import 
org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
 
-import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Optional;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
 
 /**
  * Job progress.
@@ -46,19 +39,17 @@ public final class JobProgress implements 
PipelineJobProgress {
     
     private boolean active;
     
-    private Map<String, InventoryTaskProgress> inventoryTaskProgressMap;
+    private JobInventoryTaskProgress jobInventoryTask;
     
-    private Map<String, IncrementalTaskProgress> incrementalTaskProgressMap;
+    private JobIncrementalTaskProgress jobIncrementalTask;
     
     /**
-     * Get incremental position.
-     *
-     * @param dataSourceName data source name
+     * Get incremental position.
+     * @param dataSourceName data source name
      * @return incremental position
      */
     public Optional<IngestPosition<?>> getIncrementalPosition(final String 
dataSourceName) {
-        IncrementalTaskProgress progress = 
incrementalTaskProgressMap.get(dataSourceName);
-        return Optional.ofNullable(null != progress ? progress.getPosition() : 
null);
+        return jobIncrementalTask.getIncrementalPosition(dataSourceName);
     }
     
     /**
@@ -68,10 +59,7 @@ public final class JobProgress implements 
PipelineJobProgress {
      * @return inventory position
      */
     public Map<String, IngestPosition<?>> getInventoryPosition(final String 
tableName) {
-        Pattern pattern = Pattern.compile(String.format("%s(#\\d+)?", 
tableName));
-        return inventoryTaskProgressMap.entrySet().stream()
-                .filter(entry -> pattern.matcher(entry.getKey()).find())
-                .collect(Collectors.toMap(Entry::getKey, entry -> 
entry.getValue().getPosition()));
+        return jobInventoryTask.getInventoryPosition(tableName);
     }
     
     /**
@@ -80,7 +68,7 @@ public final class JobProgress implements PipelineJobProgress 
{
      * @return data source
      */
     public String getDataSource() {
-        return 
incrementalTaskProgressMap.keySet().stream().findAny().orElse("");
+        return 
jobIncrementalTask.getIncrementalTaskProgressMap().keySet().stream().findAny().orElse("");
     }
     
     /**
@@ -89,10 +77,7 @@ public final class JobProgress implements 
PipelineJobProgress {
      * @return finished percentage
      */
     public int getInventoryFinishedPercentage() {
-        long finished = inventoryTaskProgressMap.values().stream()
-                .filter(each -> each.getPosition() instanceof FinishedPosition)
-                .count();
-        return inventoryTaskProgressMap.isEmpty() ? 0 : (int) (finished * 100 
/ inventoryTaskProgressMap.size());
+        return jobInventoryTask.getInventoryFinishedPercentage();
     }
     
     /**
@@ -101,9 +86,6 @@ public final class JobProgress implements 
PipelineJobProgress {
      * @return latest active time, <code>0</code> is there is no activity
      */
     public long getIncrementalLatestActiveTimeMillis() {
-        List<Long> delays = incrementalTaskProgressMap.values().stream()
-                .map(each -> 
each.getIncrementalTaskDelay().getLatestActiveTimeMillis())
-                .collect(Collectors.toList());
-        return delays.stream().reduce(Long::max).orElse(0L);
+        return null == jobIncrementalTask ? 0L : 
jobIncrementalTask.getIncrementalLatestActiveTimeMillis();
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java
index d8eb199e8e0..938065f6e6c 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java
@@ -17,9 +17,8 @@
 
 package org.apache.shardingsphere.data.pipeline.api.task.progress;
 
-import lombok.AllArgsConstructor;
 import lombok.Getter;
-import lombok.NoArgsConstructor;
+import lombok.RequiredArgsConstructor;
 import lombok.Setter;
 import lombok.ToString;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
@@ -27,8 +26,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPositio
 /**
  * Incremental task progress.
  */
-@NoArgsConstructor
-@AllArgsConstructor
+@RequiredArgsConstructor
 @Getter
 @Setter
 @ToString
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index d9f035831a6..06c5a61a25a 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -21,9 +21,9 @@ import com.google.common.base.Strings;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import 
org.apache.shardingsphere.data.pipeline.api.job.progress.JobIncrementalTaskProgress;
+import 
org.apache.shardingsphere.data.pipeline.api.job.progress.JobInventoryTaskProgress;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
-import 
org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
-import 
org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobProgressSwapper;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobProgress;
@@ -35,9 +35,7 @@ import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
 
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
@@ -62,26 +60,23 @@ public final class GovernanceRepositoryAPIImpl implements 
GovernanceRepositoryAP
         JobProgress jobProgress = new JobProgress();
         jobProgress.setStatus(jobContext.getStatus());
         
jobProgress.setSourceDatabaseType(jobContext.getJobConfig().getSourceDatabaseType());
-        
jobProgress.setIncrementalTaskProgressMap(getIncrementalTaskProgressMap(jobContext));
-        
jobProgress.setInventoryTaskProgressMap(getInventoryTaskProgressMap(jobContext));
+        
jobProgress.setJobIncrementalTask(getIncrementalTaskProgress(jobContext));
+        jobProgress.setJobInventoryTask(getInventoryTaskProgress(jobContext));
         String value = YamlEngine.marshal(SWAPPER.swapToYaml(jobProgress));
         
repository.persist(PipelineMetaDataNode.getScalingJobOffsetPath(jobContext.getJobId(),
 jobContext.getShardingItem()), value);
     }
     
-    private Map<String, IncrementalTaskProgress> 
getIncrementalTaskProgressMap(final RuleAlteredJobContext jobContext) {
-        Map<String, IncrementalTaskProgress> result = new 
HashMap<>(jobContext.getIncrementalTasks().size(), 1);
-        for (IncrementalTask each : jobContext.getIncrementalTasks()) {
-            result.put(each.getTaskId(), each.getProgress());
-        }
-        return result;
+    private JobIncrementalTaskProgress getIncrementalTaskProgress(final 
RuleAlteredJobContext jobContext) {
+        return new JobIncrementalTaskProgress(
+                jobContext.getIncrementalTasks()
+                        
.stream().collect(Collectors.toMap(IncrementalTask::getTaskId, 
IncrementalTask::getProgress)));
     }
     
-    private Map<String, InventoryTaskProgress> 
getInventoryTaskProgressMap(final RuleAlteredJobContext jobContext) {
-        Map<String, InventoryTaskProgress> result = new 
HashMap<>(jobContext.getInventoryTasks().size(), 1);
-        for (InventoryTask each : jobContext.getInventoryTasks()) {
-            result.put(each.getTaskId(), each.getProgress());
-        }
-        return result;
+    private JobInventoryTaskProgress getInventoryTaskProgress(final 
RuleAlteredJobContext jobContext) {
+        return new JobInventoryTaskProgress(
+                jobContext.getInventoryTasks()
+                        .stream()
+                        .collect(Collectors.toMap(InventoryTask::getTaskId, 
InventoryTask::getProgress)));
     }
     
     @Override
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlIncrementalTaskProgress.java
similarity index 62%
copy from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlIncrementalTaskProgress.java
index d8eb199e8e0..431b69c4e08 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlIncrementalTaskProgress.java
@@ -15,26 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.task.progress;
+package org.apache.shardingsphere.data.pipeline.core.job.progress.yaml;
 
-import lombok.AllArgsConstructor;
 import lombok.Getter;
-import lombok.NoArgsConstructor;
 import lombok.Setter;
-import lombok.ToString;
-import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
+import 
org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskDelay;
 
 /**
- * Incremental task progress.
+ * YAML incremental task progress.
  */
-@NoArgsConstructor
-@AllArgsConstructor
 @Getter
 @Setter
-@ToString
-public final class IncrementalTaskProgress implements TaskProgress {
+public final class YamlIncrementalTaskProgress {
     
-    private volatile IngestPosition<?> position;
+    private String dataSourceName;
     
-    private IncrementalTaskDelay incrementalTaskDelay = new 
IncrementalTaskDelay();
+    private String position;
+    
+    private IncrementalTaskDelay delay;
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlIncrementalTaskProgressSwapper.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlIncrementalTaskProgressSwapper.java
new file mode 100644
index 00000000000..8c7376fda7d
--- /dev/null
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlIncrementalTaskProgressSwapper.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.job.progress.yaml;
+
+import java.util.Collections;
+import 
org.apache.shardingsphere.data.pipeline.api.job.progress.JobIncrementalTaskProgress;
+import 
org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.PositionInitializerFactory;
+
+/**
+ * YAML incremental task progress swapper.
+ */
+public final class YamlIncrementalTaskProgressSwapper {
+    
+    /**
+     * Swap to YAML.
+     *
+     * @param jobIncrementalTask job incremental task progress
+     * @return YAML incremental task progress
+     */
+    public YamlIncrementalTaskProgress swapToYaml(final 
JobIncrementalTaskProgress jobIncrementalTask) {
+        if (null == jobIncrementalTask) {
+            return null;
+        }
+        return jobIncrementalTask.getIncrementalTaskProgressMap()
+                .entrySet()
+                .stream()
+                .map(entry -> {
+                    YamlIncrementalTaskProgress yamlIncrementalTaskProgress = 
new YamlIncrementalTaskProgress();
+                    
yamlIncrementalTaskProgress.setDataSourceName(entry.getKey());
+                    
yamlIncrementalTaskProgress.setPosition(entry.getValue().getPosition().toString());
+                    
yamlIncrementalTaskProgress.setDelay(entry.getValue().getIncrementalTaskDelay());
+                    return yamlIncrementalTaskProgress;
+                }).findAny().orElse(null);
+    }
+    
+    /**
+     * Swap to object.
+     *
+     * @param databaseType database type
+     * @param incremental YAML incremental task progress
+     * @return job incremental task progress
+     */
+    public JobIncrementalTaskProgress swapToObject(final String databaseType, 
final YamlIncrementalTaskProgress incremental) {
+        if (null == incremental) {
+            return null;
+        }
+        IncrementalTaskProgress result = new IncrementalTaskProgress();
+        
result.setPosition(PositionInitializerFactory.getInstance(databaseType).init(incremental.getPosition()));
+        result.setIncrementalTaskDelay(incremental.getDelay());
+        return new 
JobIncrementalTaskProgress(Collections.singletonMap(incremental.getDataSourceName(),
 result));
+    }
+}
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryTaskProgress.java
similarity index 62%
copy from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryTaskProgress.java
index d8eb199e8e0..9aefc9ab639 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryTaskProgress.java
@@ -15,26 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.task.progress;
+package org.apache.shardingsphere.data.pipeline.core.job.progress.yaml;
 
-import lombok.AllArgsConstructor;
+import java.util.HashMap;
+import java.util.Map;
 import lombok.Getter;
-import lombok.NoArgsConstructor;
 import lombok.Setter;
-import lombok.ToString;
-import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 
 /**
- * Incremental task progress.
+ * YAML inventory task progress.
  */
-@NoArgsConstructor
-@AllArgsConstructor
 @Getter
 @Setter
-@ToString
-public final class IncrementalTaskProgress implements TaskProgress {
+public final class YamlInventoryTaskProgress {
     
-    private volatile IngestPosition<?> position;
+    private String[] finished = new String[0];
     
-    private IncrementalTaskDelay incrementalTaskDelay = new 
IncrementalTaskDelay();
+    private Map<String, String> unfinished = new HashMap<>();
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryTaskProgressSwapper.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryTaskProgressSwapper.java
new file mode 100644
index 00000000000..d5578437f40
--- /dev/null
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryTaskProgressSwapper.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.job.progress.yaml;
+
+import com.google.common.base.Strings;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.PrimaryKeyPositionFactory;
+import 
org.apache.shardingsphere.data.pipeline.api.job.progress.JobInventoryTaskProgress;
+import 
org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
+
+/**
+ * YAML inventory task progress swapper.
+ */
+public final class YamlInventoryTaskProgressSwapper {
+    
+    /**
+     * Swap to YAML.
+     *
+     * @param inventory job inventory task progress
+     * @return YAML inventory task progress
+     */
+    public YamlInventoryTaskProgress swapToYaml(final JobInventoryTaskProgress 
inventory) {
+        YamlInventoryTaskProgress result = new YamlInventoryTaskProgress();
+        if (inventory != null) {
+            result.setFinished(getFinished(inventory));
+            result.setUnfinished(getUnfinished(inventory));
+        }
+        return result;
+    }
+    
+    private String[] getFinished(final JobInventoryTaskProgress 
jobInventoryTaskProgress) {
+        return 
jobInventoryTaskProgress.getInventoryTaskProgressMap().entrySet().stream()
+                .filter(entry -> entry.getValue().getPosition() instanceof 
FinishedPosition)
+                .map(Map.Entry::getKey)
+                .toArray(String[]::new);
+    }
+    
+    private Map<String, String> getUnfinished(final JobInventoryTaskProgress 
jobInventoryTaskProgress) {
+        return 
jobInventoryTaskProgress.getInventoryTaskProgressMap().entrySet().stream()
+                .filter(entry -> !(entry.getValue().getPosition() instanceof 
FinishedPosition))
+                .collect(Collectors.toMap(Map.Entry::getKey, entry -> 
entry.getValue().getPosition().toString()));
+    }
+    
+    /**
+     * Swap to object.
+     *
+     * @param inventory YAML inventory task progress
+     * @return job inventory task progress
+     */
+    public JobInventoryTaskProgress swapToObject(final 
YamlInventoryTaskProgress inventory) {
+        if (null == inventory) {
+            return null;
+        }
+        Map<String, InventoryTaskProgress> inventoryTaskProgressItemMap = new 
HashMap<>();
+        
inventoryTaskProgressItemMap.putAll(Arrays.stream(inventory.getFinished())
+                .collect(Collectors.toMap(key -> key, value -> new 
InventoryTaskProgress(new FinishedPosition()))));
+        
inventoryTaskProgressItemMap.putAll(inventory.getUnfinished().entrySet()
+                .stream()
+                .collect(Collectors.toMap(
+                        Map.Entry::getKey, 
getInventoryTaskProgressFunction())));
+        return new JobInventoryTaskProgress(inventoryTaskProgressItemMap);
+    }
+    
+    private Function<Map.Entry<String, String>, InventoryTaskProgress> 
getInventoryTaskProgressFunction() {
+        return entry -> new InventoryTaskProgress(
+                Strings.isNullOrEmpty(entry.getValue()) ? new 
PlaceholderPosition() : 
PrimaryKeyPositionFactory.newInstance(entry.getValue()));
+    }
+}
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgress.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgress.java
index 259aab86eb5..6f060f6ba1d 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgress.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgress.java
@@ -19,10 +19,6 @@ package 
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml;
 
 import lombok.Getter;
 import lombok.Setter;
-import 
org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskDelay;
-
-import java.util.HashMap;
-import java.util.Map;
 
 /**
  * Yaml job progress.
@@ -35,25 +31,7 @@ public final class YamlJobProgress {
     
     private String sourceDatabaseType;
     
-    private YamlInventory inventory;
-    
-    private Map<String, YamlIncremental> incremental;
-    
-    @Getter
-    @Setter
-    public static final class YamlInventory {
-        
-        private String[] finished = new String[0];
-        
-        private Map<String, String> unfinished = new HashMap<>();
-    }
+    private YamlInventoryTaskProgress inventory;
     
-    @Getter
-    @Setter
-    public static final class YamlIncremental {
-        
-        private String position;
-        
-        private IncrementalTaskDelay delay;
-    }
+    private YamlIncrementalTaskProgress incremental;
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgressSwapper.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgressSwapper.java
index b89557e7cda..6b0387ab6d3 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgressSwapper.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgressSwapper.java
@@ -17,23 +17,8 @@
 
 package org.apache.shardingsphere.data.pipeline.core.job.progress.yaml;
 
-import com.google.common.base.Strings;
-import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
-import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
-import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.PrimaryKeyPositionFactory;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
-import 
org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
-import 
org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.PositionInitializerFactory;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.function.Function;
-import java.util.stream.Collectors;
 
 /**
  * YAML Job progress swapper.
@@ -50,41 +35,11 @@ public final class YamlJobProgressSwapper {
         YamlJobProgress result = new YamlJobProgress();
         result.setStatus(jobProgress.getStatus().name());
         result.setSourceDatabaseType(jobProgress.getSourceDatabaseType());
-        
result.setInventory(getYamlInventory(jobProgress.getInventoryTaskProgressMap()));
-        
result.setIncremental(getYamlIncremental(jobProgress.getIncrementalTaskProgressMap()));
+        result.setInventory(new 
YamlInventoryTaskProgressSwapper().swapToYaml(jobProgress.getJobInventoryTask()));
+        result.setIncremental(new 
YamlIncrementalTaskProgressSwapper().swapToYaml(jobProgress.getJobIncrementalTask()));
         return result;
     }
     
-    private YamlJobProgress.YamlInventory getYamlInventory(final Map<String, 
InventoryTaskProgress> inventoryTaskProgressMap) {
-        YamlJobProgress.YamlInventory result = new 
YamlJobProgress.YamlInventory();
-        result.setFinished(getFinished(inventoryTaskProgressMap));
-        result.setUnfinished(getUnfinished(inventoryTaskProgressMap));
-        return result;
-    }
-    
-    private String[] getFinished(final Map<String, InventoryTaskProgress> 
inventoryTaskProgressMap) {
-        return inventoryTaskProgressMap.entrySet().stream()
-                .filter(entry -> entry.getValue().getPosition() instanceof 
FinishedPosition)
-                .map(Entry::getKey)
-                .toArray(String[]::new);
-    }
-    
-    private Map<String, String> getUnfinished(final Map<String, 
InventoryTaskProgress> inventoryTaskProgressMap) {
-        return inventoryTaskProgressMap.entrySet().stream()
-                .filter(entry -> !(entry.getValue().getPosition() instanceof 
FinishedPosition))
-                .collect(Collectors.toMap(Entry::getKey, entry -> 
entry.getValue().getPosition().toString()));
-    }
-    
-    private Map<String, YamlJobProgress.YamlIncremental> 
getYamlIncremental(final Map<String, IncrementalTaskProgress> 
incrementalTaskProgressMap) {
-        return incrementalTaskProgressMap.entrySet().stream()
-                .collect(Collectors.toMap(Entry::getKey, entry -> {
-                    YamlJobProgress.YamlIncremental result = new 
YamlJobProgress.YamlIncremental();
-                    
result.setPosition(entry.getValue().getPosition().toString());
-                    
result.setDelay(entry.getValue().getIncrementalTaskDelay());
-                    return result;
-                }));
-    }
-    
     /**
      * Swap to object.
      *
@@ -95,33 +50,8 @@ public final class YamlJobProgressSwapper {
         JobProgress result = new JobProgress();
         result.setStatus(JobStatus.valueOf(yamlJobProgress.getStatus()));
         result.setSourceDatabaseType(yamlJobProgress.getSourceDatabaseType());
-        
result.setInventoryTaskProgressMap(getInventoryTaskProgressMap(yamlJobProgress.getInventory()));
-        
result.setIncrementalTaskProgressMap(getIncrementalTaskProgressMap(yamlJobProgress.getSourceDatabaseType(),
 yamlJobProgress.getIncremental()));
+        result.setJobInventoryTask(new 
YamlInventoryTaskProgressSwapper().swapToObject(yamlJobProgress.getInventory()));
+        result.setJobIncrementalTask(new 
YamlIncrementalTaskProgressSwapper().swapToObject(yamlJobProgress.getSourceDatabaseType(),
 yamlJobProgress.getIncremental()));
         return result;
     }
-    
-    private Map<String, InventoryTaskProgress> 
getInventoryTaskProgressMap(final YamlJobProgress.YamlInventory inventory) {
-        if (null == inventory) {
-            return new LinkedHashMap<>();
-        }
-        Map<String, InventoryTaskProgress> result = new HashMap<>();
-        
result.putAll(Arrays.stream(inventory.getFinished()).collect(Collectors.toMap(each
 -> each, each -> new InventoryTaskProgress(new FinishedPosition()))));
-        
result.putAll(inventory.getUnfinished().entrySet().stream().collect(Collectors.toMap(Entry::getKey,
 getInventoryTaskProgressFunction())));
-        return result;
-    }
-    
-    private Function<Entry<String, String>, InventoryTaskProgress> 
getInventoryTaskProgressFunction() {
-        return entry -> new 
InventoryTaskProgress(Strings.isNullOrEmpty(entry.getValue()) ? new 
PlaceholderPosition() : 
PrimaryKeyPositionFactory.newInstance(entry.getValue()));
-    }
-    
-    private Map<String, IncrementalTaskProgress> 
getIncrementalTaskProgressMap(final String databaseType, final Map<String, 
YamlJobProgress.YamlIncremental> incremental) {
-        if (null == incremental) {
-            return new LinkedHashMap<>();
-        }
-        return 
incremental.entrySet().stream().collect(Collectors.toMap(Entry::getKey, 
getIncrementalTaskProgressFunction(databaseType)));
-    }
-    
-    private Function<Entry<String, YamlJobProgress.YamlIncremental>, 
IncrementalTaskProgress> getIncrementalTaskProgressFunction(final String 
databaseType) {
-        return entry -> new 
IncrementalTaskProgress(PositionInitializerFactory.getInstance(databaseType).init(entry.getValue().getPosition()),
 entry.getValue().getDelay());
-    }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 161ed5fdfbb..55a8ab607a1 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -71,14 +71,19 @@ public final class IncrementalTask extends 
AbstractLifecycleExecutor implements
                            final PipelineTableMetaDataLoader 
sourceMetaDataLoader, final ExecuteEngine incrementalDumperExecuteEngine, final 
PipelineJobPersistCallback pipelineJobPersistCallback) {
         this.incrementalDumperExecuteEngine = incrementalDumperExecuteEngine;
         taskId = dumperConfig.getDataSourceName();
-        progress = new IncrementalTaskProgress();
         IngestPosition<?> position = dumperConfig.getPosition();
-        progress.setPosition(position);
+        progress = createIncrementalTaskProgress(position);
         channel = createChannel(concurrency, pipelineChannelCreator, progress);
         dumper = DumperFactory.createIncrementalDumper(dumperConfig, position, 
channel, sourceMetaDataLoader);
         importers = createImporters(concurrency, importerConfig, 
dataSourceManager, channel, pipelineJobPersistCallback);
     }
     
+    // Creates a fresh IncrementalTaskProgress and sets the given ingest position on it.
+    private IncrementalTaskProgress createIncrementalTaskProgress(final 
IngestPosition<?> position) {
+        IncrementalTaskProgress incrementalTaskProgress = new 
IncrementalTaskProgress();
+        incrementalTaskProgress.setPosition(position);
+        return incrementalTaskProgress;
+    }
+    
     @Override
     protected void doStart() {
         
progress.getIncrementalTaskDelay().setLatestActiveTimeMillis(System.currentTimeMillis());
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/IdleRuleAlteredJobCompletionDetectAlgorithm.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/IdleRuleAlteredJobCompletionDetectAlgorithm.java
index fe3e1043416..dea0d03c684 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/IdleRuleAlteredJobCompletionDetectAlgorithm.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/IdleRuleAlteredJobCompletionDetectAlgorithm.java
@@ -75,12 +75,12 @@ public final class 
IdleRuleAlteredJobCompletionDetectAlgorithm implements JobCom
     }
     
     private static boolean isAllInventoryTasksCompleted(final 
Collection<JobProgress> jobProgresses) {
-        return jobProgresses.stream().flatMap(each -> 
each.getInventoryTaskProgressMap().values().stream()).allMatch(each -> 
each.getPosition() instanceof FinishedPosition);
+        // allMatch is true for an empty stream, so progresses without any inventory task count as completed.
+        // NOTE(review): getJobInventoryTask() is dereferenced without a null check - confirm it cannot be null here.
+        return jobProgresses.stream().flatMap(each -> 
each.getJobInventoryTask().getInventoryTaskProgressMap().values().stream()).allMatch(each
 -> each.getPosition() instanceof FinishedPosition);
     }
     
     private static Collection<Long> getIncrementalTasksIdleSeconds(final 
Collection<JobProgress> jobProgresses) {
         long currentTimeMillis = System.currentTimeMillis();
-        return jobProgresses.stream().flatMap(each -> 
each.getIncrementalTaskProgressMap().values().stream())
+        return jobProgresses.stream().flatMap(each -> 
each.getJobIncrementalTask().getIncrementalTaskProgressMap().values().stream())
                 .map(each -> {
                     long latestActiveTimeMillis = 
each.getIncrementalTaskDelay().getLatestActiveTimeMillis();
                     return latestActiveTimeMillis > 0 ? 
TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis - latestActiveTimeMillis) : 0;
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/JobProgressTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/JobProgressTest.java
index 986c0c653cf..695258f4d28 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/JobProgressTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/JobProgressTest.java
@@ -45,8 +45,8 @@ public final class JobProgressTest {
         JobProgress jobProgress = 
getJobProgress(ConfigurationFileUtil.readFile("job-progress.yaml"));
         assertThat(jobProgress.getStatus(), is(JobStatus.RUNNING));
         assertThat(jobProgress.getSourceDatabaseType(), is("H2"));
-        assertThat(jobProgress.getInventoryTaskProgressMap().size(), is(4));
-        assertThat(jobProgress.getIncrementalTaskProgressMap().size(), is(1));
+        
assertThat(jobProgress.getJobInventoryTask().getInventoryTaskProgressMap().size(),
 is(4));
+        
assertThat(jobProgress.getJobIncrementalTask().getIncrementalTaskProgressMap().size(),
 is(1));
     }
     
     @Test
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgressSwapperTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgressSwapperTest.java
index 8914eff8557..e650552b2f6 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgressSwapperTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgressSwapperTest.java
@@ -26,7 +26,6 @@ import org.junit.Test;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertNull;
 
 public final class YamlJobProgressSwapperTest {
@@ -48,16 +47,15 @@ public final class YamlJobProgressSwapperTest {
         assertThat(actual.getInventory().getUnfinished().size(), is(2));
         assertThat(actual.getInventory().getUnfinished().get("ds1.t_2"), 
is("i,1,2"));
         assertThat(actual.getInventory().getUnfinished().get("ds1.t_1"), 
is(""));
-        assertThat(actual.getIncremental().size(), is(1));
-        assertTrue(actual.getIncremental().containsKey("ds0"));
-        assertNull(actual.getIncremental().get("position"));
+        assertThat(actual.getIncremental().getDataSourceName(), is("ds0"));
+        assertThat(actual.getIncremental().getPosition().length(), is(0));
     }
     
     @Test
     public void assertNullIncremental() {
         JobProgress jobProgress = 
getJobProgress(ConfigurationFileUtil.readFile("job-progress-no-finished.yaml"));
         YamlJobProgress actual = SWAPPER.swapToYaml(jobProgress);
-        assertTrue(actual.getIncremental().isEmpty());
+        // The fixture has its incremental section commented out, so the swapped YAML is expected to carry none.
+        assertNull(actual.getIncremental());
     }
     
     @Test
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithmTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithmTest.java
index de613760532..141abac58e0 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithmTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithmTest.java
@@ -20,6 +20,8 @@ package 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
 import 
org.apache.shardingsphere.data.pipeline.api.detect.RuleAlteredJobAlmostCompletedParameter;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
+import 
org.apache.shardingsphere.data.pipeline.api.job.progress.JobIncrementalTaskProgress;
+import 
org.apache.shardingsphere.data.pipeline.api.job.progress.JobInventoryTaskProgress;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
 import 
org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
 import 
org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
@@ -29,8 +31,6 @@ import org.junit.Test;
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
@@ -68,9 +68,7 @@ public final class 
IdleRuleAlteredJobCompletionDetectAlgorithmTest {
     public void assertFalseOnUnFinishedPosition() {
         int jobShardingCount = 1;
         JobProgress jobProgress = new JobProgress();
-        Map<String, InventoryTaskProgress> inventoryTaskProgressMap = new 
LinkedHashMap<>();
-        jobProgress.setInventoryTaskProgressMap(inventoryTaskProgressMap);
-        inventoryTaskProgressMap.put("foo_ds", new InventoryTaskProgress(new 
PlaceholderPosition()));
+        jobProgress.setJobInventoryTask(new 
JobInventoryTaskProgress(Collections.singletonMap("foo_ds", new 
InventoryTaskProgress(new PlaceholderPosition()))));
         Collection<JobProgress> jobProgresses = 
Collections.singleton(jobProgress);
         RuleAlteredJobAlmostCompletedParameter parameter = new 
RuleAlteredJobAlmostCompletedParameter(jobShardingCount, jobProgresses);
         assertFalse(JobCompletionDetectAlgorithmFactory.newInstance(new 
AlgorithmConfiguration("IDLE", new Properties())).isAlmostCompleted(parameter));
@@ -100,10 +98,11 @@ public final class 
IdleRuleAlteredJobCompletionDetectAlgorithmTest {
     
     private JobProgress createJobProgress(final long latestActiveTimeMillis) {
+        // Fixture: one finished inventory task and one incremental task ("foo_ds") whose latest active
+        // time is the supplied value.
         JobProgress result = new JobProgress();
-        result.setInventoryTaskProgressMap(Collections.singletonMap("foo_ds", 
new InventoryTaskProgress(new FinishedPosition())));
+        result.setJobInventoryTask(new 
JobInventoryTaskProgress(Collections.singletonMap("foo_ds", new 
InventoryTaskProgress(new FinishedPosition()))));
         IncrementalTaskProgress incrementalTaskProgress = new 
IncrementalTaskProgress();
         
incrementalTaskProgress.getIncrementalTaskDelay().setLatestActiveTimeMillis(latestActiveTimeMillis);
-        
result.setIncrementalTaskProgressMap(Collections.singletonMap("foo_ds", 
incrementalTaskProgress));
+        JobIncrementalTaskProgress jobIncrementalTaskProgress = new 
JobIncrementalTaskProgress(Collections.singletonMap("foo_ds", 
incrementalTaskProgress));
+        result.setJobIncrementalTask(jobIncrementalTaskProgress);
         return result;
     }
 }
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
index cf8fe6c47c0..ee0ea66f257 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
@@ -83,8 +83,9 @@ public final class InventoryTaskSplitterTest {
         initEmptyTablePrimaryEnvironment(taskConfig.getDumperConfig());
         List<InventoryTask> actual = 
inventoryTaskSplitter.splitInventoryData(jobContext);
         assertThat(actual.size(), is(1));
-        assertThat(((IntegerPrimaryKeyPosition) 
actual.get(0).getProgress().getPosition()).getBeginValue(), is(0L));
-        assertThat(((IntegerPrimaryKeyPosition) 
actual.get(0).getProgress().getPosition()).getEndValue(), is(0L));
+        InventoryTask task = actual.get(0);
+        assertThat(((IntegerPrimaryKeyPosition) 
task.getProgress().getPosition()).getBeginValue(), is(0L));
+        assertThat(((IntegerPrimaryKeyPosition) 
task.getProgress().getPosition()).getEndValue(), is(0L));
     }
     
     @Test
@@ -92,8 +93,9 @@ public final class InventoryTaskSplitterTest {
         initIntPrimaryEnvironment(taskConfig.getDumperConfig());
         List<InventoryTask> actual = 
inventoryTaskSplitter.splitInventoryData(jobContext);
         assertThat(actual.size(), is(10));
-        assertThat(((IntegerPrimaryKeyPosition) 
actual.get(9).getProgress().getPosition()).getBeginValue(), is(91L));
-        assertThat(((IntegerPrimaryKeyPosition) 
actual.get(9).getProgress().getPosition()).getEndValue(), is(100L));
+        InventoryTask task = actual.get(9);
+        assertThat(((IntegerPrimaryKeyPosition) 
task.getProgress().getPosition()).getBeginValue(), is(91L));
+        assertThat(((IntegerPrimaryKeyPosition) 
task.getProgress().getPosition()).getEndValue(), is(100L));
     }
     
     @Test
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/governance-repository.yaml
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/governance-repository.yaml
index ea94331aaf3..06de622c13a 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/governance-repository.yaml
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/governance-repository.yaml
@@ -16,11 +16,11 @@
 #
 
 incremental:
-  ds_0:
-    delay:
-      lastEventTimestamps: 0
-      latestActiveTimeMillis: 0
-    position: ''
+  dataSourceName: ds_0
+  delay:
+    lastEventTimestamps: 0
+    latestActiveTimeMillis: 0
+  position: ''
 inventory:
   unfinished:
     ds_0.t_order#0: ''
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-all-finished.yaml
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-all-finished.yaml
index 04ba007f592..0ae217af602 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-all-finished.yaml
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-all-finished.yaml
@@ -16,11 +16,11 @@
 #
 
 incremental:
-  ds0:
-    delay:
-      lastEventTimestamps: 0
-      latestActiveTimeMillis: 50
-    position: ''
+  dataSourceName: ds0
+  delay:
+    lastEventTimestamps: 0
+    latestActiveTimeMillis: 50
+  position: ''
 inventory:
   finished:
   - ds0.t_2
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-no-finished.yaml
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-no-finished.yaml
index 4b2efffdf7c..b0994b68b59 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-no-finished.yaml
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-no-finished.yaml
@@ -15,12 +15,12 @@
 # limitations under the License.
 #
 #
-# incremental:
-#   ds0:
-#     delay:
-#       lastEventTimestamps: 0
-#       latestActiveTimeMillis: 0
-#     position: ''
+#incremental:
+#  dataSourceName: ds0
+#  delay:
+#    lastEventTimestamps: 0
+#    latestActiveTimeMillis: 0
+#  position: ''
 inventory:
   unfinished:
     ds0.t_1: i,1,2
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-no-inventory.yaml
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-no-inventory.yaml
index 58a4d08e708..897258a53ce 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-no-inventory.yaml
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-no-inventory.yaml
@@ -15,11 +15,11 @@
 # limitations under the License.
 #
 #
- incremental:
-   ds0:
-     delay:
-       lastEventTimestamps: 0
-       latestActiveTimeMillis: 0
-     position: ''
- sourceDatabaseType: H2
- status: RUNNING
+incremental:
+  dataSourceName: ds0
+  delay:
+    lastEventTimestamps: 0
+    latestActiveTimeMillis: 0
+  position: ''
+sourceDatabaseType: H2
+status: RUNNING
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress.yaml
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress.yaml
index 2333522962f..61fe5448960 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress.yaml
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress.yaml
@@ -16,11 +16,11 @@
 #
 
 incremental:
-  ds0:
-    delay:
-      lastEventTimestamps: 0
-      latestActiveTimeMillis: 0
-    position: ''
+  dataSourceName: ds0
+  delay:
+    lastEventTimestamps: 0
+    latestActiveTimeMillis: 0
+  position: ''
 inventory:
   finished:
   - ds0.t_2

Reply via email to