This is an automated email from the ASF dual-hosted git repository. zhaojinchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push: new d8ddbdc0d87 Refactor pipeline job path in registry center (#20278) d8ddbdc0d87 is described below commit d8ddbdc0d87b02e3046276709ee143664ed10777 Author: Hongsheng Zhong <sand...@126.com> AuthorDate: Thu Aug 18 22:25:36 2022 +0800 Refactor pipeline job path in registry center (#20278) * Refactor pipeline job path in registry center * Compatible with ejob namespace --- .../data/pipeline/api/job/JobType.java | 5 ++ .../data/pipeline/core/api/PipelineAPIFactory.java | 6 +- .../core/api/impl/AbstractPipelineJobAPIImpl.java | 12 ++-- .../core/api/impl/GovernanceRepositoryAPIImpl.java | 12 ++-- .../core/constant/DataPipelineConstants.java | 3 +- .../pipeline/core/execute/PipelineJobExecutor.java | 10 +-- .../core/metadata/node/PipelineMetaDataNode.java | 80 +++++++++++++--------- .../pipeline/scenario/migration/MigrationJob.java | 4 +- .../metadata/node/PipelineMetaDataNodeTest.java | 44 +++++++++--- 9 files changed, 108 insertions(+), 68 deletions(-) diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobType.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobType.java index 174550b90fe..553e720c936 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobType.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobType.java @@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.api.job; import com.google.common.base.Preconditions; import lombok.Getter; +import org.apache.commons.lang3.StringUtils; import java.util.Arrays; import java.util.Map; @@ -40,10 +41,14 @@ public enum JobType { private final String typeName; + private final String lowercaseTypeName; + private final String typeCode; JobType(final String typeName, final String typeCode) { + Preconditions.checkArgument(StringUtils.isAlpha(typeName), "type name must be character of [a-z]"); this.typeName = typeName; + this.lowercaseTypeName = typeName.toLowerCase(); Preconditions.checkArgument(typeCode.length() == 2, "code length is not 2"); this.typeCode = typeCode; } diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineAPIFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineAPIFactory.java index b9438115173..52972546a8d 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineAPIFactory.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineAPIFactory.java @@ -25,8 +25,8 @@ import org.apache.commons.lang3.concurrent.ConcurrentException; import org.apache.commons.lang3.concurrent.LazyInitializer; import org.apache.shardingsphere.data.pipeline.api.job.JobType; import org.apache.shardingsphere.data.pipeline.core.api.impl.GovernanceRepositoryAPIImpl; -import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants; import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext; +import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode; import org.apache.shardingsphere.data.pipeline.core.registry.CoordinatorRegistryCenterInitializer; import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobAPIFactory; import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobConfigurationAPI; @@ -126,7 +126,7 @@ public final class PipelineAPIFactory { private ElasticJobAPIHolder() { ClusterPersistRepositoryConfiguration repositoryConfig = (ClusterPersistRepositoryConfiguration) PipelineContext.getModeConfig().getRepository(); - String namespace = repositoryConfig.getNamespace() + DataPipelineConstants.DATA_PIPELINE_ROOT; + String namespace = repositoryConfig.getNamespace() + PipelineMetaDataNode.getElasticJobNamespace(); jobStatisticsAPI = JobAPIFactory.createJobStatisticsAPI(repositoryConfig.getServerLists(), namespace, null); jobConfigurationAPI = JobAPIFactory.createJobConfigurationAPI(repositoryConfig.getServerLists(), namespace, null); jobOperateAPI = JobAPIFactory.createJobOperateAPI(repositoryConfig.getServerLists(), namespace, null); @@ -162,7 +162,7 @@ public final class PipelineAPIFactory { private static CoordinatorRegistryCenter createRegistryCenter() { CoordinatorRegistryCenterInitializer registryCenterInitializer = new CoordinatorRegistryCenterInitializer(); ModeConfiguration modeConfig = PipelineContext.getModeConfig(); - return registryCenterInitializer.createRegistryCenter(modeConfig, DataPipelineConstants.DATA_PIPELINE_ROOT); + return registryCenterInitializer.createRegistryCenter(modeConfig, PipelineMetaDataNode.getElasticJobNamespace()); } } } diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java index a9d8f53cfb1..e07fb3035d6 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java @@ -65,12 +65,12 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI { } log.info("Start job by {}", jobConfig); GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(); - String jobConfigKey = PipelineMetaDataNode.getScalingJobConfigPath(jobId); + String jobConfigKey = PipelineMetaDataNode.getJobConfigPath(jobId); if (repositoryAPI.isExisted(jobConfigKey)) { log.warn("jobId already exists in registry center, ignore, jobConfigKey={}", jobConfigKey); return Optional.of(jobId); } - repositoryAPI.persist(PipelineMetaDataNode.getScalingJobPath(jobId), MigrationJob.class.getName()); + repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId), MigrationJob.class.getName()); repositoryAPI.persist(jobConfigKey, convertJobConfigurationToText(jobConfig)); return Optional.of(jobId); } @@ -89,7 +89,7 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI { @Override public void startDisabledJob(final String jobId) { log.info("Start disabled pipeline job {}", jobId); - pipelineDistributedBarrier.removeParentNode(PipelineMetaDataNode.getScalingJobBarrierDisablePath(jobId)); + pipelineDistributedBarrier.removeParentNode(PipelineMetaDataNode.getJobBarrierDisablePath(jobId)); JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId); if (!jobConfigPOJO.isDisabled()) { throw new PipelineVerifyFailedException("Job is already started."); @@ -97,7 +97,7 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI { jobConfigPOJO.setDisabled(false); jobConfigPOJO.getProps().remove("stop_time"); PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO); - String barrierPath = PipelineMetaDataNode.getScalingJobBarrierEnablePath(jobId); + String barrierPath = PipelineMetaDataNode.getJobBarrierEnablePath(jobId); pipelineDistributedBarrier.register(barrierPath, jobConfigPOJO.getShardingTotalCount()); pipelineDistributedBarrier.await(barrierPath, 5, TimeUnit.SECONDS); } @@ -105,13 +105,13 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI { @Override public void stop(final String jobId) { log.info("Stop pipeline job {}", jobId); - pipelineDistributedBarrier.removeParentNode(PipelineMetaDataNode.getScalingJobBarrierEnablePath(jobId)); + pipelineDistributedBarrier.removeParentNode(PipelineMetaDataNode.getJobBarrierEnablePath(jobId)); JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId); jobConfigPOJO.setDisabled(true); jobConfigPOJO.getProps().setProperty("stop_time", LocalDateTime.now().format(DATE_TIME_FORMATTER)); // TODO updateJobConfiguration might doesn't work PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO); - String barrierPath = PipelineMetaDataNode.getScalingJobBarrierDisablePath(jobId); + String barrierPath = PipelineMetaDataNode.getJobBarrierDisablePath(jobId); pipelineDistributedBarrier.register(barrierPath, jobConfigPOJO.getShardingTotalCount()); pipelineDistributedBarrier.await(barrierPath, 5, TimeUnit.SECONDS); } diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java index c58c65e24f9..62af844a34a 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java @@ -45,30 +45,30 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP @Override public void persistJobItemProgress(final String jobId, final int shardingItem, final String progressValue) { - repository.persist(PipelineMetaDataNode.getScalingJobOffsetPath(jobId, shardingItem), progressValue); + repository.persist(PipelineMetaDataNode.getJobOffsetItemPath(jobId, shardingItem), progressValue); } @Override public String getJobItemProgress(final String jobId, final int shardingItem) { - return repository.get(PipelineMetaDataNode.getScalingJobOffsetPath(jobId, shardingItem)); + return repository.get(PipelineMetaDataNode.getJobOffsetItemPath(jobId, shardingItem)); } @Override public void persistJobCheckResult(final String jobId, final boolean checkSuccess) { log.info("persist job check result '{}' for job {}", checkSuccess, jobId); - repository.persist(PipelineMetaDataNode.getScalingCheckResultPath(jobId), String.valueOf(checkSuccess)); + repository.persist(PipelineMetaDataNode.getJobCheckResultPath(jobId), String.valueOf(checkSuccess)); } @Override public Optional<Boolean> getJobCheckResult(final String jobId) { - String data = repository.get(PipelineMetaDataNode.getScalingCheckResultPath(jobId)); + String data = repository.get(PipelineMetaDataNode.getJobCheckResultPath(jobId)); return Strings.isNullOrEmpty(data) ? Optional.empty() : Optional.of(Boolean.parseBoolean(data)); } @Override public void deleteJob(final String jobId) { log.info("delete job {}", jobId); - repository.delete(PipelineMetaDataNode.getScalingJobPath(jobId)); + repository.delete(PipelineMetaDataNode.getJobRootPath(jobId)); } @Override @@ -88,7 +88,7 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP @Override public List<Integer> getShardingItems(final String jobId) { - List<String> result = getChildrenKeys(PipelineMetaDataNode.getScalingJobOffsetPath(jobId)); + List<String> result = getChildrenKeys(PipelineMetaDataNode.getJobOffsetPath(jobId)); log.info("getShardingItems, jobId={}, offsetKeys={}", jobId, result); return result.stream().map(Integer::parseInt).collect(Collectors.toList()); } diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/constant/DataPipelineConstants.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/constant/DataPipelineConstants.java index 8a7e550bf7d..0caf3b592a6 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/constant/DataPipelineConstants.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/constant/DataPipelineConstants.java @@ -29,8 +29,7 @@ public final class DataPipelineConstants { /** * Data pipeline node name. */ - // TODO change to pipeline after job configuration structure completed - public static final String DATA_PIPELINE_NODE_NAME = "scaling"; + public static final String DATA_PIPELINE_NODE_NAME = "pipeline"; /** * Data pipeline root path. diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java index b8d93725514..3427ccc3d1a 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java @@ -25,6 +25,7 @@ import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory; import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter; import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector; +import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode; import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier; import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob; import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory; @@ -38,7 +39,6 @@ import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEve import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.regex.Pattern; /** * Pipeline job executor. @@ -46,16 +46,12 @@ import java.util.regex.Pattern; @Slf4j public final class PipelineJobExecutor extends AbstractLifecycleExecutor { - private static final Pattern CONFIG_PATTERN = Pattern.compile(DataPipelineConstants.DATA_PIPELINE_ROOT + "/(j\\d{2}[0-9a-f]+)/config"); - - private static final Pattern BARRIER_MATCH_PATTERN = Pattern.compile(DataPipelineConstants.DATA_PIPELINE_ROOT + "/(j\\d{2}[0-9a-f]+)/barrier/(enable|disable)/\\d+"); - private final ExecutorService executor = Executors.newFixedThreadPool(20); @Override protected void doStart() { PipelineAPIFactory.getGovernanceRepositoryAPI().watch(DataPipelineConstants.DATA_PIPELINE_ROOT, event -> { - if (BARRIER_MATCH_PATTERN.matcher(event.getKey()).matches() && event.getType() == Type.ADDED) { + if (PipelineMetaDataNode.BARRIER_PATTERN.matcher(event.getKey()).matches() && event.getType() == Type.ADDED) { PipelineDistributedBarrier.getInstance().checkChildrenNodeCount(event); } getJobConfigPOJO(event).ifPresent(optional -> processEvent(event, optional)); @@ -64,7 +60,7 @@ public final class PipelineJobExecutor extends AbstractLifecycleExecutor { private Optional<JobConfigurationPOJO> getJobConfigPOJO(final DataChangedEvent event) { try { - if (CONFIG_PATTERN.matcher(event.getKey()).matches()) { + if (PipelineMetaDataNode.CONFIG_PATTERN.matcher(event.getKey()).matches()) { log.info("{} job config: {}", event.getType(), event.getKey()); return Optional.of(YamlEngine.unmarshal(event.getValue(), JobConfigurationPOJO.class, true)); } diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java index 9e75d46ef13..25a1f47d35f 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java @@ -21,90 +21,102 @@ import lombok.AccessLevel; import lombok.NoArgsConstructor; import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants; +import java.util.regex.Pattern; + /** - * Scaling meta data node. + * Pipeline meta data node. */ @NoArgsConstructor(access = AccessLevel.PRIVATE) public final class PipelineMetaDataNode { + private static final String JOB_PATTERN_PREFIX = DataPipelineConstants.DATA_PIPELINE_ROOT + "/jobs/(j\\d{2}[0-9a-f]+)"; + + public static final Pattern CONFIG_PATTERN = Pattern.compile(JOB_PATTERN_PREFIX + "/config"); + + public static final Pattern BARRIER_PATTERN = Pattern.compile(JOB_PATTERN_PREFIX + "/barrier/(enable|disable)/\\d+"); + /** - * Get job config path. + * Get ElasticJob namespace. * - * @param jobId job id. - * @return job config path. + * @return namespace */ - public static String getJobConfigPath(final String jobId) { - return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT, jobId, "config"); + public static String getElasticJobNamespace() { + // ElasticJob will persist job to namespace + return getJobsPath(); + } + + private static String getJobsPath() { + return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT, "jobs"); } /** - * Get scaling root path. + * Get job root path. * * @param jobId job id. * @return root path */ - public static String getScalingJobPath(final String jobId) { - return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT, jobId); + public static String getJobRootPath(final String jobId) { + return String.join("/", getJobsPath(), jobId); } /** - * Get scaling job offset path, include job id and sharding item. + * Get job offset item path. * - * @param jobId job id. - * @param shardingItem sharding item. - * @return job offset path. + * @param jobId job id + * @param shardingItem sharding item + * @return job offset path */ - public static String getScalingJobOffsetPath(final String jobId, final int shardingItem) { - return String.join("/", getScalingJobOffsetPath(jobId), Integer.toString(shardingItem)); + public static String getJobOffsetItemPath(final String jobId, final int shardingItem) { + return String.join("/", getJobOffsetPath(jobId), Integer.toString(shardingItem)); } /** - * Get scaling job offset path. + * Get job offset path. * - * @param jobId job id. - * @return job offset path. + * @param jobId job id + * @return job offset path */ - public static String getScalingJobOffsetPath(final String jobId) { - return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT, jobId, "offset"); + public static String getJobOffsetPath(final String jobId) { + return String.join("/", getJobRootPath(jobId), "offset"); } /** - * Get scaling job config path. + * Get job config path. * * @param jobId job id. * @return job config path. */ - public static String getScalingJobConfigPath(final String jobId) { - return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT, jobId, "config"); + public static String getJobConfigPath(final String jobId) { + return String.join("/", getJobRootPath(jobId), "config"); } /** - * Get scaling job config path. + * Get job check result path. * * @param jobId job id. * @return job config path. */ - public static String getScalingCheckResultPath(final String jobId) { - return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT, jobId, "check", "result"); + public static String getJobCheckResultPath(final String jobId) { + return String.join("/", getJobRootPath(jobId), "check", "result"); } /** - * Get scaling job barrier enable path. + * Get job barrier enable path. * * @param jobId job id - * @return job barrier path. + * @return job barrier enable path */ - public static String getScalingJobBarrierEnablePath(final String jobId) { - return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT, jobId, "barrier", "enable"); + public static String getJobBarrierEnablePath(final String jobId) { + return String.join("/", getJobRootPath(jobId), "barrier", "enable"); } /** - * Get scaling job barrier disable path. + * Get job barrier disable path. * * @param jobId job id - * @return job barrier path. + * @return job barrier disable path */ - public static String getScalingJobBarrierDisablePath(final String jobId) { - return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT, jobId, "barrier", "disable"); + public static String getJobBarrierDisablePath(final String jobId) { + return String.join("/", getJobRootPath(jobId), "barrier", "disable"); } } diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java index f730c8f3758..b7bd288daac 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java @@ -76,7 +76,7 @@ public final class MigrationJob extends AbstractPipelineJob implements SimpleJob }); getTasksRunnerMap().put(shardingItem, tasksRunner); PipelineJobProgressPersistService.addJobProgressPersistContext(getJobId(), shardingItem); - pipelineDistributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getScalingJobBarrierEnablePath(getJobId()), shardingItem); + pipelineDistributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierEnablePath(getJobId()), shardingItem); } private void prepare(final MigrationJobItemContext jobItemContext) { @@ -110,7 +110,7 @@ public final class MigrationJob extends AbstractPipelineJob implements SimpleJob return; } log.info("stop tasks runner, jobId={}", getJobId()); - String scalingJobBarrierDisablePath = PipelineMetaDataNode.getScalingJobBarrierDisablePath(getJobId()); + String scalingJobBarrierDisablePath = PipelineMetaDataNode.getJobBarrierDisablePath(getJobId()); for (PipelineTasksRunner each : getTasksRunnerMap().values()) { each.stop(); pipelineDistributedBarrier.persistEphemeralChildrenNode(scalingJobBarrierDisablePath, each.getJobItemContext().getShardingItem()); diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java index 387a86dfbf3..2619d560e31 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java @@ -24,21 +24,49 @@ import static org.junit.Assert.assertThat; public final class PipelineMetaDataNodeTest { + private final String jobId = "j01001"; + + private final String jobsPath = "/pipeline/jobs"; + + private final String jobRootPath = jobsPath + "/j01001"; + + @Test + public void assertGetElasticJobNamespace() { + assertThat(PipelineMetaDataNode.getElasticJobNamespace(), is(jobsPath)); + } + + @Test + public void assertGetJobRootPath() { + assertThat(PipelineMetaDataNode.getJobRootPath(jobId), is(jobRootPath)); + } + + @Test + public void assertGetJobOffsetPath() { + assertThat(PipelineMetaDataNode.getJobOffsetPath(jobId), is(jobRootPath + "/offset")); + } + + @Test + public void assertGetJobOffsetItemPath() { + assertThat(PipelineMetaDataNode.getJobOffsetItemPath(jobId, 0), is(jobRootPath + "/offset/0")); + } + @Test public void assertGetJobConfigPath() { - String actualOffsetPath = PipelineMetaDataNode.getScalingJobOffsetPath("0130317c30317c3054317c7368617264696e675f6462"); - assertThat(actualOffsetPath, is("/scaling/0130317c30317c3054317c7368617264696e675f6462/offset")); - actualOffsetPath = PipelineMetaDataNode.getScalingJobOffsetPath("0130317c30317c3054317c7368617264696e675f6462", 1); - assertThat(actualOffsetPath, is("/scaling/0130317c30317c3054317c7368617264696e675f6462/offset/1")); + assertThat(PipelineMetaDataNode.getJobConfigPath(jobId), is(jobRootPath + "/config")); + } + + @Test + public void assertGetCheckResultPath() { + assertThat(PipelineMetaDataNode.getJobCheckResultPath(jobId), is(jobRootPath + "/check/result")); } @Test - public void assertGetScalingJobConfigPath() { - assertThat(PipelineMetaDataNode.getScalingJobConfigPath("0130317c30317c3054317c7368617264696e675f6462"), is("/scaling/0130317c30317c3054317c7368617264696e675f6462/config")); + public void assertGetJobBarrierEnablePath() { + assertThat(PipelineMetaDataNode.getJobBarrierEnablePath(jobId), is(jobRootPath + "/barrier/enable")); } @Test - public void assertGetScalingCheckResultPath() { - assertThat(PipelineMetaDataNode.getScalingCheckResultPath("0130317c30317c3054317c7368617264696e675f6462"), is("/scaling/0130317c30317c3054317c7368617264696e675f6462/check/result")); + public void assertGetJobBarrierDisablePath() { + assertThat(PipelineMetaDataNode.getJobBarrierDisablePath(jobId), is(jobRootPath + "/barrier/disable")); } }