This is an automated email from the ASF dual-hosted git repository.
yx9o pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new d15ca41bef3 Refactor PipelineJobHasAlreadyFinishedException (#21275)
d15ca41bef3 is described below
commit d15ca41bef3909a7b8838d9c7c97497dacde6235
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Sep 30 01:05:07 2022 +0800
Refactor PipelineJobHasAlreadyFinishedException (#21275)
* Refactor PipelineJobHasAlreadyFinishedException
* Refactor PipelineJobHasAlreadyFinishedException
---
.../user-manual/error-code/sql-error-code.cn.md | 1 +
.../user-manual/error-code/sql-error-code.en.md | 1 +
.../PipelineJobHasAlreadyFinishedException.java | 6 ++---
.../ConsistencyCheckJobAPIImpl.java | 28 +++++++++-------------
.../general/PostgreSQLMigrationGeneralIT.java | 18 +++++++-------
5 files changed, 24 insertions(+), 30 deletions(-)
diff --git a/docs/document/content/user-manual/error-code/sql-error-code.cn.md
b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
index 4813cb33516..c164cde2108 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.cn.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
@@ -115,6 +115,7 @@ SQL 错误码以标准的 SQL State,Vendor Code 和详细错误信息提供,
| 08000 | 18092 | Get binlog position failed by job \`%s\`, reason
is: %s |
| HY000 | 18093 | Can not poll event because of binlog sync channel
already closed |
| HY000 | 18094 | Task \`%s\` execute failed |
+| HY000 | 18095 | Job has already finished, please run \`CHECK
MIGRATION %s\` to start a new data consistency check job |
### DistSQL
diff --git a/docs/document/content/user-manual/error-code/sql-error-code.en.md
b/docs/document/content/user-manual/error-code/sql-error-code.en.md
index 170cf4b4b3e..4dea15e1dc9 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.en.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.en.md
@@ -115,6 +115,7 @@ SQL error codes provide by standard `SQL State`, `Vendor
Code` and `Reason`, whi
| 08000 | 18092 | Get binlog position failed by job \`%s\`, reason
is: %s |
| HY000 | 18093 | Can not poll event because of binlog sync channel
already closed |
| HY000 | 18094 | Task \`%s\` execute failed |
+| HY000 | 18095 | Job has already finished, please run \`CHECK
MIGRATION %s\` to start a new data consistency check job |
### DistSQL
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyFinishedException.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyFinishedException.java
index c0835346d6d..8bb7e5bdcd4 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyFinishedException.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyFinishedException.java
@@ -25,9 +25,9 @@ import
org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpe
*/
public final class PipelineJobHasAlreadyFinishedException extends
PipelineSQLException {
- private static final long serialVersionUID = 2854259384634892428L;
+ private static final long serialVersionUID = 6881217592831423520L;
- public PipelineJobHasAlreadyFinishedException(final String message) {
- super(XOpenSQLState.GENERAL_ERROR, 88, message);
+ public PipelineJobHasAlreadyFinishedException(final String jobId) {
+ super(XOpenSQLState.GENERAL_ERROR, 95, "Job has already finished,
please run `CHECK MIGRATION %s` to start a new data consistency check job",
jobId);
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
index c4963d50ab6..4ad73f6ee91 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
@@ -45,6 +45,7 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNot
import
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobProgressSwapper;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import java.util.Collections;
@@ -131,32 +132,25 @@ public final class ConsistencyCheckJobAPIImpl extends
AbstractPipelineJobAPIImpl
@Override
public void startDisabledJob(final String jobId) {
- log.info("start disable check job {}", jobId);
+ log.info("Start disable check job {}", jobId);
PipelineJobItemProgress jobProgress = getJobItemProgress(jobId, 0);
- if (null != jobProgress && JobStatus.FINISHED ==
jobProgress.getStatus()) {
- throw new
PipelineJobHasAlreadyFinishedException(String.format("job already finished, can
use `CHECK MIGRATION '%s'` to start a new data consistency check job", jobId));
- }
- super.startDisabledJob(jobId);
+ ShardingSpherePreconditions.checkState(null == jobProgress ||
JobStatus.FINISHED != jobProgress.getStatus(), () -> new
PipelineJobHasAlreadyFinishedException(jobId));
}
@Override
public void startByParentJobId(final String parentJobId) {
- log.info("start check job by parentJobId {}", parentJobId);
- Optional<String> optional =
PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckLatestJobId(parentJobId);
- if (!optional.isPresent()) {
- throw new PipelineJobNotFoundException(parentJobId + " check job");
- }
- startDisabledJob(optional.get());
+ log.info("Start check job by parent job id: {}", parentJobId);
+ Optional<String> checkLatestJobId =
PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckLatestJobId(parentJobId);
+ ShardingSpherePreconditions.checkState(checkLatestJobId.isPresent(),
() -> new PipelineJobNotFoundException(parentJobId));
+ startDisabledJob(checkLatestJobId.get());
}
@Override
public void stopByParentJobId(final String parentJobId) {
- log.info("stop check job by parentJobId {}", parentJobId);
- Optional<String> optional =
PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckLatestJobId(parentJobId);
- if (!optional.isPresent()) {
- throw new PipelineJobNotFoundException(parentJobId + " check job");
- }
- stop(optional.get());
+ log.info("Stop check job by parent job id: {}", parentJobId);
+ Optional<String> checkLatestJobId =
PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckLatestJobId(parentJobId);
+ ShardingSpherePreconditions.checkState(checkLatestJobId.isPresent(),
() -> new PipelineJobNotFoundException(parentJobId));
+ stop(checkLatestJobId.get());
}
@Override
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
index d4c892d8db3..1da9b9c08f7 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
@@ -44,10 +44,10 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
/**
- * PostgreSQL general scaling test case. include openGauss type, same process.
+ * PostgreSQL and openGauss general scaling test case.
*/
-@Slf4j
@RunWith(Parameterized.class)
+@Slf4j
public final class PostgreSQLMigrationGeneralIT extends
AbstractMigrationITCase {
private static final KeyGenerateAlgorithm KEY_GENERATE_ALGORITHM = new
AutoIncrementKeyGenerateAlgorithm();
@@ -117,15 +117,13 @@ public final class PostgreSQLMigrationGeneralIT extends
AbstractMigrationITCase
waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'",
jobId));
/*
* TODO Compatible with restart job, before stopping job,
incremental_idle_seconds=16, before checking migration,
incremental_idle_seconds=23,
- * it just pass 7 seconds, and it's not enough for PostgreSQL
incremental task to sync data
+ * it just pass 7 seconds, and it's not enough for PostgreSQL
incremental task to sync data
*/
-/*
- stopMigrationByJobId(jobId);
- sourceExecuteWithLog(String.format("INSERT INTO %s.%s
(order_id,user_id,status) VALUES (%s, %s, '%s')", SCHEMA_NAME,
getSourceTableOrderName(), KEY_GENERATE_ALGORITHM.generateKey(),
- 1, "afterStop"));
- startMigrationByJobId(jobId);
- waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'",
jobId));
-*/
+// stopMigrationByJobId(jobId);
+// sourceExecuteWithLog(String.format("INSERT INTO %s.%s
(order_id,user_id,status) VALUES (%s, %s, '%s')", SCHEMA_NAME,
getSourceTableOrderName(), KEY_GENERATE_ALGORITHM.generateKey(),
+// 1, "afterStop"));
+// startMigrationByJobId(jobId);
+// waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS
'%s'", jobId));
assertCheckMigrationSuccess(jobId, "DATA_MATCH");
stopMigrationByJobId(jobId);
}