This is an automated email from the ASF dual-hosted git repository. zhangliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
     new 3784c7841ac Refactor AbstractSeparablePipelineJob (#32753)
3784c7841ac is described below

commit 3784c7841ac0d61f45fa5b484798e827582a66b8
Author: Liang Zhang <zhangli...@apache.org>
AuthorDate: Sat Aug 31 21:49:22 2024 +0800

    Refactor AbstractSeparablePipelineJob (#32753)
---
 .../data/pipeline/core/job/AbstractSeparablePipelineJob.java | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
index 63fac5ef828..7a021fdf93b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
@@ -31,7 +31,6 @@ import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobItem
 import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
-import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
 import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
@@ -58,6 +57,7 @@ public abstract class AbstractSeparablePipelineJob<T extends PipelineJobConfigur
         jobRunnerManager = new PipelineJobRunnerManager();
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public final void execute(final ShardingContext shardingContext) {
         String jobId = shardingContext.getJobName();
@@ -68,12 +68,11 @@ public abstract class AbstractSeparablePipelineJob<T extends PipelineJobConfigur
             return;
         }
         PipelineJobType jobType = PipelineJobIdUtils.parseJobType(jobId);
-        PipelineJobConfigurationManager jobConfigManager = new PipelineJobConfigurationManager(jobType);
         PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(jobId);
-        T jobConfig = jobConfigManager.getJobConfiguration(jobId);
-        TransmissionProcessContext jobProcessContext = createTransmissionProcessContext(jobId, jobType, contextKey);
+        T jobConfig = (T) jobType.getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
         PipelineJobItemManager<P> jobItemManager = new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
         P jobItemProgress = jobItemManager.getProgress(shardingContext.getJobName(), shardingItem).orElse(null);
+        TransmissionProcessContext jobProcessContext = createTransmissionProcessContext(jobId, jobType, contextKey);
         PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(contextKey);
         boolean started = false;
         try {