This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new d6f4d38023d Use PipelineJobType.getOption() (#37105)
d6f4d38023d is described below
commit d6f4d38023d83e8a6d877247ac304335c5a82007
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Nov 15 10:10:31 2025 +0800
Use PipelineJobType.getOption() (#37105)
---
.../job/executor/DistributedPipelineJobExecutor.java | 6 +++---
.../data/pipeline/core/job/id/PipelineJobIdUtils.java | 2 +-
.../persist/PipelineJobProgressPersistService.java | 2 +-
.../job/service/PipelineJobConfigurationManager.java | 9 ++++++---
.../pipeline/core/job/service/PipelineJobManager.java | 17 +++++++++++------
.../core/job/service/TransmissionJobManager.java | 2 +-
.../data/pipeline/core/job/type/JobCodeRegistry.java | 4 ++--
.../PipelineContextManagerLifecycleListener.java | 2 +-
.../processor/JobConfigurationChangedProcessEngine.java | 2 +-
.../core/task/runner/TransmissionTasksRunner.java | 2 +-
.../apache/shardingsphere/data/pipeline/cdc/CDCJob.java | 4 ++--
.../shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java | 11 +++++++----
.../consistencycheck/api/ConsistencyCheckJobAPI.java | 2 +-
.../task/ConsistencyCheckTasksRunner.java | 4 ++--
.../migration/preparer/MigrationJobPreparer.java | 2 +-
.../ConsistencyCheckJobExecutorCallbackTest.java | 2 +-
.../api/ConsistencyCheckJobAPITest.java | 2 +-
.../scenario/migration/api/MigrationJobAPITest.java | 2 +-
18 files changed, 44 insertions(+), 33 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/executor/DistributedPipelineJobExecutor.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/executor/DistributedPipelineJobExecutor.java
index a18639f6f83..807598a9817 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/executor/DistributedPipelineJobExecutor.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/executor/DistributedPipelineJobExecutor.java
@@ -70,8 +70,8 @@ public final class DistributedPipelineJobExecutor {
}
PipelineJobType jobType = PipelineJobIdUtils.parseJobType(jobId);
PipelineContextKey contextKey =
PipelineJobIdUtils.parseContextKey(jobId);
- PipelineJobConfiguration jobConfig =
jobType.getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
- PipelineJobItemManager<PipelineJobItemProgress> jobItemManager = new
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
+ PipelineJobConfiguration jobConfig =
jobType.getOption().getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
+ PipelineJobItemManager<PipelineJobItemProgress> jobItemManager = new
PipelineJobItemManager<>(jobType.getOption().getYamlJobItemProgressSwapper());
PipelineJobItemProgress jobItemProgress =
jobItemManager.getProgress(shardingContext.getJobName(),
shardingItem).orElse(null);
TransmissionProcessContext jobProcessContext =
createTransmissionProcessContext(jobId, jobType, contextKey);
PipelineGovernanceFacade governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey);
@@ -112,7 +112,7 @@ public final class DistributedPipelineJobExecutor {
}
private TransmissionProcessContext createTransmissionProcessContext(final
String jobId, final PipelineJobType jobType, final PipelineContextKey
contextKey) {
- if (!jobType.isTransmissionJob()) {
+ if (!jobType.getOption().isTransmissionJob()) {
return null;
}
PipelineProcessConfiguration processConfig =
PipelineProcessConfigurationUtils.fillInDefaultValue(new
PipelineProcessConfigurationPersistService().load(contextKey,
jobType.getType()));
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/id/PipelineJobIdUtils.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/id/PipelineJobIdUtils.java
index d87b388323b..46baa0beffa 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/id/PipelineJobIdUtils.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/id/PipelineJobIdUtils.java
@@ -56,7 +56,7 @@ public final class PipelineJobIdUtils {
String databaseName = instanceType == InstanceType.PROXY ? "" :
contextKey.getDatabaseName();
String databaseNameHex =
Hex.encodeHexString(databaseName.getBytes(StandardCharsets.UTF_8), true);
String databaseNameLengthHex =
Hex.encodeHexString(Shorts.toByteArray((short) databaseNameHex.length()), true);
- return 'j' + jobType.getCode() + PipelineJobId.CURRENT_VERSION +
instanceType.getCode() + databaseNameLengthHex + databaseNameHex;
+ return 'j' + jobType.getOption().getCode() +
PipelineJobId.CURRENT_VERSION + instanceType.getCode() + databaseNameLengthHex
+ databaseNameHex;
}
/**
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index 4e0223b89ad..809ff360d34 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -135,7 +135,7 @@ public final class PipelineJobProgressPersistService {
}
long startTimeMillis = System.currentTimeMillis();
new
PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class,
-
PipelineJobIdUtils.parseJobType(jobId).getType()).getYamlJobItemProgressSwapper()).updateProgress(jobItemContext.get());
+
PipelineJobIdUtils.parseJobType(jobId).getType()).getOption().getYamlJobItemProgressSwapper()).updateProgress(jobItemContext.get());
persistContext.getUnhandledEventCount().addAndGet(-currentUnhandledEventCount);
if (6 == ThreadLocalRandom.current().nextInt(100)) {
log.info("persist, jobId={}, shardingItem={}, cost {} ms",
jobId, shardingItem, System.currentTimeMillis() - startTimeMillis);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java
index e7963d54b4f..f9af4cc3f73 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java
@@ -20,6 +20,7 @@ package
org.apache.shardingsphere.data.pipeline.core.job.service;
import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.data.pipeline.core.execute.ShardingTotalCountUsageJobExecutorServiceHandler;
import
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.job.config.yaml.swapper.YamlPipelineJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import
org.apache.shardingsphere.data.pipeline.core.listener.PipelineElasticJobListener;
@@ -47,7 +48,7 @@ public final class PipelineJobConfigurationManager {
*/
@SuppressWarnings("unchecked")
public <T extends PipelineJobConfiguration> T getJobConfiguration(final
String jobId) {
- return (T)
jobType.getYamlJobConfigurationSwapper().swapToObject(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getJobParameter());
+ return (T)
jobType.getOption().getYamlJobConfigurationSwapper().swapToObject(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getJobParameter());
}
/**
@@ -56,11 +57,13 @@ public final class PipelineJobConfigurationManager {
* @param jobConfig pipeline job configuration
* @return converted job configuration POJO
*/
+ @SuppressWarnings({"unchecked", "rawtypes"})
public JobConfigurationPOJO convertToJobConfigurationPOJO(final
PipelineJobConfiguration jobConfig) {
JobConfigurationPOJO result = new JobConfigurationPOJO();
result.setJobName(jobConfig.getJobId());
-
result.setShardingTotalCount(jobType.isForceNoShardingWhenConvertToJobConfigurationPOJO()
? 1 : jobConfig.getJobShardingCount());
-
result.setJobParameter(YamlEngine.marshal(jobType.getYamlJobConfigurationSwapper().swapToYamlConfiguration(jobConfig)));
+
result.setShardingTotalCount(jobType.getOption().isForceNoShardingWhenConvertToJobConfigurationPOJO()
? 1 : jobConfig.getJobShardingCount());
+ YamlPipelineJobConfigurationSwapper swapper =
jobType.getOption().getYamlJobConfigurationSwapper();
+
result.setJobParameter(YamlEngine.marshal(swapper.swapToYamlConfiguration(jobConfig)));
String createTimeFormat =
LocalDateTime.now().format(DateTimeFormatterFactory.getDatetimeFormatter());
result.getProps().setProperty("create_time", createTimeFormat);
result.getProps().setProperty("start_time_millis",
String.valueOf(System.currentTimeMillis()));
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index 9633d062526..f0f3266d9fb 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -67,7 +67,7 @@ public final class PipelineJobManager {
log.warn("jobId already exists in registry center, ignore, job id
is `{}`", jobId);
return;
}
- governanceFacade.getJobFacade().getJob().create(jobId,
jobType.getJobClass());
+ governanceFacade.getJobFacade().getJob().create(jobId,
jobType.getOption().getJobClass());
governanceFacade.getJobFacade().getConfiguration().persist(jobId, new
PipelineJobConfigurationManager(jobType).convertToJobConfigurationPOJO(jobConfig));
}
@@ -77,16 +77,18 @@ public final class PipelineJobManager {
* @param jobId job id
*/
public void resume(final String jobId) {
- if (jobType.isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished())
{
- Optional<? extends PipelineJobItemProgress> jobItemProgress = new
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper()).getProgress(jobId,
0);
+ if
(jobType.getOption().isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished())
{
+ Optional<? extends PipelineJobItemProgress> jobItemProgress = new
PipelineJobItemManager<>(jobType.getOption().getYamlJobItemProgressSwapper()).getProgress(jobId,
0);
if (jobItemProgress.isPresent() && JobStatus.FINISHED ==
jobItemProgress.get().getStatus()) {
log.info("job status is FINISHED, ignore, jobId={}", jobId);
return;
}
}
startCurrentDisabledJob(jobId);
- jobType.getToBeStartDisabledNextJobType().ifPresent(optional ->
startNextDisabledJob(jobId, optional));
-
+ String toBeStartDisabledNextJobType =
jobType.getOption().getGetToBeStartDisabledNextJobType();
+ if (null != toBeStartDisabledNextJobType) {
+ startNextDisabledJob(jobId, toBeStartDisabledNextJobType);
+ }
}
private void startCurrentDisabledJob(final String jobId) {
@@ -121,7 +123,10 @@ public final class PipelineJobManager {
* @param jobId job id
*/
public void stop(final String jobId) {
- jobType.getToBeStoppedPreviousJobType().ifPresent(optional ->
stopPreviousJob(jobId, optional));
+ String toBeStoppedPreviousJobType =
jobType.getOption().getGetToBeStoppedPreviousJobType();
+ if (null != toBeStoppedPreviousJobType) {
+ stopPreviousJob(jobId, toBeStoppedPreviousJobType);
+ }
stopCurrentJob(jobId);
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
index df81d4312cd..4e0020ea566 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
@@ -94,7 +94,7 @@ public final class TransmissionJobManager {
* @return each sharding item progress
*/
public Map<Integer, TransmissionJobItemProgress> getJobProgress(final
PipelineJobConfiguration jobConfig) {
- PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager =
new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
+ PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager =
new
PipelineJobItemManager<>(jobType.getOption().getYamlJobItemProgressSwapper());
String jobId = jobConfig.getJobId();
JobConfigurationPOJO jobConfigPOJO =
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
return IntStream.range(0,
jobConfig.getJobShardingCount()).boxed().collect(LinkedHashMap::new, (map,
each) -> {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/JobCodeRegistry.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/JobCodeRegistry.java
index 768482d0f4e..10ed2cde6c7 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/JobCodeRegistry.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/JobCodeRegistry.java
@@ -35,8 +35,8 @@ public final class JobCodeRegistry {
static {
for (PipelineJobType each :
ShardingSphereServiceLoader.getServiceInstances(PipelineJobType.class)) {
- Preconditions.checkArgument(2 == each.getCode().length(), "Job
type code length is not 2.");
- JOB_CODE_AND_TYPE_MAP.put(each.getCode(), each);
+ Preconditions.checkArgument(2 ==
each.getOption().getCode().length(), "Job type code length is not 2.");
+ JOB_CODE_AND_TYPE_MAP.put(each.getOption().getCode(), each);
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
index beac261130e..609adff8fd4 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
@@ -78,7 +78,7 @@ public final class PipelineContextManagerLifecycleListener
implements ContextMan
log.warn("Parse job type failed, job name: {}, error: {}",
each.getJobName(), ex.getMessage());
continue;
}
- if ("CONSISTENCY_CHECK".equals(jobType.getCode())) {
+ if ("CONSISTENCY_CHECK".equals(jobType.getType())) {
continue;
}
JobConfigurationPOJO jobConfig;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/JobConfigurationChangedProcessEngine.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/JobConfigurationChangedProcessEngine.java
index 666efd52db2..b956286c849 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/JobConfigurationChangedProcessEngine.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/JobConfigurationChangedProcessEngine.java
@@ -63,7 +63,7 @@ public final class JobConfigurationChangedProcessEngine {
if (PipelineJobRegistry.isExisting(jobId)) {
log.info("{} added to executing jobs failed since it
already exists", jobId);
} else {
- T pipelineJobConfig = (T)
PipelineJobIdUtils.parseJobType(jobConfig.getJobName()).getYamlJobConfigurationSwapper().swapToObject(jobConfig.getJobParameter());
+ T pipelineJobConfig = (T)
PipelineJobIdUtils.parseJobType(jobConfig.getJobName()).getOption().getYamlJobConfigurationSwapper().swapToObject(jobConfig.getJobParameter());
executeJob(jobConfig, pipelineJobConfig, processor);
}
break;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
index 8640db836db..87994fb0aa0 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
@@ -63,7 +63,7 @@ public final class TransmissionTasksRunner implements
PipelineTasksRunner {
inventoryTasks = jobItemContext.getInventoryTasks();
incrementalTasks = jobItemContext.getIncrementalTasks();
jobType = TypedSPILoader.getService(PipelineJobType.class,
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType());
- jobItemManager = new
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
+ jobItemManager = new
PipelineJobItemManager<>(jobType.getOption().getYamlJobItemProgressSwapper());
}
@Override
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 60df62aaf65..052d7086c43 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -103,8 +103,8 @@ public final class CDCJob implements PipelineJob {
log.info("Execute job {}", jobId);
PipelineJobType jobType = PipelineJobIdUtils.parseJobType(jobId);
PipelineContextKey contextKey =
PipelineJobIdUtils.parseContextKey(jobId);
- CDCJobConfiguration jobConfig = (CDCJobConfiguration)
jobType.getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
- PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager =
new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
+ CDCJobConfiguration jobConfig = (CDCJobConfiguration)
jobType.getOption().getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
+ PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager =
new
PipelineJobItemManager<>(jobType.getOption().getYamlJobItemProgressSwapper());
TransmissionProcessContext jobProcessContext = new
TransmissionProcessContext(
jobId,
PipelineProcessConfigurationUtils.fillInDefaultValue(new
PipelineProcessConfigurationPersistService().load(contextKey,
jobType.getType())));
PipelineGovernanceFacade governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey);
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index fe49f869bf0..1eb93f50767 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -50,6 +50,7 @@ import
org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.swapper.YamlPipelineJobItemProgressSwapper;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
@@ -132,7 +133,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
if
(governanceFacade.getJobFacade().getConfiguration().isExisted(jobConfig.getJobId()))
{
log.warn("CDC job already exists in registry center, ignore, job
id is `{}`", jobConfig.getJobId());
} else {
-
governanceFacade.getJobFacade().getJob().create(jobConfig.getJobId(),
jobType.getJobClass());
+
governanceFacade.getJobFacade().getJob().create(jobConfig.getJobId(),
jobType.getOption().getJobClass());
JobConfigurationPOJO jobConfigPOJO =
jobConfigManager.convertToJobConfigurationPOJO(jobConfig);
jobConfigPOJO.setDisabled(true);
governanceFacade.getJobFacade().getConfiguration().persist(jobConfig.getJobId(),
jobConfigPOJO);
@@ -180,9 +181,10 @@ public final class CDCJobAPI implements TransmissionJobAPI
{
return new
ShardingSpherePipelineDataSourceConfiguration(targetRootConfig);
}
+ @SuppressWarnings({"rawtypes", "unchecked"})
private void initIncrementalPosition(final CDCJobConfiguration jobConfig) {
String jobId = jobConfig.getJobId();
- PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager =
new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
+ PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager =
new
PipelineJobItemManager<>(jobType.getOption().getYamlJobItemProgressSwapper());
try (PipelineDataSourceManager pipelineDataSourceManager = new
PipelineDataSourceManager()) {
for (int i = 0; i < jobConfig.getJobShardingCount(); i++) {
if (jobItemManager.getProgress(jobId, i).isPresent()) {
@@ -190,8 +192,9 @@ public final class CDCJobAPI implements TransmissionJobAPI {
}
IncrementalDumperContext dumperContext =
buildDumperContext(jobConfig, i, new
TableAndSchemaNameMapper(jobConfig.getSchemaTableNames()));
TransmissionJobItemProgress jobItemProgress =
getTransmissionJobItemProgress(jobConfig, pipelineDataSourceManager,
dumperContext);
-
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getProcess().persist(
- jobId, i,
YamlEngine.marshal(jobType.getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
+ YamlPipelineJobItemProgressSwapper swapper =
jobType.getOption().getYamlJobItemProgressSwapper();
+ PipelineAPIFactory.getPipelineGovernanceFacade(
+
PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getProcess().persist(jobId,
i, YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress)));
}
} catch (final SQLException ex) {
throw new PrepareJobWithGetBinlogPositionException(jobId, ex);
diff --git
a/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
b/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
index 39eec1586d1..0d99f34600f 100644
---
a/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
+++
b/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
@@ -68,7 +68,7 @@ public final class ConsistencyCheckJobAPI {
private final PipelineJobItemManager<ConsistencyCheckJobItemProgress>
jobItemManager;
public ConsistencyCheckJobAPI(final ConsistencyCheckJobType jobType) {
- progressSwapper = jobType.getYamlJobItemProgressSwapper();
+ progressSwapper = (YamlConsistencyCheckJobItemProgressSwapper)
jobType.getOption().getYamlJobItemProgressSwapper();
jobManager = new PipelineJobManager(jobType);
jobConfigManager = new PipelineJobConfigurationManager(jobType);
jobItemManager = new PipelineJobItemManager<>(progressSwapper);
diff --git
a/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
b/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index df6806eb0ba..50b84d63eca 100644
---
a/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++
b/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -59,7 +59,7 @@ public final class ConsistencyCheckTasksRunner implements
PipelineTasksRunner {
private final PipelineJobManager jobManager = new
PipelineJobManager(jobType);
- private final PipelineJobItemManager<TransmissionJobItemProgress>
jobItemManager = new
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
+ private final PipelineJobItemManager<TransmissionJobItemProgress>
jobItemManager = new
PipelineJobItemManager<>(jobType.getOption().getYamlJobItemProgressSwapper());
private final PipelineProcessConfigurationPersistService
processConfigPersistService = new PipelineProcessConfigurationPersistService();
@@ -90,7 +90,7 @@ public final class ConsistencyCheckTasksRunner implements
PipelineTasksRunner {
return;
}
new
PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class,
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType())
-
.getYamlJobItemProgressSwapper()).persistProgress(jobItemContext);
+
.getOption().getYamlJobItemProgressSwapper()).persistProgress(jobItemContext);
CompletableFuture<?> future =
jobItemContext.getProcessContext().getConsistencyCheckExecuteEngine().submit(checkExecutor);
PipelineExecuteEngine.trigger(Collections.singletonList(future), new
CheckExecuteCallback());
}
diff --git
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index e84e90c7433..fe3bcf8e4a7 100644
---
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++
b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -80,7 +80,7 @@ import java.util.Collections;
@Slf4j
public final class MigrationJobPreparer implements
PipelineJobPreparer<MigrationJobItemContext> {
- private final PipelineJobItemManager<TransmissionJobItemProgress>
jobItemManager = new PipelineJobItemManager<>(new
MigrationJobType().getYamlJobItemProgressSwapper());
+ private final PipelineJobItemManager<TransmissionJobItemProgress>
jobItemManager = new PipelineJobItemManager<>(new
MigrationJobType().getOption().getYamlJobItemProgressSwapper());
@Override
public void prepare(final MigrationJobItemContext jobItemContext) throws
SQLException {
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
index 1cf7ce12d18..6e2d2a35607 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
@@ -72,7 +72,7 @@ class ConsistencyCheckJobExecutorCallbackTest {
YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectedYamlTableCheckRangePositions)));
ConsistencyCheckJobExecutorCallback callback = new
ConsistencyCheckJobExecutorCallback();
ConsistencyCheckJobConfiguration jobConfig = new
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(createYamlConsistencyCheckJobConfiguration(checkJobId));
- PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager
= new PipelineJobItemManager<>(new
ConsistencyCheckJobType().getYamlJobItemProgressSwapper());
+ PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager
= new PipelineJobItemManager<>(new
ConsistencyCheckJobType().getOption().getYamlJobItemProgressSwapper());
Optional<ConsistencyCheckJobItemProgress> jobItemProgress =
jobItemManager.getProgress(jobConfig.getJobId(), 0);
ConsistencyCheckJobItemContext actual =
callback.buildJobItemContext(jobConfig, 0, jobItemProgress.orElse(null), null,
null);
YamlTableCheckRangePositionSwapper tableCheckPositionSwapper = new
YamlTableCheckRangePositionSwapper();
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPITest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPITest.java
index bb8ba7c779c..0cab811477d 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPITest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPITest.java
@@ -59,7 +59,7 @@ class ConsistencyCheckJobAPITest {
private final ConsistencyCheckJobAPI jobAPI = new
ConsistencyCheckJobAPI(jobType);
- private final PipelineJobItemManager<TransmissionJobItemProgress>
jobItemManager = new
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
+ private final PipelineJobItemManager<TransmissionJobItemProgress>
jobItemManager = new
PipelineJobItemManager<>(jobType.getOption().getYamlJobItemProgressSwapper());
private final YamlMigrationJobConfigurationSwapper jobConfigSwapper = new
YamlMigrationJobConfigurationSwapper();
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java
index cba8da27a7c..aa807dbb132 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java
@@ -116,7 +116,7 @@ class MigrationJobAPITest {
jobConfigManager = new PipelineJobConfigurationManager(jobType);
jobManager = new PipelineJobManager(jobType);
transmissionJobManager = new TransmissionJobManager(jobType);
- jobItemManager = new
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
+ jobItemManager = new
PipelineJobItemManager<>(jobType.getOption().getYamlJobItemProgressSwapper());
String jdbcUrl =
"jdbc:h2:mem:test_ds_0;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL";
databaseType = DatabaseTypeFactory.get(jdbcUrl);
Map<String, Object> props = new HashMap<>(3, 1F);