This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 09cb7af7fec Refactor pipeline job prepare stage and related code
(#24425)
09cb7af7fec is described below
commit 09cb7af7fec5d867e63c9ce65ccef5681d878419
Author: Xinze Guo <[email protected]>
AuthorDate: Thu Mar 2 19:58:52 2023 +0800
Refactor pipeline job prepare stage and related code (#24425)
* Refactor pipeline job prepare stage and related code
* Rename
* Remove unused class
---
.../data/pipeline/api/job/JobStatus.java | 5 -----
.../progress/JobOffsetInfo.java} | 14 +++++-------
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 2 --
.../data/pipeline/cdc/core/job/CDCJob.java | 2 +-
.../pipeline/cdc/core/prepare/CDCJobPreparer.java | 16 +------------
.../pipeline/core/api/GovernanceRepositoryAPI.java | 16 +++++++++++++
.../core/api/InventoryIncrementalJobAPI.java | 17 ++++++++++++++
.../AbstractInventoryIncrementalJobAPIImpl.java | 21 +++++++++++++++++
.../core/api/impl/GovernanceRepositoryAPIImpl.java | 11 +++++++++
.../core/job/progress/yaml/YamlJobOffsetInfo.java} | 17 ++++++--------
.../progress/yaml/YamlJobOffsetInfoSwapper.java} | 26 +++++++++++++---------
.../migration/prepare/MigrationJobPreparer.java | 19 ++++++----------
12 files changed, 101 insertions(+), 65 deletions(-)
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
index 50f7272f2fc..d31c2505715 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
@@ -37,11 +37,6 @@ public enum JobStatus {
*/
PREPARING(true),
- /**
- * Job is in prepare success status.
- */
- PREPARE_SUCCESS(true),
-
/**
* Job is in execute inventory task status.
*/
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CDCTableBasedPipelineJobInfo.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobOffsetInfo.java
similarity index 74%
copy from
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CDCTableBasedPipelineJobInfo.java
copy to
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobOffsetInfo.java
index 77e836483c7..21201168ba6 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CDCTableBasedPipelineJobInfo.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobOffsetInfo.java
@@ -15,21 +15,17 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.pojo;
+package org.apache.shardingsphere.data.pipeline.api.job.progress;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
/**
- * CDC table based pipeline job info.
+ * Job offset info.
*/
-@Getter
@RequiredArgsConstructor
-public class CDCTableBasedPipelineJobInfo implements PipelineJobInfo {
-
- private final PipelineJobMetaData jobMetaData;
-
- private final String databaseName;
+@Getter
+public final class JobOffsetInfo {
- private final String schemaTableNames;
+ private final boolean targetSchemaTableCreated;
}
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 89ba6a17672..6f644730758 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -36,7 +36,6 @@ import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
@@ -167,7 +166,6 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
IncrementalTaskProgress incrementalTaskProgress = new
IncrementalTaskProgress();
incrementalTaskProgress.setPosition(PipelineJobPreparerUtils.getIncrementalPosition(null,
dumperConfig, dataSourceManager));
jobItemProgress.setIncremental(new
JobItemIncrementalTasksProgress(incrementalTaskProgress));
- jobItemProgress.setStatus(JobStatus.PREPARE_SUCCESS);
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId,
i,
YamlEngine.marshal(getJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
}
} catch (final SQLException ex) {
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index 6479e96d100..4e10f0ca63d 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -56,7 +56,7 @@ public final class CDCJob extends AbstractSimplePipelineJob {
@Override
protected void doPrepare(final PipelineJobItemContext jobItemContext) {
- jobPreparer.prepare((CDCJobItemContext) jobItemContext);
+ jobPreparer.initTasks((CDCJobItemContext) jobItemContext);
}
@Override
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index 8f789c6c2bf..b87f057d9ed 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -19,7 +19,6 @@ package
org.apache.shardingsphere.data.pipeline.cdc.core.prepare;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
import
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
@@ -53,7 +52,7 @@ public final class CDCJobPreparer {
*
* @param jobItemContext job item context
*/
- public void prepare(final CDCJobItemContext jobItemContext) {
+ public void initTasks(final CDCJobItemContext jobItemContext) {
Optional<InventoryIncrementalJobItemProgress> jobItemProgress =
jobAPI.getJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
if (!jobItemProgress.isPresent()) {
jobAPI.persistJobItemProgress(jobItemContext);
@@ -62,24 +61,11 @@ public final class CDCJobPreparer {
PipelineJobCenter.stop(jobItemContext.getJobId());
return;
}
- boolean needUpdateJobStatus = !jobItemProgress.isPresent() ||
JobStatus.PREPARING.equals(jobItemContext.getStatus()) ||
JobStatus.RUNNING.equals(jobItemContext.getStatus())
- ||
JobStatus.PREPARING_FAILURE.equals(jobItemContext.getStatus());
- if (needUpdateJobStatus) {
- updateJobItemStatus(JobStatus.PREPARING, jobItemContext);
- }
initIncrementalTasks(jobItemContext);
CDCJobConfiguration jobConfig = jobItemContext.getJobConfig();
if (jobConfig.isFull()) {
initInventoryTasks(jobItemContext);
}
- if (needUpdateJobStatus) {
- updateJobItemStatus(JobStatus.PREPARE_SUCCESS, jobItemContext);
- }
- }
-
- private void updateJobItemStatus(final JobStatus jobStatus, final
CDCJobItemContext jobItemContext) {
- jobItemContext.setStatus(jobStatus);
- jobAPI.persistJobItemProgress(jobItemContext);
}
private void initInventoryTasks(final CDCJobItemContext jobItemContext) {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
index b65b5632713..f58bf278fb6 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
@@ -39,6 +39,22 @@ public interface GovernanceRepositoryAPI {
*/
boolean isExisted(String key);
+ /**
+ * Persist job offset info.
+ *
+ * @param jobId job id
+ * @param jobOffsetInfo job offset info
+ */
+ void persistJobOffsetInfo(String jobId, String jobOffsetInfo);
+
+ /**
+ * Get job offset info.
+ *
+ * @param jobId job id
+ * @return job offset info
+ */
+ Optional<String> getJobOffsetInfo(String jobId);
+
/**
* Persist job item progress.
*
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
index 8e47caca94c..7ed1701a46f 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
@@ -21,6 +21,7 @@ import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
import
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.JobOffsetInfo;
import
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
import
org.apache.shardingsphere.data.pipeline.api.pojo.InventoryIncrementalJobItemInfo;
import
org.apache.shardingsphere.data.pipeline.core.check.consistency.ConsistencyCheckJobItemProgressContext;
@@ -52,6 +53,22 @@ public interface InventoryIncrementalJobAPI extends
PipelineJobAPI {
*/
PipelineProcessConfiguration showProcessConfiguration();
+ /**
+ * Persist job offset info.
+ *
+ * @param jobId job id
+ * @param jobOffsetInfo job offset info
+ */
+ void persistJobOffsetInfo(String jobId, JobOffsetInfo jobOffsetInfo);
+
+ /**
+ * Get job offset info.
+ *
+ * @param jobId job id
+ * @return job offset info
+ */
+ JobOffsetInfo getJobOffsetInfo(String jobId);
+
/**
* Get job progress.
*
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index fa694640d81..8188d0cc8e3 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -29,6 +29,7 @@ import
org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemInventoryTasksProgress;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.JobOffsetInfo;
import
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
import
org.apache.shardingsphere.data.pipeline.api.pojo.InventoryIncrementalJobItemInfo;
import
org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
@@ -41,6 +42,8 @@ import
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncremental
import
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInventoryIncrementalJobItemProgressSwapper;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobOffsetInfo;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobOffsetInfoSwapper;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
@@ -75,6 +78,8 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl
extends AbstractPip
@Getter(AccessLevel.PROTECTED)
private final YamlInventoryIncrementalJobItemProgressSwapper
jobItemProgressSwapper = new YamlInventoryIncrementalJobItemProgressSwapper();
+ private final YamlJobOffsetInfoSwapper jobOffsetInfoSwapper = new
YamlJobOffsetInfoSwapper();
+
protected abstract String getTargetDatabaseType(PipelineJobConfiguration
pipelineJobConfig);
@Override
@@ -158,6 +163,22 @@ public abstract class
AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
return new JobItemInventoryTasksProgress(inventoryTaskProgressMap);
}
+ @Override
+ public void persistJobOffsetInfo(final String jobId, final JobOffsetInfo
jobOffsetInfo) {
+ String value =
YamlEngine.marshal(jobOffsetInfoSwapper.swapToYamlConfiguration(jobOffsetInfo));
+
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobOffsetInfo(jobId,
value);
+ }
+
+ @Override
+ public JobOffsetInfo getJobOffsetInfo(final String jobId) {
+ Optional<String> offsetInfo =
PipelineAPIFactory.getGovernanceRepositoryAPI().getJobOffsetInfo(jobId);
+ if (offsetInfo.isPresent()) {
+ YamlJobOffsetInfo info = YamlEngine.unmarshal(offsetInfo.get(),
YamlJobOffsetInfo.class);
+ return jobOffsetInfoSwapper.swapToObject(info);
+ }
+ return jobOffsetInfoSwapper.swapToObject(new YamlJobOffsetInfo());
+ }
+
@Override
public Optional<InventoryIncrementalJobItemProgress>
getJobItemProgress(final String jobId, final int shardingItem) {
Optional<String> progress =
PipelineAPIFactory.getGovernanceRepositoryAPI().getJobItemProgress(jobId,
shardingItem);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index d28a50a2519..95e695ebfe1 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -55,6 +55,17 @@ public final class GovernanceRepositoryAPIImpl implements
GovernanceRepositoryAP
return null != repository.getDirectly(key);
}
+ @Override
+ public void persistJobOffsetInfo(final String jobId, final String
jobOffsetInfo) {
+ repository.persist(PipelineMetaDataNode.getJobOffsetPath(jobId),
jobOffsetInfo);
+ }
+
+ @Override
+ public Optional<String> getJobOffsetInfo(final String jobId) {
+ String text =
repository.getDirectly(PipelineMetaDataNode.getJobOffsetPath(jobId));
+ return Strings.isNullOrEmpty(text) ? Optional.empty() :
Optional.of(text);
+ }
+
@Override
public void persistJobItemProgress(final String jobId, final int
shardingItem, final String progressValue) {
repository.persist(PipelineMetaDataNode.getJobOffsetItemPath(jobId,
shardingItem), progressValue);
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CDCTableBasedPipelineJobInfo.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobOffsetInfo.java
similarity index 69%
copy from
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CDCTableBasedPipelineJobInfo.java
copy to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobOffsetInfo.java
index 77e836483c7..b238f1b99e3 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CDCTableBasedPipelineJobInfo.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobOffsetInfo.java
@@ -15,21 +15,18 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.pojo;
+package org.apache.shardingsphere.data.pipeline.core.job.progress.yaml;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
/**
- * CDC table based pipeline job info.
+ * Yaml job offset info.
*/
@Getter
-@RequiredArgsConstructor
-public class CDCTableBasedPipelineJobInfo implements PipelineJobInfo {
+@Setter
+public final class YamlJobOffsetInfo implements YamlConfiguration {
- private final PipelineJobMetaData jobMetaData;
-
- private final String databaseName;
-
- private final String schemaTableNames;
+ private boolean targetSchemaTableCreated;
}
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CDCTableBasedPipelineJobInfo.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobOffsetInfoSwapper.java
similarity index 50%
rename from
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CDCTableBasedPipelineJobInfo.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobOffsetInfoSwapper.java
index 77e836483c7..a8efecd574a 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CDCTableBasedPipelineJobInfo.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobOffsetInfoSwapper.java
@@ -15,21 +15,25 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.pojo;
+package org.apache.shardingsphere.data.pipeline.core.job.progress.yaml;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.JobOffsetInfo;
+import
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
/**
- * CDC table based pipeline job info.
+ * Yaml job offset info swapper.
*/
-@Getter
-@RequiredArgsConstructor
-public class CDCTableBasedPipelineJobInfo implements PipelineJobInfo {
+public final class YamlJobOffsetInfoSwapper implements
YamlConfigurationSwapper<YamlJobOffsetInfo, JobOffsetInfo> {
- private final PipelineJobMetaData jobMetaData;
+ @Override
+ public YamlJobOffsetInfo swapToYamlConfiguration(final JobOffsetInfo data)
{
+ YamlJobOffsetInfo result = new YamlJobOffsetInfo();
+ result.setTargetSchemaTableCreated(data.isTargetSchemaTableCreated());
+ return result;
+ }
- private final String databaseName;
-
- private final String schemaTableNames;
+ @Override
+ public JobOffsetInfo swapToObject(final YamlJobOffsetInfo yamlConfig) {
+ return new JobOffsetInfo(yamlConfig.isTargetSchemaTableCreated());
+ }
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index bfde585bee7..40c3254150d 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -27,6 +27,7 @@ import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.JobOffsetInfo;
import
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
@@ -51,7 +52,6 @@ import
org.apache.shardingsphere.mode.lock.GlobalLockDefinition;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Map.Entry;
-import java.util.Optional;
/**
* Migration job preparer.
@@ -99,27 +99,22 @@ public final class MigrationJobPreparer {
}
LockDefinition lockDefinition = new GlobalLockDefinition(lockName);
long startTimeMillis = System.currentTimeMillis();
- if (lockContext.tryLock(lockDefinition, 180000)) {
+ if (lockContext.tryLock(lockDefinition, 600000)) {
log.info("try lock success, jobId={}, shardingItem={}, cost {}
ms", jobConfig.getJobId(), jobItemContext.getShardingItem(),
System.currentTimeMillis() - startTimeMillis);
try {
- Optional<InventoryIncrementalJobItemProgress> jobItemProgress
= jobAPI.getJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
- JobStatus currentStatus =
jobItemProgress.map(InventoryIncrementalJobItemProgress::getStatus).orElse(null);
- boolean prepareFlag = !jobItemProgress.isPresent() ||
JobStatus.PREPARING.equals(currentStatus) ||
JobStatus.RUNNING.equals(currentStatus)
- || JobStatus.PREPARING_FAILURE.equals(currentStatus);
- if (prepareFlag) {
+ JobOffsetInfo offsetInfo =
jobAPI.getJobOffsetInfo(jobConfig.getJobId());
+ if (!offsetInfo.isTargetSchemaTableCreated()) {
jobItemContext.setStatus(JobStatus.PREPARING);
jobAPI.updateJobItemStatus(jobConfig.getJobId(),
jobItemContext.getShardingItem(), JobStatus.PREPARING);
prepareAndCheckTarget(jobItemContext);
- // TODO Loop insert zookeeper performance is not good
- for (int i = 0; i <=
jobItemContext.getJobConfig().getJobShardingCount(); i++) {
- jobItemContext.setStatus(JobStatus.PREPARE_SUCCESS);
- jobAPI.updateJobItemStatus(jobConfig.getJobId(), i,
JobStatus.PREPARE_SUCCESS);
- }
+ jobAPI.persistJobOffsetInfo(jobConfig.getJobId(), new
JobOffsetInfo(true));
}
} finally {
log.info("unlock, jobId={}, shardingItem={}, cost {} ms",
jobConfig.getJobId(), jobItemContext.getShardingItem(),
System.currentTimeMillis() - startTimeMillis);
lockContext.unlock(lockDefinition);
}
+ } else {
+ log.warn("jobId={}, shardingItem={} try lock failed",
jobConfig.getJobId(), jobItemContext.getShardingItem());
}
}