This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 71a42ef85a9 Refactor PipelineJobOption (#37113)
71a42ef85a9 is described below
commit 71a42ef85a9a0a69e5da56b6da9d15764a18bf47
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Nov 15 17:41:23 2025 +0800
Refactor PipelineJobOption (#37113)
* Add ConsistencyCheckJobTypeTest
* Refactor PipelineJobOption
---
.../core/job/service/PipelineJobManager.java | 4 +-
.../pipeline/core/job/type/PipelineJobOption.java | 4 +-
.../ConsistencyCheckJobTypeTest.java | 60 ++++++++++++++++++++++
3 files changed, 64 insertions(+), 4 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index 8ca96badf6d..cc368f856ee 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -88,7 +88,7 @@ public final class PipelineJobManager {
}
}
startCurrentDisabledJob(jobId);
- String toBeStartDisabledNextJobType =
jobType.getOption().getGetToBeStartDisabledNextJobType();
+ String toBeStartDisabledNextJobType =
jobType.getOption().getToBeStartDisabledNextJobType();
if (null != toBeStartDisabledNextJobType) {
startNextDisabledJob(jobId, toBeStartDisabledNextJobType);
}
@@ -126,7 +126,7 @@ public final class PipelineJobManager {
* @param jobId job id
*/
public void stop(final String jobId) {
- String toBeStoppedPreviousJobType =
jobType.getOption().getGetToBeStoppedPreviousJobType();
+ String toBeStoppedPreviousJobType =
jobType.getOption().getToBeStoppedPreviousJobType();
if (null != toBeStoppedPreviousJobType) {
stopPreviousJob(jobId, toBeStoppedPreviousJobType);
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/PipelineJobOption.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/PipelineJobOption.java
index 7dcf7d0517b..dde24fc09bc 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/PipelineJobOption.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/PipelineJobOption.java
@@ -42,9 +42,9 @@ public final class PipelineJobOption {
private final boolean
isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished;
- private final String getToBeStartDisabledNextJobType;
+ private final String toBeStartDisabledNextJobType;
- private final String getToBeStoppedPreviousJobType;
+ private final String toBeStoppedPreviousJobType;
private final boolean isForceNoShardingWhenConvertToJobConfigurationPOJO;
}
diff --git
a/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTypeTest.java
b/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTypeTest.java
new file mode 100644
index 00000000000..4398821bfef
--- /dev/null
+++
b/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTypeTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+
+import
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.swapper.YamlConsistencyCheckJobItemProgressSwapper;
+import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobOption;
+import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
+import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.swapper.YamlConsistencyCheckJobConfigurationSwapper;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import org.junit.jupiter.api.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.isA;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+class ConsistencyCheckJobTypeTest {
+
+ @SuppressWarnings("rawtypes")
+ private final PipelineJobType jobType =
TypedSPILoader.getService(PipelineJobType.class, "CONSISTENCY_CHECK");
+
+ @Test
+ void assertGetOption() {
+ PipelineJobOption actual = jobType.getOption();
+ assertThat(actual.getCode(), is("02"));
+ assertFalse(actual.isTransmissionJob());
+ assertThat(actual.getYamlJobConfigurationSwapper(),
isA(YamlConsistencyCheckJobConfigurationSwapper.class));
+ assertThat(actual.getYamlJobItemProgressSwapper(),
isA(YamlConsistencyCheckJobItemProgressSwapper.class));
+ assertThat(actual.getJobClass(), is(ConsistencyCheckJob.class));
+
assertTrue(actual.isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished());
+ assertNull(actual.getToBeStartDisabledNextJobType());
+ assertNull(actual.getToBeStoppedPreviousJobType());
+
assertFalse(actual.isForceNoShardingWhenConvertToJobConfigurationPOJO());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ void assertGetJobObjective() {
+
assertNull(jobType.getJobObjective(mock(PipelineJobConfiguration.class)));
+ }
+}