This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 3784c7841ac Refactor AbstractSeparablePipelineJob (#32753)
3784c7841ac is described below

commit 3784c7841ac0d61f45fa5b484798e827582a66b8
Author: Liang Zhang <zhangli...@apache.org>
AuthorDate: Sat Aug 31 21:49:22 2024 +0800

    Refactor AbstractSeparablePipelineJob (#32753)
---
 .../data/pipeline/core/job/AbstractSeparablePipelineJob.java       | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
index 63fac5ef828..7a021fdf93b 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
@@ -31,7 +31,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobItem
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
@@ -58,6 +57,7 @@ public abstract class AbstractSeparablePipelineJob<T extends 
PipelineJobConfigur
         jobRunnerManager = new PipelineJobRunnerManager();
     }
     
+    @SuppressWarnings("unchecked")
     @Override
     public final void execute(final ShardingContext shardingContext) {
         String jobId = shardingContext.getJobName();
@@ -68,12 +68,11 @@ public abstract class AbstractSeparablePipelineJob<T 
extends PipelineJobConfigur
             return;
         }
         PipelineJobType jobType = PipelineJobIdUtils.parseJobType(jobId);
-        PipelineJobConfigurationManager jobConfigManager = new 
PipelineJobConfigurationManager(jobType);
         PipelineContextKey contextKey = 
PipelineJobIdUtils.parseContextKey(jobId);
-        T jobConfig = jobConfigManager.getJobConfiguration(jobId);
-        TransmissionProcessContext jobProcessContext = 
createTransmissionProcessContext(jobId, jobType, contextKey);
+        T jobConfig = (T) 
jobType.getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
         PipelineJobItemManager<P> jobItemManager = new 
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
         P jobItemProgress = 
jobItemManager.getProgress(shardingContext.getJobName(), 
shardingItem).orElse(null);
+        TransmissionProcessContext jobProcessContext = 
createTransmissionProcessContext(jobId, jobType, contextKey);
         PipelineGovernanceFacade governanceFacade = 
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey);
         boolean started = false;
         try {

Reply via email to