This is an automated email from the ASF dual-hosted git repository.

totalo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 652865fd059 Refactor AbstractJobConfigurationChangedProcessor (#29352)
652865fd059 is described below

commit 652865fd0593de57d0af4e6639e4fde4b3eebcc0
Author: Liang Zhang <zhangli...@apache.org>
AuthorDate: Mon Dec 11 03:20:20 2023 +0800

    Refactor AbstractJobConfigurationChangedProcessor (#29352)
---
 .../core/job/AbstractInseparablePipelineJob.java   | 22 +++++++++++-----------
 .../AbstractJobConfigurationChangedProcessor.java  |  4 ++--
 .../shardingsphere/data/pipeline/cdc/CDCJob.java   | 11 ++---------
 ...tencyCheckJobConfigurationChangedProcessor.java |  2 +-
 .../MigrationJobConfigurationChangedProcessor.java |  2 +-
 5 files changed, 17 insertions(+), 24 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
index 09af3e3d7a5..64c5f3b12bf 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
@@ -22,6 +22,8 @@ import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
+import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
+import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.FinishedPosition;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
@@ -121,15 +123,7 @@ public abstract class AbstractInseparablePipelineJob<T 
extends PipelineJobItemCo
         if (futures.isEmpty()) {
             return;
         }
-        executeInventoryTasks(futures, jobItemContexts);
-    }
-    
-    protected abstract void 
executeInventoryTasks(Collection<CompletableFuture<?>> futures, Collection<T> 
jobItemContexts);
-    
-    private void updateJobItemStatus(final T jobItemContext, final 
PipelineJobType jobType, final JobStatus jobStatus) {
-        jobItemContext.setStatus(jobStatus);
-        PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = 
new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
-        jobItemManager.updateStatus(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), jobStatus);
+        ExecuteEngine.trigger(futures, buildExecuteCallback("inventory", 
jobItemContexts.iterator().next()));
     }
     
     private void executeIncrementalTasks(final PipelineJobType jobType, final 
Collection<T> jobItemContexts) {
@@ -147,8 +141,14 @@ public abstract class AbstractInseparablePipelineJob<T 
extends PipelineJobItemCo
                 futures.addAll(task.start());
             }
         }
-        executeIncrementalTasks(futures, jobItemContexts);
+        ExecuteEngine.trigger(futures, buildExecuteCallback("incremental", 
jobItemContexts.iterator().next()));
+    }
+    
+    private void updateJobItemStatus(final T jobItemContext, final 
PipelineJobType jobType, final JobStatus jobStatus) {
+        jobItemContext.setStatus(jobStatus);
+        PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = 
new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
+        jobItemManager.updateStatus(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), jobStatus);
     }
     
-    protected abstract void 
executeIncrementalTasks(Collection<CompletableFuture<?>> futures, Collection<T> 
jobItemContexts);
+    protected abstract ExecuteCallback buildExecuteCallback(String identifier, 
T jobItemContext);
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java
index 38c7b9cfad1..a59a9af680d 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java
@@ -79,15 +79,15 @@ public abstract class 
AbstractJobConfigurationChangedProcessor implements JobCon
     protected abstract void onDeleted(JobConfiguration jobConfig);
     
     protected void executeJob(final JobConfiguration jobConfig) {
+        PipelineJob job = buildJob();
         String jobId = jobConfig.getJobName();
-        PipelineJob job = buildPipelineJob(jobId);
         PipelineJobRegistry.add(jobId, job);
         OneOffJobBootstrap oneOffJobBootstrap = new 
OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(PipelineJobIdUtils.parseContextKey(jobId)),
 job, jobConfig);
         job.getJobRunnerManager().setJobBootstrap(oneOffJobBootstrap);
         oneOffJobBootstrap.execute();
     }
     
-    protected abstract PipelineJob buildPipelineJob(String jobId);
+    protected abstract PipelineJob buildJob();
     
     protected abstract PipelineJobType getJobType();
     
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index eb649068faf..a9aa32f48ef 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -36,7 +36,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
 import 
org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLineConvertUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceConfigurationFactory;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
-import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import 
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.DumperCommonContext;
@@ -64,7 +63,6 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
 /**
@@ -145,13 +143,8 @@ public final class CDCJob extends 
AbstractInseparablePipelineJob<CDCJobItemConte
     }
     
     @Override
-    protected void executeInventoryTasks(final 
Collection<CompletableFuture<?>> futures, final Collection<CDCJobItemContext> 
jobItemContexts) {
-        ExecuteEngine.trigger(futures, new CDCExecuteCallback("inventory", 
jobItemContexts.iterator().next()));
-    }
-    
-    @Override
-    protected void executeIncrementalTasks(final 
Collection<CompletableFuture<?>> futures, final Collection<CDCJobItemContext> 
jobItemContexts) {
-        ExecuteEngine.trigger(futures, new CDCExecuteCallback("incremental", 
jobItemContexts.iterator().next()));
+    protected ExecuteCallback buildExecuteCallback(final String identifier, 
final CDCJobItemContext jobItemContext) {
+        return new CDCExecuteCallback(identifier, jobItemContext);
     }
     
     @RequiredArgsConstructor
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
index e57d0066a76..55976be8446 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
@@ -36,7 +36,7 @@ public final class 
ConsistencyCheckJobConfigurationChangedProcessor extends Abst
     }
     
     @Override
-    protected PipelineJob buildPipelineJob(final String jobId) {
+    protected PipelineJob buildJob() {
         return new ConsistencyCheckJob();
     }
     
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
index f9e57077ac5..18c3adc8666 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
@@ -39,7 +39,7 @@ public final class MigrationJobConfigurationChangedProcessor 
extends AbstractJob
     }
     
     @Override
-    protected PipelineJob buildPipelineJob(final String jobId) {
+    protected PipelineJob buildJob() {
         return new MigrationJob();
     }
     

Reply via email to