This is an automated email from the ASF dual-hosted git repository.
chengzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new aca91c8d0fb Refactor AbstractJobConfigurationChangedProcessor (#29375)
aca91c8d0fb is described below
commit aca91c8d0fbe8dfc3e1baffbed6134437c55574c
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Dec 12 16:01:46 2023 +0800
Refactor AbstractJobConfigurationChangedProcessor (#29375)
---
.../impl/AbstractJobConfigurationChangedProcessor.java | 18 +++++++-----------
1 file changed, 7 insertions(+), 11 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java
index a59a9af680d..1a8ced96cd2 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java
@@ -30,8 +30,6 @@ import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
-import java.util.Collection;
-
/**
* Abstract job configuration changed processor.
*/
@@ -39,18 +37,17 @@ import java.util.Collection;
public abstract class AbstractJobConfigurationChangedProcessor implements
JobConfigurationChangedProcessor {
@Override
- public void process(final Type eventType, final JobConfiguration
jobConfig) {
- boolean disabled = jobConfig.isDisabled();
+ public final void process(final Type eventType, final JobConfiguration
jobConfig) {
boolean deleted = Type.DELETED == eventType;
if (deleted) {
onDeleted(jobConfig);
}
+ boolean disabled = jobConfig.isDisabled();
String jobId = jobConfig.getJobName();
if (disabled || deleted) {
- Collection<Integer> jobItems =
PipelineJobRegistry.getShardingItems(jobId);
PipelineJobRegistry.stop(jobId);
if (disabled) {
- onDisabled(jobConfig, jobItems);
+ onDisabled(jobId);
}
return;
}
@@ -68,16 +65,15 @@ public abstract class AbstractJobConfigurationChangedProcessor implements JobCon
}
}
- protected void onDisabled(final JobConfiguration jobConfig, final
Collection<Integer> jobItems) {
- String jobId = jobConfig.getJobName();
+ protected abstract void onDeleted(JobConfiguration jobConfig);
+
+ private void onDisabled(final String jobId) {
PipelineDistributedBarrier distributedBarrier =
PipelineDistributedBarrier.getInstance(PipelineJobIdUtils.parseContextKey(jobId));
- for (Integer each : jobItems) {
+ for (Integer each : PipelineJobRegistry.getShardingItems(jobId)) {
distributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierDisablePath(jobId),
each);
}
}
- protected abstract void onDeleted(JobConfiguration jobConfig);
-
protected void executeJob(final JobConfiguration jobConfig) {
PipelineJob job = buildJob();
String jobId = jobConfig.getJobName();