This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 97a54388f80 Refactor GovernanceRepositoryAPI for common usage (#19979)
97a54388f80 is described below
commit 97a54388f80c2aa408496b4f520e9df0beb02cb4
Author: Xinze Guo <[email protected]>
AuthorDate: Tue Aug 9 13:28:46 2022 +0800
Refactor GovernanceRepositoryAPI for common usage (#19979)
* Refactor GovernanceRepositoryAPI for common usage
* Fix codestyle and refactor getJobProgress
* Fix ci error
---
.../data/pipeline/api/RuleAlteredJobAPI.java | 27 +++++++++
.../api/fixture/RuleAlteredJobAPIFixture.java | 15 +++++
.../pipeline/core/api/GovernanceRepositoryAPI.java | 20 ++-----
.../core/api/impl/GovernanceRepositoryAPIImpl.java | 56 ++----------------
.../core/api/impl/RuleAlteredJobAPIImpl.java | 69 +++++++++++++++++++++-
.../persist/PipelineJobProgressPersistService.java | 7 +--
.../scenario/rulealtered/RuleAlteredJob.java | 41 ++++++-------
.../rulealtered/RuleAlteredJobPreparer.java | 4 +-
.../rulealtered/RuleAlteredJobScheduler.java | 4 +-
.../api/impl/GovernanceRepositoryAPIImplTest.java | 25 ++------
.../core/api/impl/RuleAlteredJobAPIImplTest.java | 18 ++++--
.../rulealtered/RuleAlteredJobWorkerTest.java | 3 +-
.../prepare/InventoryTaskSplitterTest.java | 4 +-
13 files changed, 165 insertions(+), 128 deletions(-)
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
index 4d27306af74..096334bd9f3 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
@@ -19,6 +19,8 @@ package org.apache.shardingsphere.data.pipeline.api;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobContext;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
import
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
@@ -181,4 +183,29 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI,
RequiredSPI {
* @return job configuration
*/
RuleAlteredJobConfiguration getJobConfig(String jobId);
+
+ /**
+ * Persist job progress.
+ *
+ * @param jobContext job context
+ */
+ void persistJobProgress(PipelineJobContext jobContext);
+
+ /**
+ * Get job progress.
+ *
+ * @param jobId job id
+ * @param shardingItem sharding item
+ * @return job progress
+ */
+ JobProgress getJobProgress(String jobId, int shardingItem);
+
+ /**
+ * Update sharding job status.
+ *
+ * @param jobId job id
+ * @param shardingItem sharding item
+ * @param status status
+ */
+ void updateShardingJobStatus(String jobId, int shardingItem, JobStatus
status);
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/fixture/RuleAlteredJobAPIFixture.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/fixture/RuleAlteredJobAPIFixture.java
index bfb05d6ae54..2e9817ab54a 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/fixture/RuleAlteredJobAPIFixture.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/fixture/RuleAlteredJobAPIFixture.java
@@ -20,6 +20,8 @@ package org.apache.shardingsphere.data.pipeline.api.fixture;
import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobContext;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
import
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
@@ -136,4 +138,17 @@ public final class RuleAlteredJobAPIFixture implements
RuleAlteredJobAPI {
public boolean isDefault() {
return RuleAlteredJobAPI.super.isDefault();
}
+
+ @Override
+ public void persistJobProgress(final PipelineJobContext jobContext) {
+ }
+
+ @Override
+ public JobProgress getJobProgress(final String jobId, final int
shardingItem) {
+ return null;
+ }
+
+ @Override
+ public void updateShardingJobStatus(final String jobId, final int
shardingItem, final JobStatus status) {
+ }
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
index fd60c290b46..db47c3d2dc7 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
@@ -17,9 +17,6 @@
package org.apache.shardingsphere.data.pipeline.core.api;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
-import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobContext;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
import java.util.List;
@@ -41,9 +38,11 @@ public interface GovernanceRepositoryAPI {
/**
* Persist job progress.
*
- * @param jobContext job context
+ * @param jobId job id
+ * @param shardingItem sharding item
+ * @param progressValue progress value
*/
- void persistJobProgress(PipelineJobContext jobContext);
+ void persistJobProgress(String jobId, int shardingItem, String
progressValue);
/**
* Get job progress.
@@ -52,7 +51,7 @@ public interface GovernanceRepositoryAPI {
* @param shardingItem sharding item
* @return job progress
*/
- JobProgress getJobProgress(String jobId, int shardingItem);
+ String getJobProgress(String jobId, int shardingItem);
/**
* Persist job check result.
@@ -108,13 +107,4 @@ public interface GovernanceRepositoryAPI {
* @return sharding items
*/
List<Integer> getShardingItems(String jobId);
-
- /**
- * Update sharding job status.
- *
- * @param jobId job id
- * @param shardingItem sharding item
- * @param status status
- */
- void updateShardingJobStatus(String jobId, int shardingItem, JobStatus
status);
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index 97f8f661d2d..f21243ce4ec 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -20,19 +20,8 @@ package
org.apache.shardingsphere.data.pipeline.core.api.impl;
import com.google.common.base.Strings;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
-import
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemInventoryTasksProgress;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
import
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
-import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobContext;
-import
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobProgressSwapper;
-import
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobProgress;
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
-import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
-import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
-import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
@@ -47,8 +36,6 @@ import java.util.stream.Collectors;
@Slf4j
public final class GovernanceRepositoryAPIImpl implements
GovernanceRepositoryAPI {
- private static final YamlJobProgressSwapper SWAPPER = new
YamlJobProgressSwapper();
-
private final ClusterPersistRepository repository;
@Override
@@ -57,37 +44,13 @@ public final class GovernanceRepositoryAPIImpl implements
GovernanceRepositoryAP
}
@Override
- public void persistJobProgress(final PipelineJobContext context) {
- RuleAlteredJobContext jobContext = (RuleAlteredJobContext) context;
- JobProgress jobProgress = new JobProgress();
- jobProgress.setStatus(jobContext.getStatus());
-
jobProgress.setSourceDatabaseType(jobContext.getJobConfig().getSourceDatabaseType());
- jobProgress.setIncremental(getIncrementalTasksProgress(jobContext));
- jobProgress.setInventory(getInventoryTasksProgress(jobContext));
- String value =
YamlEngine.marshal(SWAPPER.swapToYamlConfiguration(jobProgress));
-
repository.persist(PipelineMetaDataNode.getScalingJobOffsetPath(jobContext.getJobId(),
jobContext.getShardingItem()), value);
- }
-
- private JobItemIncrementalTasksProgress getIncrementalTasksProgress(final
RuleAlteredJobContext jobContext) {
- return new JobItemIncrementalTasksProgress(
- jobContext.getIncrementalTasks()
-
.stream().collect(Collectors.toMap(IncrementalTask::getTaskId,
IncrementalTask::getProgress)));
- }
-
- private JobItemInventoryTasksProgress getInventoryTasksProgress(final
RuleAlteredJobContext jobContext) {
- return new JobItemInventoryTasksProgress(
- jobContext.getInventoryTasks()
- .stream()
- .collect(Collectors.toMap(InventoryTask::getTaskId,
InventoryTask::getProgress)));
+ public void persistJobProgress(final String jobId, final int shardingItem,
final String progressValue) {
+ repository.persist(PipelineMetaDataNode.getScalingJobOffsetPath(jobId,
shardingItem), progressValue);
}
@Override
- public JobProgress getJobProgress(final String jobId, final int
shardingItem) {
- String data =
repository.get(PipelineMetaDataNode.getScalingJobOffsetPath(jobId,
shardingItem));
- if (Strings.isNullOrEmpty(data)) {
- return null;
- }
- return SWAPPER.swapToObject(YamlEngine.unmarshal(data,
YamlJobProgress.class));
+ public String getJobProgress(final String jobId, final int shardingItem) {
+ return
repository.get(PipelineMetaDataNode.getScalingJobOffsetPath(jobId,
shardingItem));
}
@Override
@@ -129,15 +92,4 @@ public final class GovernanceRepositoryAPIImpl implements
GovernanceRepositoryAP
log.info("getShardingItems, jobId={}, offsetKeys={}", jobId, result);
return
result.stream().map(Integer::parseInt).collect(Collectors.toList());
}
-
- @Override
- public void updateShardingJobStatus(final String jobId, final int
shardingItem, final JobStatus status) {
- JobProgress jobProgress = getJobProgress(jobId, shardingItem);
- if (null == jobProgress) {
- log.warn("updateShardingJobStatus, jobProgress is null, jobId={},
shardingItem={}", jobId, shardingItem);
- return;
- }
- jobProgress.setStatus(status);
- persist(PipelineMetaDataNode.getScalingJobOffsetPath(jobId,
shardingItem),
YamlEngine.marshal(SWAPPER.swapToYamlConfiguration(jobProgress)));
- }
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
index 933e2bf6003..05afdeb0a4b 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
@@ -19,14 +19,20 @@ package
org.apache.shardingsphere.data.pipeline.core.api.impl;
import com.google.common.base.Preconditions;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobContext;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
+import
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemInventoryTasksProgress;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
import
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
+import
org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
+import
org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
import
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
@@ -36,9 +42,14 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreatio
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobProgress;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobProgressSwapper;
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
+import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
+import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJob;
+import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -53,6 +64,7 @@ import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.confi
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -69,6 +81,8 @@ import java.util.stream.Stream;
@Slf4j
public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl
implements RuleAlteredJobAPI {
+ private static final YamlJobProgressSwapper SWAPPER = new
YamlJobProgressSwapper();
+
@Override
public List<JobInfo> list() {
checkModeConfig();
@@ -137,7 +151,7 @@ public final class RuleAlteredJobAPIImpl extends
AbstractPipelineJobAPIImpl impl
String jobId = jobConfig.getJobId();
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
return IntStream.range(0,
jobConfig.getJobShardingCount()).boxed().collect(LinkedHashMap::new, (map,
each) -> {
- JobProgress jobProgress =
PipelineAPIFactory.getGovernanceRepositoryAPI().getJobProgress(jobId, each);
+ JobProgress jobProgress = getJobProgress(jobId, each);
if (null != jobProgress) {
jobProgress.setActive(!jobConfigPOJO.isDisabled());
}
@@ -323,7 +337,7 @@ public final class RuleAlteredJobAPIImpl extends
AbstractPipelineJobAPIImpl impl
// TODO rewrite job status update after job progress structure refactor
for (int each : repositoryAPI.getShardingItems(jobId)) {
PipelineJobCenter.getJobContext(jobId, each).ifPresent(jobContext
-> jobContext.setStatus(JobStatus.FINISHED));
- repositoryAPI.updateShardingJobStatus(jobId, each,
JobStatus.FINISHED);
+ updateShardingJobStatus(jobId, each, JobStatus.FINISHED);
}
PipelineJobCenter.stop(jobId);
stop(jobId);
@@ -345,4 +359,55 @@ public final class RuleAlteredJobAPIImpl extends
AbstractPipelineJobAPIImpl impl
private RuleAlteredJobConfiguration getJobConfig(final
JobConfigurationPOJO jobConfigPOJO) {
return
RuleAlteredJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
}
+
+ @Override
+ public void persistJobProgress(final PipelineJobContext jobContext) {
+ if (!(jobContext instanceof RuleAlteredJobContext)) {
+ return;
+ }
+ RuleAlteredJobContext context = (RuleAlteredJobContext) jobContext;
+ JobProgress jobProgress = new JobProgress();
+ jobProgress.setStatus(jobContext.getStatus());
+
jobProgress.setSourceDatabaseType(context.getJobConfig().getSourceDatabaseType());
+ jobProgress.setIncremental(getIncrementalTasksProgress(context));
+ jobProgress.setInventory(getInventoryTasksProgress(context));
+ String value =
YamlEngine.marshal(SWAPPER.swapToYamlConfiguration(jobProgress));
+
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobProgress(jobContext.getJobId(),
jobContext.getShardingItem(), value);
+ }
+
+ private JobItemIncrementalTasksProgress getIncrementalTasksProgress(final
RuleAlteredJobContext jobContext) {
+ Map<String, IncrementalTaskProgress> incrementalTaskProgressMap = new
HashMap<>();
+ for (IncrementalTask each : jobContext.getIncrementalTasks()) {
+ incrementalTaskProgressMap.put(each.getTaskId(),
each.getProgress());
+ }
+ return new JobItemIncrementalTasksProgress(incrementalTaskProgressMap);
+ }
+
+ private JobItemInventoryTasksProgress getInventoryTasksProgress(final
RuleAlteredJobContext jobContext) {
+ Map<String, InventoryTaskProgress> inventoryTaskProgressMap = new
HashMap<>();
+ for (InventoryTask each : jobContext.getInventoryTasks()) {
+ inventoryTaskProgressMap.put(each.getTaskId(), each.getProgress());
+ }
+ return new JobItemInventoryTasksProgress(inventoryTaskProgressMap);
+ }
+
+ @Override
+ public JobProgress getJobProgress(final String jobId, final int
shardingItem) {
+ String data =
PipelineAPIFactory.getGovernanceRepositoryAPI().getJobProgress(jobId,
shardingItem);
+ if (StringUtils.isBlank(data)) {
+ return null;
+ }
+ return SWAPPER.swapToObject(YamlEngine.unmarshal(data,
YamlJobProgress.class));
+ }
+
+ @Override
+ public void updateShardingJobStatus(final String jobId, final int
shardingItem, final JobStatus status) {
+ JobProgress jobProgress = getJobProgress(jobId, shardingItem);
+ if (null == jobProgress) {
+ log.warn("updateShardingJobStatus, jobProgress is null, jobId={},
shardingItem={}", jobId, shardingItem);
+ return;
+ }
+ jobProgress.setStatus(status);
+
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobProgress(jobId,
shardingItem, YamlEngine.marshal(SWAPPER.swapToYamlConfiguration(jobProgress)));
+ }
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index cb7c66aa166..71d5db43e29 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -18,9 +18,8 @@
package org.apache.shardingsphere.data.pipeline.core.job.progress.persist;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobContext;
-import
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
@@ -42,8 +41,6 @@ public final class PipelineJobProgressPersistService {
private static final Map<String, Map<Integer,
PipelineJobProgressPersistContext>> JOB_PROGRESS_PERSIST_MAP = new
ConcurrentHashMap<>();
- private static final GovernanceRepositoryAPI REPOSITORY_API =
PipelineAPIFactory.getGovernanceRepositoryAPI();
-
private static final ScheduledExecutorService JOB_PERSIST_EXECUTOR =
Executors.newSingleThreadScheduledExecutor(ExecutorThreadFactoryBuilder.build("pipeline-progress-persist-%d"));
private static final long DELAY_SECONDS = 1;
@@ -105,7 +102,7 @@ public final class PipelineJobProgressPersistService {
}
persistContext.getHasNewEvents().set(false);
long startTimeMillis = System.currentTimeMillis();
- REPOSITORY_API.persistJobProgress(jobContext.get());
+
RuleAlteredJobAPIFactory.getInstance().persistJobProgress(jobContext.get());
persistContext.getBeforePersistingProgressMillis().set(null);
if (6 == ThreadLocalRandom.current().nextInt(100)) {
log.info("persist, jobId={}, shardingItem={}, cost time: {} ms",
jobId, shardingItem, System.currentTimeMillis() - startTimeMillis);
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
index ebd74c85f1a..236a3f17881 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
@@ -19,14 +19,13 @@ package
org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
-import
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineIgnoredException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
@@ -41,8 +40,6 @@ import
org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
@RequiredArgsConstructor
public final class RuleAlteredJob extends AbstractPipelineJob implements
SimpleJob, PipelineJob {
- private final GovernanceRepositoryAPI governanceRepositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI();
-
private final PipelineDataSourceManager dataSourceManager = new
PipelineDataSourceManager();
// Shared by all sharding items
@@ -57,7 +54,7 @@ public final class RuleAlteredJob extends AbstractPipelineJob
implements SimpleJ
}
setJobId(shardingContext.getJobName());
RuleAlteredJobConfiguration jobConfig =
RuleAlteredJobConfigurationSwapper.swapToObject(shardingContext.getJobParameter());
- JobProgress initProgress =
governanceRepositoryAPI.getJobProgress(shardingContext.getJobName(),
shardingContext.getShardingItem());
+ JobProgress initProgress =
RuleAlteredJobAPIFactory.getInstance().getJobProgress(shardingContext.getJobName(),
shardingContext.getShardingItem());
RuleAlteredJobContext jobContext = new
RuleAlteredJobContext(jobConfig, shardingContext.getShardingItem(),
initProgress, dataSourceManager);
int shardingItem = jobContext.getShardingItem();
if (getTasksRunnerMap().containsKey(shardingItem)) {
@@ -75,6 +72,23 @@ public final class RuleAlteredJob extends
AbstractPipelineJob implements SimpleJ
PipelineJobProgressPersistService.addJobProgressPersistContext(getJobId(),
shardingItem);
}
+ private void prepare(final RuleAlteredJobContext jobContext) {
+ try {
+ jobPreparer.prepare(jobContext);
+ } catch (final PipelineIgnoredException ex) {
+ log.info("pipeline ignore exception: {}", ex.getMessage());
+ PipelineJobCenter.stop(getJobId());
+ // CHECKSTYLE:OFF
+ } catch (final RuntimeException ex) {
+ // CHECKSTYLE:ON
+ log.error("job prepare failed, {}-{}", getJobId(),
jobContext.getShardingItem(), ex);
+ PipelineJobCenter.stop(getJobId());
+ jobContext.setStatus(JobStatus.PREPARING_FAILURE);
+
RuleAlteredJobAPIFactory.getInstance().persistJobProgress(jobContext);
+ throw ex;
+ }
+ }
+
/**
* Stop job.
*/
@@ -95,21 +109,4 @@ public final class RuleAlteredJob extends
AbstractPipelineJob implements SimpleJ
getTasksRunnerMap().clear();
PipelineJobProgressPersistService.removeJobProgressPersistContext(getJobId());
}
-
- private void prepare(final RuleAlteredJobContext jobContext) {
- try {
- jobPreparer.prepare(jobContext);
- } catch (final PipelineIgnoredException ex) {
- log.info("pipeline ignore exception: {}", ex.getMessage());
- PipelineJobCenter.stop(getJobId());
- // CHECKSTYLE:OFF
- } catch (final RuntimeException ex) {
- // CHECKSTYLE:ON
- log.error("job prepare failed, {}-{}", getJobId(),
jobContext.getShardingItem(), ex);
- PipelineJobCenter.stop(getJobId());
- jobContext.setStatus(JobStatus.PREPARING_FAILURE);
-
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobProgress(jobContext);
- throw ex;
- }
- }
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index 6253682d5a5..c3231d5ea35 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -71,8 +71,6 @@ import java.util.concurrent.TimeUnit;
@Slf4j
public final class RuleAlteredJobPreparer {
- private final InventoryTaskSplitter inventoryTaskSplitter = new
InventoryTaskSplitter();
-
/**
* Do prepare work for scaling job.
*
@@ -184,7 +182,7 @@ public final class RuleAlteredJobPreparer {
}
private void initInventoryTasks(final RuleAlteredJobContext jobContext) {
- List<InventoryTask> allInventoryTasks =
inventoryTaskSplitter.splitInventoryData(jobContext);
+ List<InventoryTask> allInventoryTasks = new
InventoryTaskSplitter().splitInventoryData(jobContext);
jobContext.getInventoryTasks().addAll(allInventoryTasks);
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
index 45f99dc3e21..7ae9c65e99b 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
@@ -20,10 +20,10 @@ package
org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
@@ -64,7 +64,7 @@ public final class RuleAlteredJobScheduler implements
PipelineTasksRunner {
log.info("job stopping, ignore inventory task");
return;
}
-
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobProgress(jobContext);
+ RuleAlteredJobAPIFactory.getInstance().persistJobProgress(jobContext);
if (executeInventoryTask()) {
if (jobContext.isStopping()) {
log.info("stopping, ignore incremental task");
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index c6864fe9f61..f33ad304c97 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -22,22 +22,18 @@ import
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumper
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
import
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
-import
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobProgressSwapper;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
-import org.apache.shardingsphere.data.pipeline.core.util.ConfigurationFileUtil;
import
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
import org.junit.BeforeClass;
@@ -61,8 +57,6 @@ import static org.mockito.Mockito.mock;
public final class GovernanceRepositoryAPIImplTest {
- private static final YamlJobProgressSwapper SWAPPER = new
YamlJobProgressSwapper();
-
private static GovernanceRepositoryAPI governanceRepositoryAPI;
@BeforeClass
@@ -74,9 +68,9 @@ public final class GovernanceRepositoryAPIImplTest {
@Test
public void assertPersistJobProgress() {
RuleAlteredJobContext jobContext = mockJobContext();
- governanceRepositoryAPI.persistJobProgress(jobContext);
- JobProgress actual =
governanceRepositoryAPI.getJobProgress(jobContext.getJobId(),
jobContext.getShardingItem());
-
assertThat(YamlEngine.marshal(SWAPPER.swapToYamlConfiguration(actual)),
is(ConfigurationFileUtil.readFileAndIgnoreComments("governance-repository.yaml")));
+ governanceRepositoryAPI.persistJobProgress(jobContext.getJobId(),
jobContext.getShardingItem(), "testValue");
+ String actual =
governanceRepositoryAPI.getJobProgress(jobContext.getJobId(),
jobContext.getShardingItem());
+ assertThat(actual, is("testValue"));
}
@Test
@@ -91,7 +85,7 @@ public final class GovernanceRepositoryAPIImplTest {
public void assertDeleteJob() {
governanceRepositoryAPI.persist(DataPipelineConstants.DATA_PIPELINE_ROOT +
"/1", "");
governanceRepositoryAPI.deleteJob("1");
- JobProgress actual = governanceRepositoryAPI.getJobProgress("1", 0);
+ String actual = governanceRepositoryAPI.getJobProgress("1", 0);
assertNull(actual);
}
@@ -125,21 +119,12 @@ public final class GovernanceRepositoryAPIImplTest {
@Test
public void assertGetShardingItems() {
RuleAlteredJobContext jobContext = mockJobContext();
- governanceRepositoryAPI.persistJobProgress(jobContext);
+ governanceRepositoryAPI.persistJobProgress(jobContext.getJobId(),
jobContext.getShardingItem(), "testValue");
List<Integer> shardingItems =
governanceRepositoryAPI.getShardingItems(jobContext.getJobId());
assertThat(shardingItems.size(), is(1));
assertThat(shardingItems.get(0), is(jobContext.getShardingItem()));
}
- @Test
- public void assertRenewJobStatus() {
- RuleAlteredJobContext jobContext = mockJobContext();
- governanceRepositoryAPI.persistJobProgress(jobContext);
- governanceRepositoryAPI.updateShardingJobStatus(jobContext.getJobId(),
jobContext.getShardingItem(), JobStatus.FINISHED);
- JobProgress jobProgress =
governanceRepositoryAPI.getJobProgress(jobContext.getJobId(),
jobContext.getShardingItem());
- assertThat(jobProgress.getStatus(), is(JobStatus.FINISHED));
- }
-
private RuleAlteredJobContext mockJobContext() {
RuleAlteredJobContext result = new
RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration(), 0, new
JobProgress(), new PipelineDataSourceManager());
TaskConfiguration taskConfig = result.getTaskConfig();
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
index 6c72641a38d..58973dc9c7d 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
@@ -200,9 +200,9 @@ public final class RuleAlteredJobAPIImplTest {
assertTrue(jobId.isPresent());
final GovernanceRepositoryAPI repositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI();
RuleAlteredJobContext jobContext = new
RuleAlteredJobContext(jobConfig, 0, new JobProgress(), new
PipelineDataSourceManager());
- repositoryAPI.persistJobProgress(jobContext);
+ ruleAlteredJobAPI.persistJobProgress(jobContext);
repositoryAPI.persistJobCheckResult(jobId.get(), true);
- repositoryAPI.updateShardingJobStatus(jobId.get(), 0,
JobStatus.FINISHED);
+ ruleAlteredJobAPI.updateShardingJobStatus(jobId.get(), 0,
JobStatus.FINISHED);
ruleAlteredJobAPI.switchClusterConfiguration(jobId.get());
}
@@ -213,9 +213,9 @@ public final class RuleAlteredJobAPIImplTest {
assertTrue(jobId.isPresent());
GovernanceRepositoryAPI repositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI();
RuleAlteredJobContext jobContext = new
RuleAlteredJobContext(jobConfig, 0, new JobProgress(), new
PipelineDataSourceManager());
- repositoryAPI.persistJobProgress(jobContext);
+ ruleAlteredJobAPI.persistJobProgress(jobContext);
repositoryAPI.persistJobCheckResult(jobId.get(), true);
- repositoryAPI.updateShardingJobStatus(jobId.get(),
jobContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK);
+ ruleAlteredJobAPI.updateShardingJobStatus(jobId.get(),
jobContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK);
ruleAlteredJobAPI.switchClusterConfiguration(jobId.get());
Map<Integer, JobProgress> progress =
ruleAlteredJobAPI.getProgress(jobId.get());
for (Entry<Integer, JobProgress> entry : progress.entrySet()) {
@@ -252,4 +252,14 @@ public final class RuleAlteredJobAPIImplTest {
statement.execute("INSERT INTO t_order (order_id, user_id) VALUES
(1, 'xxx'), (999, 'yyy')");
}
}
+
+ @Test
+ public void assertRenewJobStatus() {
+ final RuleAlteredJobConfiguration jobConfig =
JobConfigurationBuilder.createJobConfiguration();
+ RuleAlteredJobContext jobContext = new
RuleAlteredJobContext(jobConfig, 0, new JobProgress(), new
PipelineDataSourceManager());
+ ruleAlteredJobAPI.persistJobProgress(jobContext);
+ ruleAlteredJobAPI.updateShardingJobStatus(jobConfig.getJobId(), 0,
JobStatus.FINISHED);
+ JobProgress jobProgress =
ruleAlteredJobAPI.getJobProgress(jobContext.getJobId(),
jobContext.getShardingItem());
+ assertThat(jobProgress.getStatus(), is(JobStatus.FINISHED));
+ }
}
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
index 32efa4f06fc..f2eeed50c7a 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
import org.apache.commons.io.FileUtils;
+import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
@@ -100,7 +101,7 @@ public final class RuleAlteredJobWorkerTest {
RuleAlteredJobContext jobContext = new
RuleAlteredJobContext(jobConfig, 0, new JobProgress(), new
PipelineDataSourceManager());
jobContext.setStatus(JobStatus.PREPARING);
GovernanceRepositoryAPI repositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI();
- repositoryAPI.persistJobProgress(jobContext);
+ RuleAlteredJobAPIFactory.getInstance().persistJobProgress(jobContext);
URL jobConfigUrl =
getClass().getClassLoader().getResource("scaling/rule_alter/scaling_job_config.yaml");
assertNotNull(jobConfigUrl);
repositoryAPI.persist(PipelineMetaDataNode.getJobConfigPath(jobContext.getJobId()),
FileUtils.readFileToString(new File(jobConfigUrl.getFile()),
StandardCharsets.UTF_8));
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
index 99fbc9b6d30..18f0c16ea59 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
@@ -17,12 +17,12 @@
package org.apache.shardingsphere.data.pipeline.scenario.rulealtered.prepare;
+import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
@@ -66,7 +66,7 @@ public final class InventoryTaskSplitterTest {
private void initJobContext() {
RuleAlteredJobConfiguration jobConfig =
JobConfigurationBuilder.createJobConfiguration();
- JobProgress initProgress =
PipelineAPIFactory.getGovernanceRepositoryAPI().getJobProgress(jobConfig.getJobId(),
0);
+ JobProgress initProgress =
RuleAlteredJobAPIFactory.getInstance().getJobProgress(jobConfig.getJobId(), 0);
jobContext = new RuleAlteredJobContext(jobConfig, 0, initProgress, new
PipelineDataSourceManager());
dataSourceManager = jobContext.getDataSourceManager();
taskConfig = jobContext.getTaskConfig();