This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 597c6448fbb Add pipeline job common interfaces (#19758)
597c6448fbb is described below
commit 597c6448fbb9fdebcb7298d09289794f13244bd7
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Tue Aug 2 15:00:12 2022 +0800
Add pipeline job common interfaces (#19758)
* Add pipeline job common interfaces
* Improve scaling IT waitScalingFinished
---
.../data/pipeline/api/RuleAlteredJobAPI.java | 1 +
.../api/config/job/PipelineJobConfiguration.java | 7 +++++++
.../RuleAlteredJobAlmostCompletedParameter.java | 1 +
.../PipelineJob.java} | 20 +++-----------------
.../data/pipeline/api/job/progress/JobProgress.java | 3 ++-
.../progress/PipelineJobProgress.java} | 20 +++-----------------
.../scenario/rulealtered/RuleAlteredJob.java | 3 ++-
.../data/pipeline/cases/base/BaseITCase.java | 10 +++++-----
8 files changed, 24 insertions(+), 41 deletions(-)
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
index db201f9a5b0..283de7dc691 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
@@ -56,6 +56,7 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI, RequiredSPI {
* @param jobId job id
* @return each sharding item progress
*/
+ // TODO now update JobProgress
Map<Integer, JobProgress> getProgress(String jobId);
/**
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/PipelineJobConfiguration.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/PipelineJobConfiguration.java
index 58bf6cb2128..f16a785e138 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/PipelineJobConfiguration.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/PipelineJobConfiguration.java
@@ -35,4 +35,11 @@ public interface PipelineJobConfiguration {
* @return database name
*/
String getDatabaseName();
+
+ /**
+ * Get job sharding count.
+ *
+ * @return job sharding count
+ */
+ int getJobShardingCount();
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/detect/RuleAlteredJobAlmostCompletedParameter.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/detect/RuleAlteredJobAlmostCompletedParameter.java
index 58d32a52eb3..7d349cb01b5 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/detect/RuleAlteredJobAlmostCompletedParameter.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/detect/RuleAlteredJobAlmostCompletedParameter.java
@@ -31,6 +31,7 @@ import java.util.Collection;
@RequiredArgsConstructor
@Getter
@ToString
+// TODO now rename
public final class RuleAlteredJobAlmostCompletedParameter {
private final int jobShardingCount;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/PipelineJobConfiguration.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJob.java
similarity index 70%
copy from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/PipelineJobConfiguration.java
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJob.java
index 58bf6cb2128..183c32856ea 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/PipelineJobConfiguration.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJob.java
@@ -15,24 +15,10 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.config.job;
+package org.apache.shardingsphere.data.pipeline.api.job;
/**
- * Pipeline job configuration.
+ * Pipeline job.
*/
-public interface PipelineJobConfiguration {
-
- /**
- * Get job id.
- *
- * @return job id
- */
- String getJobId();
-
- /**
- * Get database name.
- *
- * @return database name
- */
- String getDatabaseName();
+public interface PipelineJob {
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobProgress.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobProgress.java
index df70f90c1d7..2f0be9743b1 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobProgress.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobProgress.java
@@ -37,7 +37,8 @@ import java.util.stream.Collectors;
*/
@Getter
@Setter
-public final class JobProgress {
+// TODO now rename
+public final class JobProgress implements PipelineJobProgress {
private JobStatus status = JobStatus.RUNNING;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/PipelineJobConfiguration.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/PipelineJobProgress.java
similarity index 70%
copy from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/PipelineJobConfiguration.java
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/PipelineJobProgress.java
index 58bf6cb2128..ab7a8e080bb 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/PipelineJobConfiguration.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/PipelineJobProgress.java
@@ -15,24 +15,10 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.config.job;
+package org.apache.shardingsphere.data.pipeline.api.job.progress;
/**
- * Pipeline job configuration.
+ * Pipeline job progress.
*/
-public interface PipelineJobConfiguration {
-
- /**
- * Get job id.
- *
- * @return job id
- */
- String getJobId();
-
- /**
- * Get database name.
- *
- * @return database name
- */
- String getDatabaseName();
+public interface PipelineJobProgress {
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
index 4ff6a24d464..5c579515b2b 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
@@ -31,7 +32,7 @@ import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
* Rule altered job.
*/
@Slf4j
-public final class RuleAlteredJob implements SimpleJob {
+public final class RuleAlteredJob implements SimpleJob, PipelineJob {
private final GovernanceRepositoryAPI governanceRepositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index 10628d82044..f1bef198a71 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -332,6 +332,7 @@ public abstract class BaseITCase {
}
protected void applyScaling(final String jobId) {
+ assertBeforeApplyScalingMetadataCorrectly();
executeWithLog(String.format("APPLY SCALING %s", jobId));
}
@@ -356,11 +357,11 @@ public abstract class BaseITCase {
Set<String> actualStatus = null;
for (int i = 0; i < 15; i++) {
actualStatus = new HashSet<>();
- List<Map<String, Object>> showScalingStatusResMap = showScalingStatus(jobId);
- log.info("show scaling status result: {}", showScalingStatusResMap);
+ List<Map<String, Object>> showScalingStatusResult = showScalingStatus(jobId);
+ log.info("show scaling status result: {}", showScalingStatusResult);
boolean finished = true;
- for (Map<String, Object> entry : showScalingStatusResMap) {
- String status = entry.get("status").toString();
+ for (Map<String, Object> each : showScalingStatusResult) {
+ String status = each.get("status").toString();
assertThat(status, not(JobStatus.PREPARING_FAILURE.name()));
assertThat(status, not(JobStatus.EXECUTE_INVENTORY_TASK_FAILURE.name()));
assertThat(status, not(JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE.name()));
@@ -373,7 +374,6 @@ public abstract class BaseITCase {
if (finished) {
break;
}
- assertBeforeApplyScalingMetadataCorrectly();
ThreadUtil.sleep(4, TimeUnit.SECONDS);
}
assertThat(actualStatus, is(Collections.singleton(JobStatus.EXECUTE_INCREMENTAL_TASK.name())));