This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new a606bda3897 Improve pipeline job progress persistence (#19775)
a606bda3897 is described below
commit a606bda38974f157dc56302eabd43222776b79a8
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Tue Aug 2 13:46:01 2022 +0800
Improve pipeline job progress persistence (#19775)
---
.../api/job/persist/PipelineJobPersistContext.java | 5 ++-
.../rulealtered/RuleAlteredJobPersistService.java | 38 +++++++++++++---------
2 files changed, 27 insertions(+), 16 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/persist/PipelineJobPersistContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/persist/PipelineJobPersistContext.java
index 2f0b185e097..ea590ea94f0 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/persist/PipelineJobPersistContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/persist/PipelineJobPersistContext.java
@@ -21,6 +21,7 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
@Getter
@RequiredArgsConstructor
@@ -30,5 +31,7 @@ public final class PipelineJobPersistContext {
private final int shardingItem;
- private final AtomicBoolean alreadyPersisted = new AtomicBoolean(false);
+ private final AtomicBoolean hasNewEvents = new AtomicBoolean(false);
+
+ private final AtomicReference<Long> beforePersistingProgressMillis = new AtomicReference<>(null);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPersistService.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPersistService.java
index 3d5245bfef1..ee52e0d4864 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPersistService.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPersistService.java
@@ -29,8 +29,8 @@ import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
/**
* Rule altered job persist service.
@@ -45,8 +45,10 @@ public final class RuleAlteredJobPersistService {
private static final ScheduledExecutorService JOB_PERSIST_EXECUTOR = Executors.newSingleThreadScheduledExecutor(ExecutorThreadFactoryBuilder.build("scaling-job-schedule-%d"));
+ private static final long DELAY_SECONDS = 1;
+
static {
- JOB_PERSIST_EXECUTOR.scheduleWithFixedDelay(new PersistJobContextRunnable(), 5, 1, TimeUnit.SECONDS);
+ JOB_PERSIST_EXECUTOR.scheduleWithFixedDelay(new PersistJobContextRunnable(), 5, DELAY_SECONDS, TimeUnit.SECONDS);
}
/**
@@ -83,34 +85,40 @@ public final class RuleAlteredJobPersistService {
log.debug("Persist interval parameter is null, jobId={}, shardingItem={}", jobId, shardingItem);
return;
}
- parameter.getAlreadyPersisted().compareAndSet(true, false);
+ parameter.getHasNewEvents().set(true);
}
- private static void persist(final String jobId, final int shardingItem, final long persistTimeMillis, final PipelineJobPersistContext param) {
+ private static void persist(final String jobId, final int shardingItem, final PipelineJobPersistContext persistContext) {
+ Long beforePersistingProgressMillis = persistContext.getBeforePersistingProgressMillis().get();
+ if ((null == beforePersistingProgressMillis || System.currentTimeMillis() - beforePersistingProgressMillis < TimeUnit.SECONDS.toMillis(DELAY_SECONDS))
+ && !persistContext.getHasNewEvents().get()) {
+ return;
+ }
Map<Integer, RuleAlteredJobScheduler> schedulerMap = RuleAlteredJobSchedulerCenter.getJobSchedulerMap(jobId);
RuleAlteredJobScheduler scheduler = schedulerMap.get(shardingItem);
if (null == scheduler) {
- log.warn("job schedule not exists, job id: {}, sharding item: {}", jobId, shardingItem);
+ log.warn("persist, job schedule not exists, jobId={}, shardingItem={}", jobId, shardingItem);
return;
}
- log.info("execute persist, job id={}, sharding item={}, persistTimeMillis={}", jobId, shardingItem, persistTimeMillis);
+ if (null == beforePersistingProgressMillis) {
+ persistContext.getBeforePersistingProgressMillis().set(System.currentTimeMillis());
+ }
+ persistContext.getHasNewEvents().set(false);
+ long startTimeMillis = System.currentTimeMillis();
REPOSITORY_API.persistJobProgress(scheduler.getJobContext());
- param.getAlreadyPersisted().set(true);
+ persistContext.getBeforePersistingProgressMillis().set(null);
+ if (6 == ThreadLocalRandom.current().nextInt(100)) {
+ log.info("persist, jobId={}, shardingItem={}, cost time: {} ms", jobId, shardingItem, System.currentTimeMillis() - startTimeMillis);
+ }
}
private static final class PersistJobContextRunnable implements Runnable {
@Override
public void run() {
- long currentTimeMillis = System.currentTimeMillis();
for (Entry<String, Map<Integer, PipelineJobPersistContext>> entry : JOB_PERSIST_MAP.entrySet()) {
- entry.getValue().forEach((shardingItem, param) -> {
- AtomicBoolean alreadyPersisted = param.getAlreadyPersisted();
- if (alreadyPersisted.get()) {
- return;
- }
- persist(entry.getKey(), shardingItem, currentTimeMillis, param);
- alreadyPersisted.set(true);
+ entry.getValue().forEach((shardingItem, persistContext) -> {
+ persist(entry.getKey(), shardingItem, persistContext);
});
}
}