This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new d6f4d38023d Use PipelineJobType.getOption() (#37105)
d6f4d38023d is described below

commit d6f4d38023d83e8a6d877247ac304335c5a82007
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Nov 15 10:10:31 2025 +0800

    Use PipelineJobType.getOption() (#37105)
---
 .../job/executor/DistributedPipelineJobExecutor.java    |  6 +++---
 .../data/pipeline/core/job/id/PipelineJobIdUtils.java   |  2 +-
 .../persist/PipelineJobProgressPersistService.java      |  2 +-
 .../job/service/PipelineJobConfigurationManager.java    |  9 ++++++---
 .../pipeline/core/job/service/PipelineJobManager.java   | 17 +++++++++++------
 .../core/job/service/TransmissionJobManager.java        |  2 +-
 .../data/pipeline/core/job/type/JobCodeRegistry.java    |  4 ++--
 .../PipelineContextManagerLifecycleListener.java        |  2 +-
 .../processor/JobConfigurationChangedProcessEngine.java |  2 +-
 .../core/task/runner/TransmissionTasksRunner.java       |  2 +-
 .../apache/shardingsphere/data/pipeline/cdc/CDCJob.java |  4 ++--
 .../shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java | 11 +++++++----
 .../consistencycheck/api/ConsistencyCheckJobAPI.java    |  2 +-
 .../task/ConsistencyCheckTasksRunner.java               |  4 ++--
 .../migration/preparer/MigrationJobPreparer.java        |  2 +-
 .../ConsistencyCheckJobExecutorCallbackTest.java        |  2 +-
 .../api/ConsistencyCheckJobAPITest.java                 |  2 +-
 .../scenario/migration/api/MigrationJobAPITest.java     |  2 +-
 18 files changed, 44 insertions(+), 33 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/executor/DistributedPipelineJobExecutor.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/executor/DistributedPipelineJobExecutor.java
index a18639f6f83..807598a9817 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/executor/DistributedPipelineJobExecutor.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/executor/DistributedPipelineJobExecutor.java
@@ -70,8 +70,8 @@ public final class DistributedPipelineJobExecutor {
         }
         PipelineJobType jobType = PipelineJobIdUtils.parseJobType(jobId);
         PipelineContextKey contextKey = 
PipelineJobIdUtils.parseContextKey(jobId);
-        PipelineJobConfiguration jobConfig = 
jobType.getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
-        PipelineJobItemManager<PipelineJobItemProgress> jobItemManager = new 
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
+        PipelineJobConfiguration jobConfig = 
jobType.getOption().getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
+        PipelineJobItemManager<PipelineJobItemProgress> jobItemManager = new 
PipelineJobItemManager<>(jobType.getOption().getYamlJobItemProgressSwapper());
         PipelineJobItemProgress jobItemProgress = 
jobItemManager.getProgress(shardingContext.getJobName(), 
shardingItem).orElse(null);
         TransmissionProcessContext jobProcessContext = 
createTransmissionProcessContext(jobId, jobType, contextKey);
         PipelineGovernanceFacade governanceFacade = 
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey);
@@ -112,7 +112,7 @@ public final class DistributedPipelineJobExecutor {
     }
     
     private TransmissionProcessContext createTransmissionProcessContext(final 
String jobId, final PipelineJobType jobType, final PipelineContextKey 
contextKey) {
-        if (!jobType.isTransmissionJob()) {
+        if (!jobType.getOption().isTransmissionJob()) {
             return null;
         }
         PipelineProcessConfiguration processConfig = 
PipelineProcessConfigurationUtils.fillInDefaultValue(new 
PipelineProcessConfigurationPersistService().load(contextKey, 
jobType.getType()));
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/id/PipelineJobIdUtils.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/id/PipelineJobIdUtils.java
index d87b388323b..46baa0beffa 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/id/PipelineJobIdUtils.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/id/PipelineJobIdUtils.java
@@ -56,7 +56,7 @@ public final class PipelineJobIdUtils {
         String databaseName = instanceType == InstanceType.PROXY ? "" : 
contextKey.getDatabaseName();
         String databaseNameHex = 
Hex.encodeHexString(databaseName.getBytes(StandardCharsets.UTF_8), true);
         String databaseNameLengthHex = 
Hex.encodeHexString(Shorts.toByteArray((short) databaseNameHex.length()), true);
-        return 'j' + jobType.getCode() + PipelineJobId.CURRENT_VERSION + 
instanceType.getCode() + databaseNameLengthHex + databaseNameHex;
+        return 'j' + jobType.getOption().getCode() + 
PipelineJobId.CURRENT_VERSION + instanceType.getCode() + databaseNameLengthHex 
+ databaseNameHex;
     }
     
     /**
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index 4e0223b89ad..809ff360d34 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -135,7 +135,7 @@ public final class PipelineJobProgressPersistService {
             }
             long startTimeMillis = System.currentTimeMillis();
             new 
PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class,
-                    
PipelineJobIdUtils.parseJobType(jobId).getType()).getYamlJobItemProgressSwapper()).updateProgress(jobItemContext.get());
+                    
PipelineJobIdUtils.parseJobType(jobId).getType()).getOption().getYamlJobItemProgressSwapper()).updateProgress(jobItemContext.get());
             
persistContext.getUnhandledEventCount().addAndGet(-currentUnhandledEventCount);
             if (6 == ThreadLocalRandom.current().nextInt(100)) {
                 log.info("persist, jobId={}, shardingItem={}, cost {} ms", 
jobId, shardingItem, System.currentTimeMillis() - startTimeMillis);
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java
index e7963d54b4f..f9af4cc3f73 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java
@@ -20,6 +20,7 @@ package 
org.apache.shardingsphere.data.pipeline.core.job.service;
 import lombok.RequiredArgsConstructor;
 import 
org.apache.shardingsphere.data.pipeline.core.execute.ShardingTotalCountUsageJobExecutorServiceHandler;
 import 
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.job.config.yaml.swapper.YamlPipelineJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
 import 
org.apache.shardingsphere.data.pipeline.core.listener.PipelineElasticJobListener;
@@ -47,7 +48,7 @@ public final class PipelineJobConfigurationManager {
      */
     @SuppressWarnings("unchecked")
     public <T extends PipelineJobConfiguration> T getJobConfiguration(final 
String jobId) {
-        return (T) 
jobType.getYamlJobConfigurationSwapper().swapToObject(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getJobParameter());
+        return (T) 
jobType.getOption().getYamlJobConfigurationSwapper().swapToObject(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getJobParameter());
     }
     
     /**
@@ -56,11 +57,13 @@ public final class PipelineJobConfigurationManager {
      * @param jobConfig pipeline job configuration
      * @return converted job configuration POJO
      */
+    @SuppressWarnings({"unchecked", "rawtypes"})
     public JobConfigurationPOJO convertToJobConfigurationPOJO(final 
PipelineJobConfiguration jobConfig) {
         JobConfigurationPOJO result = new JobConfigurationPOJO();
         result.setJobName(jobConfig.getJobId());
-        
result.setShardingTotalCount(jobType.isForceNoShardingWhenConvertToJobConfigurationPOJO()
 ? 1 : jobConfig.getJobShardingCount());
-        
result.setJobParameter(YamlEngine.marshal(jobType.getYamlJobConfigurationSwapper().swapToYamlConfiguration(jobConfig)));
+        
result.setShardingTotalCount(jobType.getOption().isForceNoShardingWhenConvertToJobConfigurationPOJO()
 ? 1 : jobConfig.getJobShardingCount());
+        YamlPipelineJobConfigurationSwapper swapper = 
jobType.getOption().getYamlJobConfigurationSwapper();
+        
result.setJobParameter(YamlEngine.marshal(swapper.swapToYamlConfiguration(jobConfig)));
         String createTimeFormat = 
LocalDateTime.now().format(DateTimeFormatterFactory.getDatetimeFormatter());
         result.getProps().setProperty("create_time", createTimeFormat);
         result.getProps().setProperty("start_time_millis", 
String.valueOf(System.currentTimeMillis()));
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index 9633d062526..f0f3266d9fb 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -67,7 +67,7 @@ public final class PipelineJobManager {
             log.warn("jobId already exists in registry center, ignore, job id 
is `{}`", jobId);
             return;
         }
-        governanceFacade.getJobFacade().getJob().create(jobId, 
jobType.getJobClass());
+        governanceFacade.getJobFacade().getJob().create(jobId, 
jobType.getOption().getJobClass());
         governanceFacade.getJobFacade().getConfiguration().persist(jobId, new 
PipelineJobConfigurationManager(jobType).convertToJobConfigurationPOJO(jobConfig));
     }
     
@@ -77,16 +77,18 @@ public final class PipelineJobManager {
      * @param jobId job id
      */
     public void resume(final String jobId) {
-        if (jobType.isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished()) 
{
-            Optional<? extends PipelineJobItemProgress> jobItemProgress = new 
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper()).getProgress(jobId,
 0);
+        if 
(jobType.getOption().isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished()) 
{
+            Optional<? extends PipelineJobItemProgress> jobItemProgress = new 
PipelineJobItemManager<>(jobType.getOption().getYamlJobItemProgressSwapper()).getProgress(jobId,
 0);
             if (jobItemProgress.isPresent() && JobStatus.FINISHED == 
jobItemProgress.get().getStatus()) {
                 log.info("job status is FINISHED, ignore, jobId={}", jobId);
                 return;
             }
         }
         startCurrentDisabledJob(jobId);
-        jobType.getToBeStartDisabledNextJobType().ifPresent(optional -> 
startNextDisabledJob(jobId, optional));
-        
+        String toBeStartDisabledNextJobType = 
jobType.getOption().getGetToBeStartDisabledNextJobType();
+        if (null != toBeStartDisabledNextJobType) {
+            startNextDisabledJob(jobId, toBeStartDisabledNextJobType);
+        }
     }
     
     private void startCurrentDisabledJob(final String jobId) {
@@ -121,7 +123,10 @@ public final class PipelineJobManager {
      * @param jobId job id
      */
     public void stop(final String jobId) {
-        jobType.getToBeStoppedPreviousJobType().ifPresent(optional -> 
stopPreviousJob(jobId, optional));
+        String toBeStoppedPreviousJobType = 
jobType.getOption().getGetToBeStoppedPreviousJobType();
+        if (null != toBeStoppedPreviousJobType) {
+            stopPreviousJob(jobId, toBeStoppedPreviousJobType);
+        }
         stopCurrentJob(jobId);
     }
     
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
index df81d4312cd..4e0020ea566 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
@@ -94,7 +94,7 @@ public final class TransmissionJobManager {
      * @return each sharding item progress
      */
     public Map<Integer, TransmissionJobItemProgress> getJobProgress(final 
PipelineJobConfiguration jobConfig) {
-        PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = 
new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
+        PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = 
new 
PipelineJobItemManager<>(jobType.getOption().getYamlJobItemProgressSwapper());
         String jobId = jobConfig.getJobId();
         JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
         return IntStream.range(0, 
jobConfig.getJobShardingCount()).boxed().collect(LinkedHashMap::new, (map, 
each) -> {
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/JobCodeRegistry.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/JobCodeRegistry.java
index 768482d0f4e..10ed2cde6c7 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/JobCodeRegistry.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/JobCodeRegistry.java
@@ -35,8 +35,8 @@ public final class JobCodeRegistry {
     
     static {
         for (PipelineJobType each : 
ShardingSphereServiceLoader.getServiceInstances(PipelineJobType.class)) {
-            Preconditions.checkArgument(2 == each.getCode().length(), "Job 
type code length is not 2.");
-            JOB_CODE_AND_TYPE_MAP.put(each.getCode(), each);
+            Preconditions.checkArgument(2 == 
each.getOption().getCode().length(), "Job type code length is not 2.");
+            JOB_CODE_AND_TYPE_MAP.put(each.getOption().getCode(), each);
         }
     }
     
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
index beac261130e..609adff8fd4 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
@@ -78,7 +78,7 @@ public final class PipelineContextManagerLifecycleListener 
implements ContextMan
                 log.warn("Parse job type failed, job name: {}, error: {}", 
each.getJobName(), ex.getMessage());
                 continue;
             }
-            if ("CONSISTENCY_CHECK".equals(jobType.getCode())) {
+            if ("CONSISTENCY_CHECK".equals(jobType.getType())) {
                 continue;
             }
             JobConfigurationPOJO jobConfig;
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/JobConfigurationChangedProcessEngine.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/JobConfigurationChangedProcessEngine.java
index 666efd52db2..b956286c849 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/JobConfigurationChangedProcessEngine.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/JobConfigurationChangedProcessEngine.java
@@ -63,7 +63,7 @@ public final class JobConfigurationChangedProcessEngine {
                 if (PipelineJobRegistry.isExisting(jobId)) {
                     log.info("{} added to executing jobs failed since it 
already exists", jobId);
                 } else {
-                    T pipelineJobConfig = (T) 
PipelineJobIdUtils.parseJobType(jobConfig.getJobName()).getYamlJobConfigurationSwapper().swapToObject(jobConfig.getJobParameter());
+                    T pipelineJobConfig = (T) 
PipelineJobIdUtils.parseJobType(jobConfig.getJobName()).getOption().getYamlJobConfigurationSwapper().swapToObject(jobConfig.getJobParameter());
                     executeJob(jobConfig, pipelineJobConfig, processor);
                 }
                 break;
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
index 8640db836db..87994fb0aa0 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
@@ -63,7 +63,7 @@ public final class TransmissionTasksRunner implements 
PipelineTasksRunner {
         inventoryTasks = jobItemContext.getInventoryTasks();
         incrementalTasks = jobItemContext.getIncrementalTasks();
         jobType = TypedSPILoader.getService(PipelineJobType.class, 
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType());
-        jobItemManager = new 
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
+        jobItemManager = new 
PipelineJobItemManager<>(jobType.getOption().getYamlJobItemProgressSwapper());
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 60df62aaf65..052d7086c43 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -103,8 +103,8 @@ public final class CDCJob implements PipelineJob {
         log.info("Execute job {}", jobId);
         PipelineJobType jobType = PipelineJobIdUtils.parseJobType(jobId);
         PipelineContextKey contextKey = 
PipelineJobIdUtils.parseContextKey(jobId);
-        CDCJobConfiguration jobConfig = (CDCJobConfiguration) 
jobType.getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
-        PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = 
new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
+        CDCJobConfiguration jobConfig = (CDCJobConfiguration) 
jobType.getOption().getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
+        PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = 
new 
PipelineJobItemManager<>(jobType.getOption().getYamlJobItemProgressSwapper());
         TransmissionProcessContext jobProcessContext = new 
TransmissionProcessContext(
                 jobId, 
PipelineProcessConfigurationUtils.fillInDefaultValue(new 
PipelineProcessConfigurationPersistService().load(contextKey, 
jobType.getType())));
         PipelineGovernanceFacade governanceFacade = 
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey);
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index fe49f869bf0..1eb93f50767 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -50,6 +50,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.swapper.YamlPipelineJobItemProgressSwapper;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
@@ -132,7 +133,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
         if 
(governanceFacade.getJobFacade().getConfiguration().isExisted(jobConfig.getJobId()))
 {
             log.warn("CDC job already exists in registry center, ignore, job 
id is `{}`", jobConfig.getJobId());
         } else {
-            
governanceFacade.getJobFacade().getJob().create(jobConfig.getJobId(), 
jobType.getJobClass());
+            
governanceFacade.getJobFacade().getJob().create(jobConfig.getJobId(), 
jobType.getOption().getJobClass());
             JobConfigurationPOJO jobConfigPOJO = 
jobConfigManager.convertToJobConfigurationPOJO(jobConfig);
             jobConfigPOJO.setDisabled(true);
             
governanceFacade.getJobFacade().getConfiguration().persist(jobConfig.getJobId(),
 jobConfigPOJO);
@@ -180,9 +181,10 @@ public final class CDCJobAPI implements TransmissionJobAPI 
{
         return new 
ShardingSpherePipelineDataSourceConfiguration(targetRootConfig);
     }
     
+    @SuppressWarnings({"rawtypes", "unchecked"})
     private void initIncrementalPosition(final CDCJobConfiguration jobConfig) {
         String jobId = jobConfig.getJobId();
-        PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = 
new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
+        PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = 
new 
PipelineJobItemManager<>(jobType.getOption().getYamlJobItemProgressSwapper());
         try (PipelineDataSourceManager pipelineDataSourceManager = new 
PipelineDataSourceManager()) {
             for (int i = 0; i < jobConfig.getJobShardingCount(); i++) {
                 if (jobItemManager.getProgress(jobId, i).isPresent()) {
@@ -190,8 +192,9 @@ public final class CDCJobAPI implements TransmissionJobAPI {
                 }
                 IncrementalDumperContext dumperContext = 
buildDumperContext(jobConfig, i, new 
TableAndSchemaNameMapper(jobConfig.getSchemaTableNames()));
                 TransmissionJobItemProgress jobItemProgress = 
getTransmissionJobItemProgress(jobConfig, pipelineDataSourceManager, 
dumperContext);
-                
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getProcess().persist(
-                        jobId, i, 
YamlEngine.marshal(jobType.getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
+                YamlPipelineJobItemProgressSwapper swapper = 
jobType.getOption().getYamlJobItemProgressSwapper();
+                PipelineAPIFactory.getPipelineGovernanceFacade(
+                        
PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getProcess().persist(jobId,
 i, YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress)));
             }
         } catch (final SQLException ex) {
             throw new PrepareJobWithGetBinlogPositionException(jobId, ex);
diff --git 
a/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
 
b/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
index 39eec1586d1..0d99f34600f 100644
--- 
a/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
@@ -68,7 +68,7 @@ public final class ConsistencyCheckJobAPI {
     private final PipelineJobItemManager<ConsistencyCheckJobItemProgress> 
jobItemManager;
     
     public ConsistencyCheckJobAPI(final ConsistencyCheckJobType jobType) {
-        progressSwapper = jobType.getYamlJobItemProgressSwapper();
+        progressSwapper = (YamlConsistencyCheckJobItemProgressSwapper) 
jobType.getOption().getYamlJobItemProgressSwapper();
         jobManager = new PipelineJobManager(jobType);
         jobConfigManager = new PipelineJobConfigurationManager(jobType);
         jobItemManager = new PipelineJobItemManager<>(progressSwapper);
diff --git 
a/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
 
b/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index df6806eb0ba..50b84d63eca 100644
--- 
a/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++ 
b/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -59,7 +59,7 @@ public final class ConsistencyCheckTasksRunner implements 
PipelineTasksRunner {
     
     private final PipelineJobManager jobManager = new 
PipelineJobManager(jobType);
     
-    private final PipelineJobItemManager<TransmissionJobItemProgress> 
jobItemManager = new 
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
+    private final PipelineJobItemManager<TransmissionJobItemProgress> 
jobItemManager = new 
PipelineJobItemManager<>(jobType.getOption().getYamlJobItemProgressSwapper());
     
     private final PipelineProcessConfigurationPersistService 
processConfigPersistService = new PipelineProcessConfigurationPersistService();
     
@@ -90,7 +90,7 @@ public final class ConsistencyCheckTasksRunner implements 
PipelineTasksRunner {
             return;
         }
         new 
PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class, 
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType())
-                
.getYamlJobItemProgressSwapper()).persistProgress(jobItemContext);
+                
.getOption().getYamlJobItemProgressSwapper()).persistProgress(jobItemContext);
         CompletableFuture<?> future = 
jobItemContext.getProcessContext().getConsistencyCheckExecuteEngine().submit(checkExecutor);
         PipelineExecuteEngine.trigger(Collections.singletonList(future), new 
CheckExecuteCallback());
     }
diff --git 
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
 
b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index e84e90c7433..fe3bcf8e4a7 100644
--- 
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -80,7 +80,7 @@ import java.util.Collections;
 @Slf4j
 public final class MigrationJobPreparer implements 
PipelineJobPreparer<MigrationJobItemContext> {
     
-    private final PipelineJobItemManager<TransmissionJobItemProgress> 
jobItemManager = new PipelineJobItemManager<>(new 
MigrationJobType().getYamlJobItemProgressSwapper());
+    private final PipelineJobItemManager<TransmissionJobItemProgress> 
jobItemManager = new PipelineJobItemManager<>(new 
MigrationJobType().getOption().getYamlJobItemProgressSwapper());
     
     @Override
     public void prepare(final MigrationJobItemContext jobItemContext) throws 
SQLException {
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
index 1cf7ce12d18..6e2d2a35607 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
@@ -72,7 +72,7 @@ class ConsistencyCheckJobExecutorCallbackTest {
                 
YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectedYamlTableCheckRangePositions)));
         ConsistencyCheckJobExecutorCallback callback = new 
ConsistencyCheckJobExecutorCallback();
         ConsistencyCheckJobConfiguration jobConfig = new 
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(createYamlConsistencyCheckJobConfiguration(checkJobId));
-        PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager 
= new PipelineJobItemManager<>(new 
ConsistencyCheckJobType().getYamlJobItemProgressSwapper());
+        PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager 
= new PipelineJobItemManager<>(new 
ConsistencyCheckJobType().getOption().getYamlJobItemProgressSwapper());
         Optional<ConsistencyCheckJobItemProgress> jobItemProgress = 
jobItemManager.getProgress(jobConfig.getJobId(), 0);
         ConsistencyCheckJobItemContext actual = 
callback.buildJobItemContext(jobConfig, 0, jobItemProgress.orElse(null), null, 
null);
         YamlTableCheckRangePositionSwapper tableCheckPositionSwapper = new 
YamlTableCheckRangePositionSwapper();
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPITest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPITest.java
index bb8ba7c779c..0cab811477d 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPITest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPITest.java
@@ -59,7 +59,7 @@ class ConsistencyCheckJobAPITest {
     
     private final ConsistencyCheckJobAPI jobAPI = new 
ConsistencyCheckJobAPI(jobType);
     
-    private final PipelineJobItemManager<TransmissionJobItemProgress> 
jobItemManager = new 
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
+    private final PipelineJobItemManager<TransmissionJobItemProgress> 
jobItemManager = new 
PipelineJobItemManager<>(jobType.getOption().getYamlJobItemProgressSwapper());
     
     private final YamlMigrationJobConfigurationSwapper jobConfigSwapper = new 
YamlMigrationJobConfigurationSwapper();
     
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java
index cba8da27a7c..aa807dbb132 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java
@@ -116,7 +116,7 @@ class MigrationJobAPITest {
         jobConfigManager = new PipelineJobConfigurationManager(jobType);
         jobManager = new PipelineJobManager(jobType);
         transmissionJobManager = new TransmissionJobManager(jobType);
-        jobItemManager = new 
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
+        jobItemManager = new 
PipelineJobItemManager<>(jobType.getOption().getYamlJobItemProgressSwapper());
         String jdbcUrl = 
"jdbc:h2:mem:test_ds_0;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL";
         databaseType = DatabaseTypeFactory.get(jdbcUrl);
         Map<String, Object> props = new HashMap<>(3, 1F);

Reply via email to