This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 40e396d45aa Remove SPI from PipelineJobOption (#29206)
40e396d45aa is described below

commit 40e396d45aa1d4fc2952e6f359e9c1dfdc9e613f
Author: Liang Zhang <zhangli...@apache.org>
AuthorDate: Sat Nov 25 20:50:36 2023 +0800

    Remove SPI from PipelineJobOption (#29206)
    
    * Refactor PipelineJobConfigurationManager
    
    * Refactor MigrationJobAPI
    
    * Remove SPI from PipelineJobOption
    
    * Remove SPI from PipelineJobOption
---
 .../data/pipeline/common/job/PipelineJobId.java        |  4 ++--
 .../data/pipeline/common/job/type/JobCodeRegistry.java |  6 +++---
 .../job/type/{JobType.java => PipelineJobType.java}    | 12 ++++++++++--
 .../impl/AbstractJobConfigurationChangedProcessor.java |  4 ++--
 .../data/pipeline/core/job/AbstractPipelineJob.java    |  3 ++-
 .../data/pipeline/core/job/AbstractPipelineJobId.java  |  4 ++--
 .../data/pipeline/core/job/PipelineJobIdUtils.java     |  4 ++--
 .../pipeline/core/job/option/PipelineJobOption.java    |  9 ++++++---
 .../persist/PipelineJobProgressPersistService.java     |  4 ++--
 .../job/service/PipelineJobConfigurationManager.java   |  3 +--
 .../pipeline/core/job/service/PipelineJobManager.java  |  5 +++--
 .../core/task/runner/TransmissionTasksRunner.java      |  5 +++--
 .../data/pipeline/common/job/type/FixtureJobType.java  | 11 ++++++++++-
 ...here.data.pipeline.common.job.type.PipelineJobType} |  0
 .../handler/query/ShowStreamingJobStatusExecutor.java  |  4 ++--
 .../handler/query/ShowStreamingRuleExecutor.java       |  4 ++--
 .../handler/query/ShowMigrationJobStatusExecutor.java  |  4 ++--
 .../data/pipeline/cdc/api/job/type/CDCJobType.java     | 11 +++++++++--
 ...here.data.pipeline.common.job.type.PipelineJobType} |  0
 ...ere.data.pipeline.core.job.option.PipelineJobOption | 18 ------------------
 .../data/pipeline/cdc/core/job/CDCJobIdTest.java       |  4 ++--
 .../consistencycheck/ConsistencyCheckJobType.java      | 11 +++++++++--
 ...nsistencyCheckJobConfigurationChangedProcessor.java |  4 ++--
 .../task/ConsistencyCheckTasksRunner.java              |  9 ++++-----
 ...here.data.pipeline.common.job.type.PipelineJobType} |  0
 ...ere.data.pipeline.core.job.option.PipelineJobOption | 18 ------------------
 .../pipeline/scenario/migration/MigrationJobType.java  | 11 +++++++++--
 .../scenario/migration/api/impl/MigrationJobAPI.java   | 12 +++++++-----
 .../MigrationJobConfigurationChangedProcessor.java     |  4 ++--
 ...here.data.pipeline.common.job.type.PipelineJobType} |  0
 ...ere.data.pipeline.core.job.option.PipelineJobOption | 18 ------------------
 .../ral/queryable/ShowMigrationRuleExecutor.java       |  4 ++--
 .../ral/updatable/AlterTransmissionRuleUpdater.java    |  4 ++--
 .../data/pipeline/cases/PipelineContainerComposer.java |  8 ++++----
 .../pipeline/core/util/JobConfigurationBuilder.java    |  4 ++--
 35 files changed, 108 insertions(+), 118 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/PipelineJobId.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/PipelineJobId.java
index 88ed08a4922..9f8039a30c1 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/PipelineJobId.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/PipelineJobId.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.common.job;
 
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
 
 /**
  * Pipeline job id.
@@ -30,7 +30,7 @@ public interface PipelineJobId {
      *
      * @return type
      */
-    JobType getJobType();
+    PipelineJobType getJobType();
     
     /**
      * Get format version.
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobCodeRegistry.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobCodeRegistry.java
index cdd230ebf03..d7fce319ffa 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobCodeRegistry.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobCodeRegistry.java
@@ -33,10 +33,10 @@ import java.util.Map;
 @Slf4j
 public final class JobCodeRegistry {
     
-    private static final Map<String, JobType> JOB_CODE_AND_TYPE_MAP = new HashMap<>();
+    private static final Map<String, PipelineJobType> JOB_CODE_AND_TYPE_MAP = new HashMap<>();
     
     static {
-        for (JobType each : ShardingSphereServiceLoader.getServiceInstances(JobType.class)) {
+        for (PipelineJobType each : ShardingSphereServiceLoader.getServiceInstances(PipelineJobType.class)) {
             Preconditions.checkArgument(2 == each.getCode().length(), "Job type code length is not 2.");
             JOB_CODE_AND_TYPE_MAP.put(each.getCode(), each);
         }
@@ -48,7 +48,7 @@ public final class JobCodeRegistry {
      * @param jobTypeCode job type code
      * @return job type
      */
-    public static JobType getJobType(final String jobTypeCode) {
+    public static PipelineJobType getJobType(final String jobTypeCode) {
         Preconditions.checkArgument(JOB_CODE_AND_TYPE_MAP.containsKey(jobTypeCode), "Can not get job type by `%s`.", jobTypeCode);
         return JOB_CODE_AND_TYPE_MAP.get(jobTypeCode);
     }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobType.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/PipelineJobType.java
similarity index 81%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobType.java
rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/PipelineJobType.java
index bb599664c64..4781e9c4552 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobType.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/PipelineJobType.java
@@ -17,14 +17,15 @@
 
 package org.apache.shardingsphere.data.pipeline.common.job.type;
 
+import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
 import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
 
 /**
- * Job type.
+ * Pipeline job type.
  */
 @SingletonSPI
-public interface JobType extends TypedSPI {
+public interface PipelineJobType extends TypedSPI {
     
     /**
      * Get job type code.
@@ -33,6 +34,13 @@ public interface JobType extends TypedSPI {
      */
     String getCode();
     
+    /**
+     * Get job option.
+     *
+     * @return job option
+     */
+    PipelineJobOption getOption();
+    
     @Override
     String getType();
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java
index 53c1af69305..b331b378543 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java
@@ -18,7 +18,7 @@
 package 
org.apache.shardingsphere.data.pipeline.common.metadata.node.config.processor.impl;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
 import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
 import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.config.processor.JobConfigurationChangedProcessor;
 import 
org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
@@ -89,7 +89,7 @@ public abstract class 
AbstractJobConfigurationChangedProcessor implements JobCon
     
     protected abstract AbstractPipelineJob buildPipelineJob(String jobId);
     
-    protected abstract JobType getJobType();
+    protected abstract PipelineJobType getJobType();
     
     @Override
     public String getType() {
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index 78eecd175d1..8cf5e7ea051 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -22,6 +22,7 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
 import 
org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener;
 import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
 import 
org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
@@ -65,7 +66,7 @@ public abstract class AbstractPipelineJob implements 
PipelineJob {
     
     protected AbstractPipelineJob(final String jobId) {
         this.jobId = jobId;
-        jobOption = TypedSPILoader.getService(PipelineJobOption.class, PipelineJobIdUtils.parseJobType(jobId).getType());
+        jobOption = TypedSPILoader.getService(PipelineJobType.class, PipelineJobIdUtils.parseJobType(jobId).getType()).getOption();
     }
     
     /**
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJobId.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJobId.java
index 9a8834dd831..17ffc1d82db 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJobId.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJobId.java
@@ -22,7 +22,7 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
 
 /**
  * Abstract pipeline job id.
@@ -33,7 +33,7 @@ public abstract class AbstractPipelineJobId implements 
PipelineJobId {
     
     public static final String CURRENT_VERSION = "02";
     
-    private final JobType jobType;
+    private final PipelineJobType jobType;
     
     private final PipelineContextKey contextKey;
     
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
index 5611ac02e1c..65a825e0497 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
@@ -27,7 +27,7 @@ import org.apache.commons.codec.binary.Hex;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.common.job.type.JobCodeRegistry;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
 import org.apache.shardingsphere.data.pipeline.common.util.InstanceTypeUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
@@ -64,7 +64,7 @@ public final class PipelineJobIdUtils {
      * @param jobId job id
      * @return job type
      */
-    public static JobType parseJobType(final String jobId) {
+    public static PipelineJobType parseJobType(final String jobId) {
         verifyJobId(jobId);
         return JobCodeRegistry.getJobType(jobId.substring(1, 3));
     }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/PipelineJobOption.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/PipelineJobOption.java
index 55f0c235f1d..ce65bf34c57 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/PipelineJobOption.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/PipelineJobOption.java
@@ -24,7 +24,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobConf
 import 
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
 import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
-import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
 import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
 
 import java.util.Optional;
@@ -33,7 +32,7 @@ import java.util.Optional;
  * Pipeline job option.
  */
 @SingletonSPI
-public interface PipelineJobOption extends TypedSPI {
+public interface PipelineJobOption {
     
     /**
      * Get YAML pipeline job configuration swapper.
@@ -95,6 +94,10 @@ public interface PipelineJobOption extends TypedSPI {
      */
     Class<? extends PipelineJob> getJobClass();
     
-    @Override
+    /**
+     * Get job type.
+     * 
+     * @return job type
+     */
     String getType();
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index 0f7bc16a222..26c95849c91 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -21,9 +21,9 @@ import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import 
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
@@ -130,7 +130,7 @@ public final class PipelineJobProgressPersistService {
             }
             persistContext.getHasNewEvents().set(false);
             long startTimeMillis = System.currentTimeMillis();
-            new PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobOption.class, PipelineJobIdUtils.parseJobType(jobId).getType())
+            new PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class, PipelineJobIdUtils.parseJobType(jobId).getType()).getOption()
                     .getYamlJobItemProgressSwapper()).updateProgress(jobItemContext.get());
             persistContext.getBeforePersistingProgressMillis().set(null);
             if (6 == ThreadLocalRandom.current().nextInt(100)) {
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java
index c02dc1355a8..a6c49fcb965 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java
@@ -58,8 +58,7 @@ public final class PipelineJobConfigurationManager {
     public JobConfigurationPOJO convertToJobConfigurationPOJO(final 
PipelineJobConfiguration jobConfig) {
         JobConfigurationPOJO result = new JobConfigurationPOJO();
         result.setJobName(jobConfig.getJobId());
-        int shardingTotalCount = jobOption.isForceNoShardingWhenConvertToJobConfigurationPOJO() ? 1 : jobConfig.getJobShardingCount();
-        result.setShardingTotalCount(shardingTotalCount);
+        result.setShardingTotalCount(jobOption.isForceNoShardingWhenConvertToJobConfigurationPOJO() ? 1 : jobConfig.getJobShardingCount());
         
result.setJobParameter(YamlEngine.marshal(jobOption.getYamlJobConfigurationSwapper().swapToYamlConfiguration(jobConfig)));
         String createTimeFormat = 
LocalDateTime.now().format(DateTimeFormatterFactory.getStandardFormatter());
         result.getProps().setProperty("create_time", createTimeFormat);
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index 717125ea114..27540a94a9d 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -23,6 +23,7 @@ import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConf
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
 import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineGovernanceFacade;
@@ -109,7 +110,7 @@ public final class PipelineJobManager {
     private void startNextDisabledJob(final String jobId, final String toBeStartDisabledNextJobType) {
         PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().getLatestCheckJobId(jobId).ifPresent(optional -> {
             try {
-                new PipelineJobManager(TypedSPILoader.getService(PipelineJobOption.class, toBeStartDisabledNextJobType)).startDisabledJob(optional);
+                new PipelineJobManager(TypedSPILoader.getService(PipelineJobType.class, toBeStartDisabledNextJobType).getOption()).startDisabledJob(optional);
                 // CHECKSTYLE:OFF
             } catch (final RuntimeException ex) {
                 // CHECKSTYLE:ON
@@ -131,7 +132,7 @@ public final class PipelineJobManager {
     private void stopPreviousJob(final String jobId, final String 
toBeStoppedPreviousJobType) {
         
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().getLatestCheckJobId(jobId).ifPresent(optional
 -> {
             try {
-                new 
PipelineJobManager(TypedSPILoader.getService(PipelineJobOption.class, 
toBeStoppedPreviousJobType)).stop(optional);
+                new 
PipelineJobManager(TypedSPILoader.getService(PipelineJobType.class, 
toBeStoppedPreviousJobType).getOption()).stop(optional);
                 // CHECKSTYLE:OFF
             } catch (final RuntimeException ex) {
                 // CHECKSTYLE:ON
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
index 6291279a019..655dbbb85d3 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
@@ -26,6 +26,7 @@ import 
org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPosition;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
@@ -66,7 +67,7 @@ public class TransmissionTasksRunner implements 
PipelineTasksRunner {
         this.jobItemContext = jobItemContext;
         inventoryTasks = jobItemContext.getInventoryTasks();
         incrementalTasks = jobItemContext.getIncrementalTasks();
-        jobOption = TypedSPILoader.getService(PipelineJobOption.class, 
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType());
+        jobOption = TypedSPILoader.getService(PipelineJobType.class, 
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType()).getOption();
         jobManager = new PipelineJobManager(jobOption);
         jobItemManager = new 
PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper());
     }
@@ -89,7 +90,7 @@ public class TransmissionTasksRunner implements 
PipelineTasksRunner {
         if (jobItemContext.isStopping()) {
             return;
         }
-        new 
PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobOption.class, 
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType())
+        new 
PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class, 
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType()).getOption()
                 
.getYamlJobItemProgressSwapper()).persistProgress(jobItemContext);
         if 
(PipelineJobProgressDetector.isAllInventoryTasksFinished(inventoryTasks)) {
             log.info("All inventory tasks finished.");
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/type/FixtureJobType.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/type/FixtureJobType.java
index a1976f940ab..d4de294616f 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/type/FixtureJobType.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/type/FixtureJobType.java
@@ -17,16 +17,25 @@
 
 package org.apache.shardingsphere.data.pipeline.common.job.type;
 
+import 
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
+
+import static org.mockito.Mockito.mock;
+
 /**
  * Fixture job type.
  */
-public final class FixtureJobType implements JobType {
+public final class FixtureJobType implements PipelineJobType {
     
     @Override
     public String getCode() {
         return "00";
     }
     
+    @Override
+    public PipelineJobOption getOption() {
+        return mock(PipelineJobOption.class);
+    }
+    
     @Override
     public String getType() {
         return "FIXTURE";
diff --git 
a/kernel/data-pipeline/core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.JobType
 
b/kernel/data-pipeline/core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType
similarity index 100%
rename from 
kernel/data-pipeline/core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.JobType
rename to 
kernel/data-pipeline/core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType
diff --git 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
index add405c928f..b385437e6ca 100644
--- 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
+++ 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
@@ -20,8 +20,8 @@ package org.apache.shardingsphere.cdc.distsql.handler.query;
 import 
org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingStatusStatement;
 import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
 import 
org.apache.shardingsphere.data.pipeline.common.pojo.TransmissionJobItemInfo;
-import 
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
 import 
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
 import 
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
@@ -41,7 +41,7 @@ public final class ShowStreamingJobStatusExecutor implements 
QueryableRALExecuto
     
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final 
ShowStreamingStatusStatement sqlStatement) {
-        TransmissionJobOption jobOption = (TransmissionJobOption) 
TypedSPILoader.getService(PipelineJobOption.class, new CDCJobType().getType());
+        TransmissionJobOption jobOption = (TransmissionJobOption) 
TypedSPILoader.getService(PipelineJobType.class, new 
CDCJobType().getType()).getOption();
         List<TransmissionJobItemInfo> jobItemInfos = new 
TransmissionJobManager(jobOption).getJobItemInfos(sqlStatement.getJobId());
         long currentTimeMillis = System.currentTimeMillis();
         return jobItemInfos.stream().map(each -> generateResultRow(each, 
currentTimeMillis)).collect(Collectors.toList());
diff --git 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java
 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java
index 535299f0da2..843bb10a8e8 100644
--- 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java
+++ 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.cdc.distsql.handler.query;
 import 
org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingRuleStatement;
 import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import 
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
 import 
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
 import 
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
@@ -40,7 +40,7 @@ public final class ShowStreamingRuleExecutor implements 
QueryableRALExecutor<Sho
     
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final 
ShowStreamingRuleStatement sqlStatement) {
-        PipelineProcessConfiguration processConfig = new 
TransmissionJobManager((TransmissionJobOption) 
TypedSPILoader.getService(PipelineJobOption.class, "STREAMING"))
+        PipelineProcessConfiguration processConfig = new 
TransmissionJobManager((TransmissionJobOption) 
TypedSPILoader.getService(PipelineJobType.class, "STREAMING").getOption())
                 .showProcessConfiguration(new 
PipelineContextKey(InstanceType.PROXY));
         Collection<LocalDataQueryResultRow> result = new LinkedList<>();
         result.add(new 
LocalDataQueryResultRow(getString(processConfig.getRead()), 
getString(processConfig.getWrite()), 
getString(processConfig.getStreamChannel())));
diff --git 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
index bb146b4856a..b6a597a82fb 100644
--- 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
+++ 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
@@ -18,8 +18,8 @@
 package org.apache.shardingsphere.migration.distsql.handler.query;
 
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
 import 
org.apache.shardingsphere.data.pipeline.common.pojo.TransmissionJobItemInfo;
-import 
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
 import 
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
 import 
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
@@ -40,7 +40,7 @@ public final class ShowMigrationJobStatusExecutor implements 
QueryableRALExecuto
     
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final 
ShowMigrationStatusStatement sqlStatement) {
-        TransmissionJobOption jobOption = (TransmissionJobOption) 
TypedSPILoader.getService(PipelineJobOption.class, "MIGRATION");
+        TransmissionJobOption jobOption = (TransmissionJobOption) 
TypedSPILoader.getService(PipelineJobType.class, "MIGRATION").getOption();
         List<TransmissionJobItemInfo> jobItemInfos = new 
TransmissionJobManager(jobOption).getJobItemInfos(sqlStatement.getJobId());
         long currentTimeMillis = System.currentTimeMillis();
         return jobItemInfos.stream().map(each -> generateResultRow(each, 
currentTimeMillis)).collect(Collectors.toList());
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/job/type/CDCJobType.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/job/type/CDCJobType.java
index 95cfcbdb538..c073f45c896 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/job/type/CDCJobType.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/job/type/CDCJobType.java
@@ -17,18 +17,25 @@
 
 package org.apache.shardingsphere.data.pipeline.cdc.api.job.type;
 
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
+import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobOption;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
+import 
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
 
 /**
  * CDC job type.
  */
-public final class CDCJobType implements JobType {
+public final class CDCJobType implements PipelineJobType {
     
     @Override
     public String getCode() {
         return "03";
     }
     
+    @Override
+    public PipelineJobOption getOption() {
+        return new CDCJobOption();
+    }
+    
     @Override
     public String getType() {
         return "STREAMING";
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.JobType
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType
similarity index 100%
rename from 
kernel/data-pipeline/scenario/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.JobType
rename to 
kernel/data-pipeline/scenario/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption
deleted file mode 100644
index 136bec61f32..00000000000
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobOption
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobIdTest.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobIdTest.java
index 70856a72c1c..cb9a8b6eeb0 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobIdTest.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobIdTest.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.cdc.core.job;
 import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
 import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import org.junit.jupiter.api.Test;
@@ -37,7 +37,7 @@ class CDCJobIdTest {
         PipelineContextKey contextKey = new PipelineContextKey("sharding_db", 
InstanceType.PROXY);
         CDCJobId pipelineJobId = new CDCJobId(contextKey, 
Arrays.asList("test", "t_order"), false, CDCSinkType.SOCKET.name());
         String jobId = 
PipelineJobIdUtils.marshalJobIdCommonPrefix(pipelineJobId) + "abcd";
-        JobType actualJobType = PipelineJobIdUtils.parseJobType(jobId);
+        PipelineJobType actualJobType = PipelineJobIdUtils.parseJobType(jobId);
         assertThat(actualJobType, instanceOf(CDCJobType.class));
     }
 }
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
index 87e71d934de..ccc0dc3ad8d 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
@@ -17,18 +17,25 @@
 
 package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
 
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
+import 
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
+import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobOption;
 
 /**
  * Consistency check job type.
  */
-public final class ConsistencyCheckJobType implements JobType {
+public final class ConsistencyCheckJobType implements PipelineJobType {
     
     @Override
     public String getCode() {
         return "02";
     }
     
+    @Override
+    public PipelineJobOption getOption() {
+        return new ConsistencyCheckJobOption();
+    }
+    
     @Override
     public String getType() {
         return "CONSISTENCY_CHECK";
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
index 89b4a40fc3a..7d90fee414b 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
@@ -18,7 +18,7 @@
 package 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.metadata.processor;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
 import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.config.processor.impl.AbstractJobConfigurationChangedProcessor;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
@@ -41,7 +41,7 @@ public final class 
ConsistencyCheckJobConfigurationChangedProcessor extends Abst
     }
     
     @Override
-    protected JobType getJobType() {
+    protected PipelineJobType getJobType() {
         return new ConsistencyCheckJobType();
     }
 }
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index e5dc6c5ccc3..65fe73c5eae 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -26,11 +26,10 @@ import 
org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
 import 
org.apache.shardingsphere.data.pipeline.common.execute.PipelineLifecycleRunnable;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import 
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
 import 
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
@@ -85,7 +84,7 @@ public final class ConsistencyCheckTasksRunner implements 
PipelineTasksRunner {
         if (jobItemContext.isStopping()) {
             return;
         }
-        new 
PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobOption.class, 
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType())
+        new 
PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class, 
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType()).getOption()
                 
.getYamlJobItemProgressSwapper()).persistProgress(jobItemContext);
         CompletableFuture<?> future = 
jobItemContext.getProcessContext().getConsistencyCheckExecuteEngine().submit(checkExecutor);
         ExecuteEngine.trigger(Collections.singletonList(future), new 
CheckExecuteCallback());
@@ -102,8 +101,8 @@ public final class ConsistencyCheckTasksRunner implements 
PipelineTasksRunner {
         @Override
         protected void runBlocking() {
             jobItemManager.persistProgress(jobItemContext);
-            JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId);
-            TransmissionJobOption jobOption = (TransmissionJobOption) 
TypedSPILoader.getService(PipelineJobOption.class, jobType.getType());
+            PipelineJobType jobType = 
PipelineJobIdUtils.parseJobType(parentJobId);
+            TransmissionJobOption jobOption = (TransmissionJobOption) 
TypedSPILoader.getService(PipelineJobType.class, jobType.getType()).getOption();
             PipelineJobConfiguration parentJobConfig = new 
PipelineJobConfigurationManager(jobOption).getJobConfiguration(parentJobId);
             try {
                 PipelineDataConsistencyChecker checker = 
jobOption.buildDataConsistencyChecker(
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.JobType
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType
similarity index 100%
rename from 
kernel/data-pipeline/scenario/consistencycheck/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.JobType
rename to 
kernel/data-pipeline/scenario/consistencycheck/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption
deleted file mode 100644
index 31edf286e07..00000000000
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobOption
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
index ab7c82ebd90..5afb2f50f7a 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
@@ -17,18 +17,25 @@
 
 package org.apache.shardingsphere.data.pipeline.scenario.migration;
 
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
+import 
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption;
 
 /**
  * Migration job type.
  */
-public final class MigrationJobType implements JobType {
+public final class MigrationJobType implements PipelineJobType {
     
     @Override
     public String getCode() {
         return "01";
     }
     
+    @Override
+    public PipelineJobOption getOption() {
+        return new MigrationJobOption();
+    }
+    
     @Override
     public String getType() {
         return "MIGRATION";
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 649e0c994fa..b89e63824d3 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceFactory;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
 import 
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineCommonSQLBuilder;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
@@ -49,10 +50,11 @@ public final class MigrationJobAPI implements 
TransmissionJobAPI {
     public void commit(final String jobId) {
         log.info("Commit job {}", jobId);
         final long startTimeMillis = System.currentTimeMillis();
-        PipelineJobManager jobManager = new 
PipelineJobManager(TypedSPILoader.getService(PipelineJobOption.class, 
getType()));
+        PipelineJobOption jobOption = new MigrationJobOption();
+        PipelineJobManager jobManager = new PipelineJobManager(jobOption);
         jobManager.stop(jobId);
         dropCheckJobs(jobId);
-        MigrationJobConfiguration jobConfig = new 
PipelineJobConfigurationManager(TypedSPILoader.getService(PipelineJobOption.class,
 getType())).getJobConfiguration(jobId);
+        MigrationJobConfiguration jobConfig = new 
PipelineJobConfigurationManager(jobOption).getJobConfiguration(jobId);
         refreshTableMetadata(jobId, jobConfig.getTargetDatabaseName());
         jobManager.drop(jobId);
         log.info("Commit cost {} ms", System.currentTimeMillis() - 
startTimeMillis);
@@ -70,7 +72,7 @@ public final class MigrationJobAPI implements 
TransmissionJobAPI {
         final long startTimeMillis = System.currentTimeMillis();
         dropCheckJobs(jobId);
         cleanTempTableOnRollback(jobId);
-        new 
PipelineJobManager(TypedSPILoader.getService(PipelineJobOption.class, 
getType())).drop(jobId);
+        new 
PipelineJobManager(TypedSPILoader.getService(PipelineJobType.class, 
getType()).getOption()).drop(jobId);
         log.info("Rollback job {} cost {} ms", jobId, 
System.currentTimeMillis() - startTimeMillis);
     }
     
@@ -81,7 +83,7 @@ public final class MigrationJobAPI implements 
TransmissionJobAPI {
         }
         for (String each : checkJobIds) {
             try {
-                new 
PipelineJobManager(TypedSPILoader.getService(PipelineJobOption.class, 
getType())).drop(each);
+                new 
PipelineJobManager(TypedSPILoader.getService(PipelineJobType.class, 
getType()).getOption()).drop(each);
                 // CHECKSTYLE:OFF
             } catch (final RuntimeException ex) {
                 // CHECKSTYLE:ON
@@ -91,7 +93,7 @@ public final class MigrationJobAPI implements 
TransmissionJobAPI {
     }
     
     private void cleanTempTableOnRollback(final String jobId) throws 
SQLException {
-        MigrationJobConfiguration jobConfig = new 
PipelineJobConfigurationManager(TypedSPILoader.getService(PipelineJobOption.class,
 getType())).getJobConfiguration(jobId);
+        MigrationJobConfiguration jobConfig = new 
PipelineJobConfigurationManager(TypedSPILoader.getService(PipelineJobType.class,
 getType()).getOption()).getJobConfiguration(jobId);
         PipelineCommonSQLBuilder pipelineSQLBuilder = new 
PipelineCommonSQLBuilder(jobConfig.getTargetDatabaseType());
         TableAndSchemaNameMapper mapping = new 
TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
         try (
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
index a15921a9adb..fd2c2dc48b6 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
@@ -18,7 +18,7 @@
 package 
org.apache.shardingsphere.data.pipeline.scenario.migration.metadata.processor;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
 import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.config.processor.impl.AbstractJobConfigurationChangedProcessor;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
@@ -44,7 +44,7 @@ public final class MigrationJobConfigurationChangedProcessor 
extends AbstractJob
     }
     
     @Override
-    protected JobType getJobType() {
+    protected PipelineJobType getJobType() {
         return new MigrationJobType();
     }
 }
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.JobType
 
b/kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType
similarity index 100%
rename from 
kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.JobType
rename to 
kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption
 
b/kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption
deleted file mode 100644
index c89d6bd11b8..00000000000
--- 
a/kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption
diff --git 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java
 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java
index 1bf9b0046ed..2e6cf74c7c1 100644
--- 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java
+++ 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java
@@ -19,7 +19,7 @@ package 
org.apache.shardingsphere.proxy.backend.handler.distsql.ral.queryable;
 
 import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import 
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
 import 
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
 import 
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
@@ -40,7 +40,7 @@ public final class ShowMigrationRuleExecutor implements 
QueryableRALExecutor<Sho
     
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final 
ShowMigrationRuleStatement sqlStatement) {
-        PipelineProcessConfiguration processConfig = new 
TransmissionJobManager((TransmissionJobOption) 
TypedSPILoader.getService(PipelineJobOption.class, "MIGRATION"))
+        PipelineProcessConfiguration processConfig = new 
TransmissionJobManager((TransmissionJobOption) 
TypedSPILoader.getService(PipelineJobType.class, "MIGRATION").getOption())
                 .showProcessConfiguration(new 
PipelineContextKey(InstanceType.PROXY));
         Collection<LocalDataQueryResultRow> result = new LinkedList<>();
         result.add(new 
LocalDataQueryResultRow(getString(processConfig.getRead()), 
getString(processConfig.getWrite()), 
getString(processConfig.getStreamChannel())));
diff --git 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterTransmissionRuleUpdater.java
 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterTransmissionRuleUpdater.java
index 210da1a21d6..97c9b0dd3a2 100644
--- 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterTransmissionRuleUpdater.java
+++ 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterTransmissionRuleUpdater.java
@@ -19,7 +19,7 @@ package 
org.apache.shardingsphere.proxy.backend.handler.distsql.ral.updatable;
 
 import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import 
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
 import 
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
 import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
@@ -35,7 +35,7 @@ public final class AlterTransmissionRuleUpdater implements 
RALUpdater<AlterTrans
     
     @Override
     public void executeUpdate(final String databaseName, final 
AlterTransmissionRuleStatement sqlStatement) {
-        TransmissionJobManager jobManager = new 
TransmissionJobManager((TransmissionJobOption) 
TypedSPILoader.getService(PipelineJobOption.class, 
sqlStatement.getJobTypeName()));
+        TransmissionJobManager jobManager = new 
TransmissionJobManager((TransmissionJobOption) 
TypedSPILoader.getService(PipelineJobType.class, 
sqlStatement.getJobTypeName()).getOption());
         PipelineProcessConfiguration processConfig = 
TransmissionProcessConfigurationSegmentConverter.convert(sqlStatement.getProcessConfigSegment());
         jobManager.alterProcessConfiguration(new 
PipelineContextKey(InstanceType.PROXY), processConfig);
     }
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
index edd99c6e21f..9c56035dc9d 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
@@ -26,7 +26,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDa
 import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
 import 
org.apache.shardingsphere.infra.database.core.connector.url.JdbcUrlAppender;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
@@ -118,7 +118,7 @@ public final class PipelineContainerComposer implements 
AutoCloseable {
     
     private Thread increaseTaskThread;
     
-    public PipelineContainerComposer(final PipelineTestParameter testParam, 
final JobType jobType) {
+    public PipelineContainerComposer(final PipelineTestParameter testParam, 
final PipelineJobType jobType) {
         databaseType = testParam.getDatabaseType();
         containerComposer = 
PipelineE2EEnvironment.getInstance().getItEnvType() == 
PipelineEnvTypeEnum.DOCKER
                 ? new DockerContainerComposer(testParam.getDatabaseType(), 
testParam.getStorageContainerImage(), testParam.getStorageContainerCount())
@@ -140,7 +140,7 @@ public final class PipelineContainerComposer implements 
AutoCloseable {
     }
     
     @SneakyThrows(SQLException.class)
-    private void init(final JobType jobType) {
+    private void init(final PipelineJobType jobType) {
         String jdbcUrl = containerComposer.getProxyJdbcUrl(databaseType 
instanceof PostgreSQLDatabaseType || databaseType instanceof 
OpenGaussDatabaseType ? "postgres" : "");
         try (Connection connection = DriverManager.getConnection(jdbcUrl, 
ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD)) {
             cleanUpPipelineJobs(connection, jobType);
@@ -150,7 +150,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
         cleanUpDataSource();
     }
     
-    private void cleanUpPipelineJobs(final Connection connection, final JobType jobType) throws SQLException {
+    private void cleanUpPipelineJobs(final Connection connection, final PipelineJobType jobType) throws SQLException {
         if (PipelineEnvTypeEnum.NATIVE != PipelineE2EEnvironment.getInstance().getItEnvType()) {
             return;
         }
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
index eb19714abaf..84054e5e48e 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
@@ -27,7 +27,7 @@ import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
 import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
@@ -96,7 +96,7 @@ public final class JobConfigurationBuilder {
         result.setSources(sources);
         result.setTarget(createYamlPipelineDataSourceConfiguration(new ShardingSpherePipelineDataSourceConfiguration(
                 ConfigurationFileUtils.readFile("migration_sharding_sphere_jdbc_target.yaml").replace("${databaseNameSuffix}", databaseNameSuffix))));
-        ((TransmissionJobOption) TypedSPILoader.getService(PipelineJobOption.class, "MIGRATION")).extendYamlJobConfiguration(contextKey, result);
+        ((TransmissionJobOption) TypedSPILoader.getService(PipelineJobType.class, "MIGRATION").getOption()).extendYamlJobConfiguration(contextKey, result);
         return result;
     }
     

Reply via email to