This is an automated email from the ASF dual-hosted git repository. totalo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push: new 652865fd059 Refactor AbstractJobConfigurationChangedProcessor (#29352) 652865fd059 is described below commit 652865fd0593de57d0af4e6639e4fde4b3eebcc0 Author: Liang Zhang <zhangli...@apache.org> AuthorDate: Mon Dec 11 03:20:20 2023 +0800 Refactor AbstractJobConfigurationChangedProcessor (#29352) --- .../core/job/AbstractInseparablePipelineJob.java | 22 +++++++++++----------- .../AbstractJobConfigurationChangedProcessor.java | 4 ++-- .../shardingsphere/data/pipeline/cdc/CDCJob.java | 11 ++--------- ...tencyCheckJobConfigurationChangedProcessor.java | 2 +- .../MigrationJobConfigurationChangedProcessor.java | 2 +- 5 files changed, 17 insertions(+), 24 deletions(-) diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java index 09af3e3d7a5..64c5f3b12bf 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java @@ -22,6 +22,8 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext; import org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext; +import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback; +import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine; import org.apache.shardingsphere.data.pipeline.core.ingest.position.FinishedPosition; import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory; import org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration; @@ -121,15 +123,7 @@ public abstract class AbstractInseparablePipelineJob<T extends PipelineJobItemCo if (futures.isEmpty()) { return; } - executeInventoryTasks(futures, jobItemContexts); - } - - protected abstract void executeInventoryTasks(Collection<CompletableFuture<?>> futures, Collection<T> jobItemContexts); - - private void updateJobItemStatus(final T jobItemContext, final PipelineJobType jobType, final JobStatus jobStatus) { - jobItemContext.setStatus(jobStatus); - PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper()); - jobItemManager.updateStatus(jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobStatus); + ExecuteEngine.trigger(futures, buildExecuteCallback("inventory", jobItemContexts.iterator().next())); } private void executeIncrementalTasks(final PipelineJobType jobType, final Collection<T> jobItemContexts) { @@ -147,8 +141,14 @@ public abstract class AbstractInseparablePipelineJob<T extends PipelineJobItemCo futures.addAll(task.start()); } } - executeIncrementalTasks(futures, jobItemContexts); + ExecuteEngine.trigger(futures, buildExecuteCallback("incremental", jobItemContexts.iterator().next())); + } + + private void updateJobItemStatus(final T jobItemContext, final PipelineJobType jobType, final JobStatus jobStatus) { + jobItemContext.setStatus(jobStatus); + PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper()); + jobItemManager.updateStatus(jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobStatus); } - protected abstract void executeIncrementalTasks(Collection<CompletableFuture<?>> futures, Collection<T> jobItemContexts); + protected abstract ExecuteCallback buildExecuteCallback(String identifier, T jobItemContext); } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java index 38c7b9cfad1..a59a9af680d 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java @@ -79,15 +79,15 @@ public abstract class AbstractJobConfigurationChangedProcessor implements JobCon protected abstract void onDeleted(JobConfiguration jobConfig); protected void executeJob(final JobConfiguration jobConfig) { + PipelineJob job = buildJob(); String jobId = jobConfig.getJobName(); - PipelineJob job = buildPipelineJob(jobId); PipelineJobRegistry.add(jobId, job); OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(PipelineJobIdUtils.parseContextKey(jobId)), job, jobConfig); job.getJobRunnerManager().setJobBootstrap(oneOffJobBootstrap); oneOffJobBootstrap.execute(); } - protected abstract PipelineJob buildPipelineJob(String jobId); + protected abstract PipelineJob buildJob(); protected abstract PipelineJobType getJobType(); diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java index eb649068faf..a9aa32f48ef 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java @@ -36,7 +36,6 @@ import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine; import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLineConvertUtils; import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceConfigurationFactory; import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback; -import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine; import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration; import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.DumperCommonContext; @@ -64,7 +63,6 @@ import java.util.Collection; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; /** @@ -145,13 +143,8 @@ public final class CDCJob extends AbstractInseparablePipelineJob<CDCJobItemConte } @Override - protected void executeInventoryTasks(final Collection<CompletableFuture<?>> futures, final Collection<CDCJobItemContext> jobItemContexts) { - ExecuteEngine.trigger(futures, new CDCExecuteCallback("inventory", jobItemContexts.iterator().next())); - } - - @Override - protected void executeIncrementalTasks(final Collection<CompletableFuture<?>> futures, final Collection<CDCJobItemContext> jobItemContexts) { - ExecuteEngine.trigger(futures, new CDCExecuteCallback("incremental", jobItemContexts.iterator().next())); + protected ExecuteCallback buildExecuteCallback(final String identifier, final CDCJobItemContext jobItemContext) { + return new CDCExecuteCallback(identifier, jobItemContext); } @RequiredArgsConstructor diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java index e57d0066a76..55976be8446 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java @@ -36,7 +36,7 @@ public final class ConsistencyCheckJobConfigurationChangedProcessor extends Abst } @Override - protected PipelineJob buildPipelineJob(final String jobId) { + protected PipelineJob buildJob() { return new ConsistencyCheckJob(); } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java index f9e57077ac5..18c3adc8666 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java @@ -39,7 +39,7 @@ public final class MigrationJobConfigurationChangedProcessor extends AbstractJob } @Override - protected PipelineJob buildPipelineJob(final String jobId) { + protected PipelineJob buildJob() { return new MigrationJob(); }