This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 077058aac7a Extract JobProgress methods into inventory and incremental 
task progress (#19940)
077058aac7a is described below

commit 077058aac7a7536bdc6f9e9f4815c81405942634
Author: Hongsheng Zhong <sand...@126.com>
AuthorDate: Mon Aug 8 09:49:23 2022 +0800

    Extract JobProgress methods into inventory and incremental task progress 
(#19940)
---
 .../query/ShowScalingJobStatusQueryResultSet.java  |  6 +--
 .../progress/JobItemIncrementalTasksProgress.java  |  9 ++++
 .../pipeline/api/job/progress/JobProgress.java     | 50 ----------------------
 .../rulealtered/RuleAlteredJobPreparer.java        |  5 ++-
 .../rulealtered/prepare/InventoryTaskSplitter.java |  2 +-
 .../core/job/progress/JobProgressTest.java         | 32 +++++---------
 .../progress/yaml/YamlJobProgressSwapperTest.java  | 11 +----
 .../test/resources/job-progress-no-finished.yaml   | 31 --------------
 8 files changed, 28 insertions(+), 118 deletions(-)

diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShowScalingJobStatusQueryResultSet.java
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShowScalingJobStatusQueryResultSet.java
index 37bfae61842..62ec3670803 100644
--- 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShowScalingJobStatusQueryResultSet.java
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShowScalingJobStatusQueryResultSet.java
@@ -48,11 +48,11 @@ public final class ShowScalingJobStatusQueryResultSet 
implements DistSQLResultSe
                     Collection<Object> result = new LinkedList<>();
                     result.add(entry.getKey());
                     if (null != entry.getValue()) {
-                        result.add(entry.getValue().getDataSource());
+                        
result.add(entry.getValue().getIncremental().getDataSourceName());
                         result.add(entry.getValue().getStatus());
                         result.add(entry.getValue().isActive() ? 
Boolean.TRUE.toString() : Boolean.FALSE.toString());
-                        
result.add(entry.getValue().getInventoryFinishedPercentage());
-                        long latestActiveTimeMillis = 
entry.getValue().getIncrementalLatestActiveTimeMillis();
+                        
result.add(entry.getValue().getInventory().getInventoryFinishedPercentage());
+                        long latestActiveTimeMillis = 
entry.getValue().getIncremental().getIncrementalLatestActiveTimeMillis();
                         result.add(latestActiveTimeMillis > 0 ? 
TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis - latestActiveTimeMillis) : 
0);
                     } else {
                         result.add("");
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobItemIncrementalTasksProgress.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobItemIncrementalTasksProgress.java
index ee4b4efeb2d..b4065d087f6 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobItemIncrementalTasksProgress.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobItemIncrementalTasksProgress.java
@@ -47,6 +47,15 @@ public final class JobItemIncrementalTasksProgress {
         return 
incrementalTaskProgress.map(IncrementalTaskProgress::getPosition);
     }
     
+    /**
+     * Get data source name.
+     *
+     * @return data source
+     */
+    public String getDataSourceName() {
+        return 
incrementalTaskProgressMap.keySet().stream().findAny().orElse("");
+    }
+    
     /**
      * Get incremental latest active time milliseconds.
      *
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobProgress.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobProgress.java
index c210dda4a4e..9a73161dfd9 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobProgress.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobProgress.java
@@ -19,12 +19,8 @@ package 
org.apache.shardingsphere.data.pipeline.api.job.progress;
 
 import lombok.Getter;
 import lombok.Setter;
-import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 
-import java.util.Map;
-import java.util.Optional;
-
 /**
  * Job progress.
  */
@@ -42,50 +38,4 @@ public final class JobProgress implements 
PipelineJobItemProgress {
     private JobItemInventoryTasksProgress inventory;
     
     private JobItemIncrementalTasksProgress incremental;
-    
-    /**
-     * get incremental position.
-     * @param dataSourceName dataSource
-     * @return incremental position
-     */
-    public Optional<IngestPosition<?>> getIncrementalPosition(final String 
dataSourceName) {
-        return incremental.getIncrementalPosition(dataSourceName);
-    }
-    
-    /**
-     * Get inventory position.
-     *
-     * @param tableName table name
-     * @return inventory position
-     */
-    public Map<String, IngestPosition<?>> getInventoryPosition(final String 
tableName) {
-        return inventory.getInventoryPosition(tableName);
-    }
-    
-    /**
-     * Get data source.
-     *
-     * @return data source
-     */
-    public String getDataSource() {
-        return 
incremental.getIncrementalTaskProgressMap().keySet().stream().findAny().orElse("");
-    }
-    
-    /**
-     * Get inventory finished percentage.
-     *
-     * @return finished percentage
-     */
-    public int getInventoryFinishedPercentage() {
-        return inventory.getInventoryFinishedPercentage();
-    }
-    
-    /**
-     * Get incremental latest active time milliseconds.
-     *
-     * @return latest active time, <code>0</code> is there is no activity
-     */
-    public long getIncrementalLatestActiveTimeMillis() {
-        return null == incremental ? 0L : 
incremental.getIncrementalLatestActiveTimeMillis();
-    }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index 979a33268af..f177417fd32 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -200,8 +200,9 @@ public final class RuleAlteredJobPreparer {
     
     private IngestPosition<?> getIncrementalPosition(final 
RuleAlteredJobContext jobContext, final TaskConfiguration taskConfig,
                                                      final 
PipelineDataSourceManager dataSourceManager) throws SQLException {
-        if (null != jobContext.getInitProgress()) {
-            Optional<IngestPosition<?>> position = 
jobContext.getInitProgress().getIncrementalPosition(taskConfig.getDumperConfig().getDataSourceName());
+        JobProgress initProgress = jobContext.getInitProgress();
+        if (null != initProgress) {
+            Optional<IngestPosition<?>> position = 
initProgress.getIncremental().getIncrementalPosition(taskConfig.getDumperConfig().getDataSourceName());
             if (position.isPresent()) {
                 return position.get();
             }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
index 49d4ffbc738..2d80bee5541 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
@@ -139,7 +139,7 @@ public final class InventoryTaskSplitter {
         PipelineTableMetaData tableMetaData = 
metaDataLoader.getTableMetaData(schemaName, actualTableName);
         PipelineColumnMetaData uniqueKeyColumn = 
mustGetAnAppropriateUniqueKeyColumn(tableMetaData, actualTableName);
         if (null != initProgress && initProgress.getStatus() != 
JobStatus.PREPARING_FAILURE) {
-            Collection<IngestPosition<?>> result = 
initProgress.getInventoryPosition(dumperConfig.getActualTableName()).values();
+            Collection<IngestPosition<?>> result = 
initProgress.getInventory().getInventoryPosition(dumperConfig.getActualTableName()).values();
             for (IngestPosition<?> each : result) {
                 if (each instanceof PrimaryKeyPosition) {
                     dumperConfig.setUniqueKey(uniqueKeyColumn.getName());
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/JobProgressTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/JobProgressTest.java
index 1fc6afb8f4a..f93aaba6e44 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/JobProgressTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/JobProgressTest.java
@@ -19,12 +19,12 @@ package 
org.apache.shardingsphere.data.pipeline.core.job.progress;
 
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobProgressSwapper;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobProgress;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobProgressSwapper;
 import org.apache.shardingsphere.data.pipeline.core.util.ConfigurationFileUtil;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.junit.Test;
@@ -52,7 +52,7 @@ public final class JobProgressTest {
     @Test
     public void assertGetIncrementalPosition() {
         JobProgress jobProgress = 
getJobProgress(ConfigurationFileUtil.readFile("job-progress.yaml"));
-        Optional<IngestPosition<?>> position = 
jobProgress.getIncrementalPosition("ds0");
+        Optional<IngestPosition<?>> position = 
jobProgress.getIncremental().getIncrementalPosition("ds0");
         assertTrue(position.isPresent());
         assertThat(position.get(), instanceOf(PlaceholderPosition.class));
     }
@@ -60,41 +60,31 @@ public final class JobProgressTest {
     @Test
     public void assertGetInventoryPosition() {
         JobProgress jobProgress = 
getJobProgress(ConfigurationFileUtil.readFile("job-progress.yaml"));
-        assertThat(jobProgress.getInventoryPosition("ds0").size(), is(2));
-        assertThat(jobProgress.getInventoryPosition("ds0").get("ds0.t_1"), 
instanceOf(FinishedPosition.class));
-        assertThat(jobProgress.getInventoryPosition("ds1").get("ds1.t_1"), 
instanceOf(PlaceholderPosition.class));
-        assertThat(jobProgress.getInventoryPosition("ds1").get("ds1.t_2"), 
instanceOf(IntegerPrimaryKeyPosition.class));
+        
assertThat(jobProgress.getInventory().getInventoryPosition("ds0").size(), 
is(2));
+        
assertThat(jobProgress.getInventory().getInventoryPosition("ds0").get("ds0.t_1"),
 instanceOf(FinishedPosition.class));
+        
assertThat(jobProgress.getInventory().getInventoryPosition("ds1").get("ds1.t_1"),
 instanceOf(PlaceholderPosition.class));
+        
assertThat(jobProgress.getInventory().getInventoryPosition("ds1").get("ds1.t_2"),
 instanceOf(IntegerPrimaryKeyPosition.class));
     }
     
     @Test
     public void assertGetInventoryFinishedPercentage() {
         JobProgress jobProgress = 
getJobProgress(ConfigurationFileUtil.readFile("job-progress.yaml"));
-        assertThat(jobProgress.getInventoryFinishedPercentage(), is(50));
-    }
-    
-    @Test
-    public void assertGetNoFinishedInventoryFinishedPercentage() {
-        
assertThat(getJobProgress(ConfigurationFileUtil.readFile("job-progress-no-finished.yaml")).getInventoryFinishedPercentage(),
 is(0));
+        
assertThat(jobProgress.getInventory().getInventoryFinishedPercentage(), is(50));
     }
     
     @Test
     public void assertGetAllFinishedInventoryFinishedPercentage() {
-        
assertThat(getJobProgress(ConfigurationFileUtil.readFile("job-progress-all-finished.yaml")).getInventoryFinishedPercentage(),
 is(100));
+        
assertThat(getJobProgress(ConfigurationFileUtil.readFile("job-progress-all-finished.yaml")).getInventory().getInventoryFinishedPercentage(),
 is(100));
     }
     
     @Test
     public void assertGetIncrementalLatestActiveTimeMillis() {
-        
assertThat(getJobProgress(ConfigurationFileUtil.readFile("job-progress.yaml")).getIncrementalLatestActiveTimeMillis(),
 is(0L));
+        
assertThat(getJobProgress(ConfigurationFileUtil.readFile("job-progress.yaml")).getIncremental().getIncrementalLatestActiveTimeMillis(),
 is(0L));
     }
     
     @Test
     public void assertGetIncrementalDataLatestActiveTimeMillis() {
-        
assertThat(getJobProgress(ConfigurationFileUtil.readFile("job-progress-all-finished.yaml")).getIncrementalLatestActiveTimeMillis(),
 is(50L));
-    }
-    
-    @Test
-    public void assertGetNoIncrementalDataLatestActiveTimeMillis() {
-        
assertThat(getJobProgress(ConfigurationFileUtil.readFile("job-progress-no-finished.yaml")).getIncrementalLatestActiveTimeMillis(),
 is(0L));
+        
assertThat(getJobProgress(ConfigurationFileUtil.readFile("job-progress-all-finished.yaml")).getIncremental().getIncrementalLatestActiveTimeMillis(),
 is(50L));
     }
     
     private JobProgress getJobProgress(final String data) {
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgressSwapperTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgressSwapperTest.java
index e650552b2f6..44c46f2e730 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgressSwapperTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgressSwapperTest.java
@@ -20,13 +20,11 @@ package 
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
 import org.apache.shardingsphere.data.pipeline.core.util.ConfigurationFileUtil;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-
 import org.junit.Test;
 
 import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 
 public final class YamlJobProgressSwapperTest {
     
@@ -51,13 +49,6 @@ public final class YamlJobProgressSwapperTest {
         assertThat(actual.getIncremental().getPosition().length(), is(0));
     }
     
-    @Test
-    public void assertNullIncremental() {
-        JobProgress jobProgress = 
getJobProgress(ConfigurationFileUtil.readFile("job-progress-no-finished.yaml"));
-        YamlJobProgress actual = SWAPPER.swapToYaml(jobProgress);
-        assertNull(actual.getIncremental());
-    }
-    
     @Test
     public void assertNullInventory() {
         JobProgress jobProgress = 
getJobProgress(ConfigurationFileUtil.readFile("job-progress-no-inventory.yaml"));
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-no-finished.yaml
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-no-finished.yaml
deleted file mode 100644
index b0994b68b59..00000000000
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-no-finished.yaml
+++ /dev/null
@@ -1,31 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-#
-#incremental:
-#  dataSourceName: ds0
-# delay:
-#    lastEventTimestamps: 0
-#    latestActiveTimeMillis: 0
-#  position: ''
-inventory:
-  unfinished:
-    ds0.t_1: i,1,2
-    ds0.t_2: ''
-    ds1.t_2: i,1,2
-    ds1.t_1: ''
-sourceDatabaseType: H2
-status: RUNNING

Reply via email to