This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git

The following commit(s) were added to refs/heads/master by this push:
     new 2d81b4f9df9 Refactor AbstractInseparablePipelineJob (#32754)
2d81b4f9df9 is described below

commit 2d81b4f9df907ca34187fce8827d981a164f6fbb
Author: Liang Zhang <zhangli...@apache.org>
AuthorDate: Sat Aug 31 21:56:31 2024 +0800

    Refactor AbstractInseparablePipelineJob (#32754)
---
 .../core/job/AbstractInseparablePipelineJob.java | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
index 9839cb8dc0a..3d3047bcdea 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.job;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
@@ -68,11 +69,12 @@ public abstract class AbstractInseparablePipelineJob<T extends PipelineJobConfig
         String jobId = shardingContext.getJobName();
         log.info("Execute job {}", jobId);
         PipelineJobType jobType = PipelineJobIdUtils.parseJobType(jobId);
+        PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(jobId);
         T jobConfig = (T) jobType.getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
-        TransmissionProcessContext jobProcessContext = jobType.isTransmissionJob() ? createTransmissionProcessContext(jobId) : null;
-        Collection<I> jobItemContexts = new LinkedList<>();
         PipelineJobItemManager<P> jobItemManager = new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
-        PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId));
+        TransmissionProcessContext jobProcessContext = createTransmissionProcessContext(jobId, jobType, contextKey);
+        PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(contextKey);
+        Collection<I> jobItemContexts = new LinkedList<>();
         for (int shardingItem = 0; shardingItem < jobConfig.getJobShardingCount(); shardingItem++) {
             if (jobRunnerManager.isStopping()) {
                 log.info("Job is stopping, ignore.");
@@ -96,9 +98,11 @@ public abstract class AbstractInseparablePipelineJob<T extends PipelineJobConfig
         executeIncrementalTasks(jobItemContexts, jobItemManager);
     }
 
-    private TransmissionProcessContext createTransmissionProcessContext(final String jobId) {
-        PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.fillInDefaultValue(
-                new PipelineProcessConfigurationPersistService().load(PipelineJobIdUtils.parseContextKey(jobId), PipelineJobIdUtils.parseJobType(jobId).getType()));
+    private TransmissionProcessContext createTransmissionProcessContext(final String jobId, final PipelineJobType jobType, final PipelineContextKey contextKey) {
+        if (!jobType.isTransmissionJob()) {
+            return null;
+        }
+        PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.fillInDefaultValue(new PipelineProcessConfigurationPersistService().load(contextKey, jobType.getType()));
         return new TransmissionProcessContext(jobId, processConfig);
     }
 