This is an automated email from the ASF dual-hosted git repository. panjuan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push: new 077058aac7a Extract JobProgress methods into inventory and incremental task progress (#19940) 077058aac7a is described below commit 077058aac7a7536bdc6f9e9f4815c81405942634 Author: Hongsheng Zhong <sand...@126.com> AuthorDate: Mon Aug 8 09:49:23 2022 +0800 Extract JobProgress methods into inventory and incremental task progress (#19940) --- .../query/ShowScalingJobStatusQueryResultSet.java | 6 +-- .../progress/JobItemIncrementalTasksProgress.java | 9 ++++ .../pipeline/api/job/progress/JobProgress.java | 50 ---------------------- .../rulealtered/RuleAlteredJobPreparer.java | 5 ++- .../rulealtered/prepare/InventoryTaskSplitter.java | 2 +- .../core/job/progress/JobProgressTest.java | 32 +++++--------- .../progress/yaml/YamlJobProgressSwapperTest.java | 11 +---- .../test/resources/job-progress-no-finished.yaml | 31 -------------- 8 files changed, 28 insertions(+), 118 deletions(-) diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShowScalingJobStatusQueryResultSet.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShowScalingJobStatusQueryResultSet.java index 37bfae61842..62ec3670803 100644 --- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShowScalingJobStatusQueryResultSet.java +++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShowScalingJobStatusQueryResultSet.java @@ -48,11 +48,11 @@ public final class ShowScalingJobStatusQueryResultSet implements DistSQLResultSe Collection<Object> result = new LinkedList<>(); result.add(entry.getKey()); if (null != entry.getValue()) { - result.add(entry.getValue().getDataSource()); + result.add(entry.getValue().getIncremental().getDataSourceName()); result.add(entry.getValue().getStatus()); result.add(entry.getValue().isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString()); - result.add(entry.getValue().getInventoryFinishedPercentage()); - long latestActiveTimeMillis = entry.getValue().getIncrementalLatestActiveTimeMillis(); + result.add(entry.getValue().getInventory().getInventoryFinishedPercentage()); + long latestActiveTimeMillis = entry.getValue().getIncremental().getIncrementalLatestActiveTimeMillis(); result.add(latestActiveTimeMillis > 0 ? TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis - latestActiveTimeMillis) : 0); } else { result.add(""); diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobItemIncrementalTasksProgress.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobItemIncrementalTasksProgress.java index ee4b4efeb2d..b4065d087f6 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobItemIncrementalTasksProgress.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobItemIncrementalTasksProgress.java @@ -47,6 +47,15 @@ public final class JobItemIncrementalTasksProgress { return incrementalTaskProgress.map(IncrementalTaskProgress::getPosition); } + /** + * Get data source name. + * + * @return data source + */ + public String getDataSourceName() { + return incrementalTaskProgressMap.keySet().stream().findAny().orElse(""); + } + /** * Get incremental latest active time milliseconds. * diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobProgress.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobProgress.java index c210dda4a4e..9a73161dfd9 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobProgress.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobProgress.java @@ -19,12 +19,8 @@ package org.apache.shardingsphere.data.pipeline.api.job.progress; import lombok.Getter; import lombok.Setter; -import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition; import org.apache.shardingsphere.data.pipeline.api.job.JobStatus; -import java.util.Map; -import java.util.Optional; - /** * Job progress. */ @@ -42,50 +38,4 @@ public final class JobProgress implements PipelineJobItemProgress { private JobItemInventoryTasksProgress inventory; private JobItemIncrementalTasksProgress incremental; - - /** - * get incremental position. - * @param dataSourceName dataSource - * @return incremental position - */ - public Optional<IngestPosition<?>> getIncrementalPosition(final String dataSourceName) { - return incremental.getIncrementalPosition(dataSourceName); - } - - /** - * Get inventory position. - * - * @param tableName table name - * @return inventory position - */ - public Map<String, IngestPosition<?>> getInventoryPosition(final String tableName) { - return inventory.getInventoryPosition(tableName); - } - - /** - * Get data source. - * - * @return data source - */ - public String getDataSource() { - return incremental.getIncrementalTaskProgressMap().keySet().stream().findAny().orElse(""); - } - - /** - * Get inventory finished percentage. - * - * @return finished percentage - */ - public int getInventoryFinishedPercentage() { - return inventory.getInventoryFinishedPercentage(); - } - - /** - * Get incremental latest active time milliseconds. - * - * @return latest active time, <code>0</code> is there is no activity - */ - public long getIncrementalLatestActiveTimeMillis() { - return null == incremental ? 0L : incremental.getIncrementalLatestActiveTimeMillis(); - } } diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java index 979a33268af..f177417fd32 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java @@ -200,8 +200,9 @@ public final class RuleAlteredJobPreparer { private IngestPosition<?> getIncrementalPosition(final RuleAlteredJobContext jobContext, final TaskConfiguration taskConfig, final PipelineDataSourceManager dataSourceManager) throws SQLException { - if (null != jobContext.getInitProgress()) { - Optional<IngestPosition<?>> position = jobContext.getInitProgress().getIncrementalPosition(taskConfig.getDumperConfig().getDataSourceName()); + JobProgress initProgress = jobContext.getInitProgress(); + if (null != initProgress) { + Optional<IngestPosition<?>> position = initProgress.getIncremental().getIncrementalPosition(taskConfig.getDumperConfig().getDataSourceName()); if (position.isPresent()) { return position.get(); } diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java index 49d4ffbc738..2d80bee5541 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java @@ -139,7 +139,7 @@ public final class InventoryTaskSplitter { PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(schemaName, actualTableName); PipelineColumnMetaData uniqueKeyColumn = mustGetAnAppropriateUniqueKeyColumn(tableMetaData, actualTableName); if (null != initProgress && initProgress.getStatus() != JobStatus.PREPARING_FAILURE) { - Collection<IngestPosition<?>> result = initProgress.getInventoryPosition(dumperConfig.getActualTableName()).values(); + Collection<IngestPosition<?>> result = initProgress.getInventory().getInventoryPosition(dumperConfig.getActualTableName()).values(); for (IngestPosition<?> each : result) { if (each instanceof PrimaryKeyPosition) { dumperConfig.setUniqueKey(uniqueKeyColumn.getName()); diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/JobProgressTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/JobProgressTest.java index 1fc6afb8f4a..f93aaba6e44 100644 --- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/JobProgressTest.java +++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/JobProgressTest.java @@ -19,12 +19,12 @@ package org.apache.shardingsphere.data.pipeline.core.job.progress; import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition; import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition; -import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition; import org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition; +import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition; import org.apache.shardingsphere.data.pipeline.api.job.JobStatus; import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress; -import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobProgressSwapper; import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobProgress; +import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobProgressSwapper; import org.apache.shardingsphere.data.pipeline.core.util.ConfigurationFileUtil; import org.apache.shardingsphere.infra.util.yaml.YamlEngine; import org.junit.Test; @@ -52,7 +52,7 @@ public final class JobProgressTest { @Test public void assertGetIncrementalPosition() { JobProgress jobProgress = getJobProgress(ConfigurationFileUtil.readFile("job-progress.yaml")); - Optional<IngestPosition<?>> position = jobProgress.getIncrementalPosition("ds0"); + Optional<IngestPosition<?>> position = jobProgress.getIncremental().getIncrementalPosition("ds0"); assertTrue(position.isPresent()); assertThat(position.get(), instanceOf(PlaceholderPosition.class)); } @@ -60,41 +60,31 @@ public final class JobProgressTest { @Test public void assertGetInventoryPosition() { JobProgress jobProgress = getJobProgress(ConfigurationFileUtil.readFile("job-progress.yaml")); - assertThat(jobProgress.getInventoryPosition("ds0").size(), is(2)); - assertThat(jobProgress.getInventoryPosition("ds0").get("ds0.t_1"), instanceOf(FinishedPosition.class)); - assertThat(jobProgress.getInventoryPosition("ds1").get("ds1.t_1"), instanceOf(PlaceholderPosition.class)); - assertThat(jobProgress.getInventoryPosition("ds1").get("ds1.t_2"), instanceOf(IntegerPrimaryKeyPosition.class)); + assertThat(jobProgress.getInventory().getInventoryPosition("ds0").size(), is(2)); + assertThat(jobProgress.getInventory().getInventoryPosition("ds0").get("ds0.t_1"), instanceOf(FinishedPosition.class)); + assertThat(jobProgress.getInventory().getInventoryPosition("ds1").get("ds1.t_1"), instanceOf(PlaceholderPosition.class)); + assertThat(jobProgress.getInventory().getInventoryPosition("ds1").get("ds1.t_2"), instanceOf(IntegerPrimaryKeyPosition.class)); } @Test public void assertGetInventoryFinishedPercentage() { JobProgress jobProgress = getJobProgress(ConfigurationFileUtil.readFile("job-progress.yaml")); - assertThat(jobProgress.getInventoryFinishedPercentage(), is(50)); - } - - @Test - public void assertGetNoFinishedInventoryFinishedPercentage() { - assertThat(getJobProgress(ConfigurationFileUtil.readFile("job-progress-no-finished.yaml")).getInventoryFinishedPercentage(), is(0)); + assertThat(jobProgress.getInventory().getInventoryFinishedPercentage(), is(50)); } @Test public void assertGetAllFinishedInventoryFinishedPercentage() { - assertThat(getJobProgress(ConfigurationFileUtil.readFile("job-progress-all-finished.yaml")).getInventoryFinishedPercentage(), is(100)); + assertThat(getJobProgress(ConfigurationFileUtil.readFile("job-progress-all-finished.yaml")).getInventory().getInventoryFinishedPercentage(), is(100)); } @Test public void assertGetIncrementalLatestActiveTimeMillis() { - assertThat(getJobProgress(ConfigurationFileUtil.readFile("job-progress.yaml")).getIncrementalLatestActiveTimeMillis(), is(0L)); + assertThat(getJobProgress(ConfigurationFileUtil.readFile("job-progress.yaml")).getIncremental().getIncrementalLatestActiveTimeMillis(), is(0L)); } @Test public void assertGetIncrementalDataLatestActiveTimeMillis() { - assertThat(getJobProgress(ConfigurationFileUtil.readFile("job-progress-all-finished.yaml")).getIncrementalLatestActiveTimeMillis(), is(50L)); - } - - @Test - public void assertGetNoIncrementalDataLatestActiveTimeMillis() { - assertThat(getJobProgress(ConfigurationFileUtil.readFile("job-progress-no-finished.yaml")).getIncrementalLatestActiveTimeMillis(), is(0L)); + assertThat(getJobProgress(ConfigurationFileUtil.readFile("job-progress-all-finished.yaml")).getIncremental().getIncrementalLatestActiveTimeMillis(), is(50L)); } private JobProgress getJobProgress(final String data) { diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgressSwapperTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgressSwapperTest.java index e650552b2f6..44c46f2e730 100644 --- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgressSwapperTest.java +++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgressSwapperTest.java @@ -20,13 +20,11 @@ package org.apache.shardingsphere.data.pipeline.core.job.progress.yaml; import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress; import org.apache.shardingsphere.data.pipeline.core.util.ConfigurationFileUtil; import org.apache.shardingsphere.infra.util.yaml.YamlEngine; - import org.junit.Test; import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; public final class YamlJobProgressSwapperTest { @@ -51,13 +49,6 @@ public final class YamlJobProgressSwapperTest { assertThat(actual.getIncremental().getPosition().length(), is(0)); } - @Test - public void assertNullIncremental() { - JobProgress jobProgress = getJobProgress(ConfigurationFileUtil.readFile("job-progress-no-finished.yaml")); - YamlJobProgress actual = SWAPPER.swapToYaml(jobProgress); - assertNull(actual.getIncremental()); - } - @Test public void assertNullInventory() { JobProgress jobProgress = getJobProgress(ConfigurationFileUtil.readFile("job-progress-no-inventory.yaml")); diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-no-finished.yaml b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-no-finished.yaml deleted file mode 100644 index b0994b68b59..00000000000 --- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-no-finished.yaml +++ /dev/null @@ -1,31 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# -#incremental: -# dataSourceName: ds0 -# delay: -# lastEventTimestamps: 0 -# latestActiveTimeMillis: 0 -# position: '' -inventory: - unfinished: - ds0.t_1: i,1,2 - ds0.t_2: '' - ds1.t_2: i,1,2 - ds1.t_1: '' -sourceDatabaseType: H2 -status: RUNNING