This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d81b4f9df9 Refactor AbstractInseparablePipelineJob (#32754)
2d81b4f9df9 is described below

commit 2d81b4f9df907ca34187fce8827d981a164f6fbb
Author: Liang Zhang <zhangli...@apache.org>
AuthorDate: Sat Aug 31 21:56:31 2024 +0800

    Refactor AbstractInseparablePipelineJob (#32754)
---
 .../core/job/AbstractInseparablePipelineJob.java         | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
index 9839cb8dc0a..3d3047bcdea 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.job;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
@@ -68,11 +69,12 @@ public abstract class AbstractInseparablePipelineJob<T extends PipelineJobConfig
         String jobId = shardingContext.getJobName();
         log.info("Execute job {}", jobId);
         PipelineJobType jobType = PipelineJobIdUtils.parseJobType(jobId);
+        PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(jobId);
         T jobConfig = (T) jobType.getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
-        TransmissionProcessContext jobProcessContext = jobType.isTransmissionJob() ? createTransmissionProcessContext(jobId) : null;
-        Collection<I> jobItemContexts = new LinkedList<>();
         PipelineJobItemManager<P> jobItemManager = new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
-        PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId));
+        TransmissionProcessContext jobProcessContext = createTransmissionProcessContext(jobId, jobType, contextKey);
+        PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(contextKey);
+        Collection<I> jobItemContexts = new LinkedList<>();
         for (int shardingItem = 0; shardingItem < jobConfig.getJobShardingCount(); shardingItem++) {
             if (jobRunnerManager.isStopping()) {
                 log.info("Job is stopping, ignore.");
@@ -96,9 +98,11 @@ public abstract class AbstractInseparablePipelineJob<T extends PipelineJobConfig
         executeIncrementalTasks(jobItemContexts, jobItemManager);
     }
     
-    private TransmissionProcessContext createTransmissionProcessContext(final String jobId) {
-        PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.fillInDefaultValue(
-                new PipelineProcessConfigurationPersistService().load(PipelineJobIdUtils.parseContextKey(jobId), PipelineJobIdUtils.parseJobType(jobId).getType()));
+    private TransmissionProcessContext createTransmissionProcessContext(final String jobId, final PipelineJobType jobType, final PipelineContextKey contextKey) {
+        if (!jobType.isTransmissionJob()) {
+            return null;
+        }
+        PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.fillInDefaultValue(new PipelineProcessConfigurationPersistService().load(contextKey, jobType.getType()));
         return new TransmissionProcessContext(jobId, processConfig);
     }
     

Reply via email to