This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new d8ddbdc0d87 Refactor pipeline job path in registry center (#20278)
d8ddbdc0d87 is described below

commit d8ddbdc0d87b02e3046276709ee143664ed10777
Author: Hongsheng Zhong <sand...@126.com>
AuthorDate: Thu Aug 18 22:25:36 2022 +0800

    Refactor pipeline job path in registry center (#20278)
    
    * Refactor pipeline job path in registry center
    
    * Compatible with ejob namespace
---
 .../data/pipeline/api/job/JobType.java             |  5 ++
 .../data/pipeline/core/api/PipelineAPIFactory.java |  6 +-
 .../core/api/impl/AbstractPipelineJobAPIImpl.java  | 12 ++--
 .../core/api/impl/GovernanceRepositoryAPIImpl.java | 12 ++--
 .../core/constant/DataPipelineConstants.java       |  3 +-
 .../pipeline/core/execute/PipelineJobExecutor.java | 10 +--
 .../core/metadata/node/PipelineMetaDataNode.java   | 80 +++++++++++++---------
 .../pipeline/scenario/migration/MigrationJob.java  |  4 +-
 .../metadata/node/PipelineMetaDataNodeTest.java    | 44 +++++++++---
 9 files changed, 108 insertions(+), 68 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobType.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobType.java
index 174550b90fe..553e720c936 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobType.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobType.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.api.job;
 
 import com.google.common.base.Preconditions;
 import lombok.Getter;
+import org.apache.commons.lang3.StringUtils;
 
 import java.util.Arrays;
 import java.util.Map;
@@ -40,10 +41,14 @@ public enum JobType {
     
     private final String typeName;
     
+    private final String lowercaseTypeName;
+    
     private final String typeCode;
     
     JobType(final String typeName, final String typeCode) {
+        Preconditions.checkArgument(StringUtils.isAlpha(typeName), "type name must be character of [a-z]");
         this.typeName = typeName;
+        this.lowercaseTypeName = typeName.toLowerCase();
         Preconditions.checkArgument(typeCode.length() == 2, "code length is not 2");
         this.typeCode = typeCode;
     }
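
The guard added above rejects any type name that is not purely alphabetic before deriving its lowercase form. A minimal sketch of the intended behavior, assuming a representative enum constant (the constant shown is illustrative, not quoted from this commit):

    // Passes StringUtils.isAlpha, so lowercaseTypeName becomes "migration"
    // and the two-character code check also succeeds.
    MIGRATION("MIGRATION", "01"),
    // A hypothetical name containing a non-letter, e.g. "MIGRATION_2", would fail
    // StringUtils.isAlpha and trigger the "type name must be character of [a-z]" error.
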
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineAPIFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineAPIFactory.java
index b9438115173..52972546a8d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineAPIFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineAPIFactory.java
@@ -25,8 +25,8 @@ import org.apache.commons.lang3.concurrent.ConcurrentException;
 import org.apache.commons.lang3.concurrent.LazyInitializer;
 import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.core.api.impl.GovernanceRepositoryAPIImpl;
-import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.core.registry.CoordinatorRegistryCenterInitializer;
 import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobAPIFactory;
 import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobConfigurationAPI;
@@ -126,7 +126,7 @@ public final class PipelineAPIFactory {
         
         private ElasticJobAPIHolder() {
             ClusterPersistRepositoryConfiguration repositoryConfig = (ClusterPersistRepositoryConfiguration) PipelineContext.getModeConfig().getRepository();
-            String namespace = repositoryConfig.getNamespace() + DataPipelineConstants.DATA_PIPELINE_ROOT;
+            String namespace = repositoryConfig.getNamespace() + PipelineMetaDataNode.getElasticJobNamespace();
             jobStatisticsAPI = JobAPIFactory.createJobStatisticsAPI(repositoryConfig.getServerLists(), namespace, null);
             jobConfigurationAPI = JobAPIFactory.createJobConfigurationAPI(repositoryConfig.getServerLists(), namespace, null);
             jobOperateAPI = JobAPIFactory.createJobOperateAPI(repositoryConfig.getServerLists(), namespace, null);
@@ -162,7 +162,7 @@ public final class PipelineAPIFactory {
         private static CoordinatorRegistryCenter createRegistryCenter() {
             CoordinatorRegistryCenterInitializer registryCenterInitializer = new CoordinatorRegistryCenterInitializer();
             ModeConfiguration modeConfig = PipelineContext.getModeConfig();
-            return registryCenterInitializer.createRegistryCenter(modeConfig, DataPipelineConstants.DATA_PIPELINE_ROOT);
+            return registryCenterInitializer.createRegistryCenter(modeConfig, PipelineMetaDataNode.getElasticJobNamespace());
         }
     }
 }
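
With getElasticJobNamespace() in place, the ElasticJob lifecycle and registry APIs are rooted under the pipeline jobs path instead of the old scaling root. A rough sketch of the resulting value, assuming a ZooKeeper namespace of "governance_ds" (that value is only an example, not from this commit):

    // repositoryConfig.getNamespace()                -> "governance_ds" (example value)
    // PipelineMetaDataNode.getElasticJobNamespace()  -> "/pipeline/jobs"
    String namespace = repositoryConfig.getNamespace() + PipelineMetaDataNode.getElasticJobNamespace();
    // namespace == "governance_ds/pipeline/jobs"; before this commit it was
    // repositoryConfig.getNamespace() + DataPipelineConstants.DATA_PIPELINE_ROOT, i.e. "governance_ds/scaling".
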
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index a9d8f53cfb1..e07fb3035d6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -65,12 +65,12 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
         }
         log.info("Start job by {}", jobConfig);
         GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
-        String jobConfigKey = PipelineMetaDataNode.getScalingJobConfigPath(jobId);
+        String jobConfigKey = PipelineMetaDataNode.getJobConfigPath(jobId);
         if (repositoryAPI.isExisted(jobConfigKey)) {
             log.warn("jobId already exists in registry center, ignore, jobConfigKey={}", jobConfigKey);
             return Optional.of(jobId);
         }
-        repositoryAPI.persist(PipelineMetaDataNode.getScalingJobPath(jobId), MigrationJob.class.getName());
+        repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId), MigrationJob.class.getName());
         repositoryAPI.persist(jobConfigKey, convertJobConfigurationToText(jobConfig));
         return Optional.of(jobId);
     }
@@ -89,7 +89,7 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
     @Override
     public void startDisabledJob(final String jobId) {
         log.info("Start disabled pipeline job {}", jobId);
-        pipelineDistributedBarrier.removeParentNode(PipelineMetaDataNode.getScalingJobBarrierDisablePath(jobId));
+        pipelineDistributedBarrier.removeParentNode(PipelineMetaDataNode.getJobBarrierDisablePath(jobId));
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
         if (!jobConfigPOJO.isDisabled()) {
             throw new PipelineVerifyFailedException("Job is already started.");
@@ -97,7 +97,7 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
         jobConfigPOJO.setDisabled(false);
         jobConfigPOJO.getProps().remove("stop_time");
         PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
-        String barrierPath = PipelineMetaDataNode.getScalingJobBarrierEnablePath(jobId);
+        String barrierPath = PipelineMetaDataNode.getJobBarrierEnablePath(jobId);
         pipelineDistributedBarrier.register(barrierPath, jobConfigPOJO.getShardingTotalCount());
         pipelineDistributedBarrier.await(barrierPath, 5, TimeUnit.SECONDS);
     }
@@ -105,13 +105,13 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
     @Override
     public void stop(final String jobId) {
         log.info("Stop pipeline job {}", jobId);
-        pipelineDistributedBarrier.removeParentNode(PipelineMetaDataNode.getScalingJobBarrierEnablePath(jobId));
+        pipelineDistributedBarrier.removeParentNode(PipelineMetaDataNode.getJobBarrierEnablePath(jobId));
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
         jobConfigPOJO.setDisabled(true);
         jobConfigPOJO.getProps().setProperty("stop_time", LocalDateTime.now().format(DATE_TIME_FORMATTER));
         // TODO updateJobConfiguration might doesn't work
         PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
-        String barrierPath = PipelineMetaDataNode.getScalingJobBarrierDisablePath(jobId);
+        String barrierPath = PipelineMetaDataNode.getJobBarrierDisablePath(jobId);
         pipelineDistributedBarrier.register(barrierPath, jobConfigPOJO.getShardingTotalCount());
         pipelineDistributedBarrier.await(barrierPath, 5, TimeUnit.SECONDS);
     }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index c58c65e24f9..62af844a34a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -45,30 +45,30 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
     
     @Override
     public void persistJobItemProgress(final String jobId, final int shardingItem, final String progressValue) {
-        repository.persist(PipelineMetaDataNode.getScalingJobOffsetPath(jobId, shardingItem), progressValue);
+        repository.persist(PipelineMetaDataNode.getJobOffsetItemPath(jobId, shardingItem), progressValue);
     }
     
     @Override
     public String getJobItemProgress(final String jobId, final int shardingItem) {
-        return repository.get(PipelineMetaDataNode.getScalingJobOffsetPath(jobId, shardingItem));
+        return repository.get(PipelineMetaDataNode.getJobOffsetItemPath(jobId, shardingItem));
     }
     
     @Override
     public void persistJobCheckResult(final String jobId, final boolean checkSuccess) {
         log.info("persist job check result '{}' for job {}", checkSuccess, jobId);
-        repository.persist(PipelineMetaDataNode.getScalingCheckResultPath(jobId), String.valueOf(checkSuccess));
+        repository.persist(PipelineMetaDataNode.getJobCheckResultPath(jobId), String.valueOf(checkSuccess));
     }
     
     @Override
     public Optional<Boolean> getJobCheckResult(final String jobId) {
-        String data = repository.get(PipelineMetaDataNode.getScalingCheckResultPath(jobId));
+        String data = repository.get(PipelineMetaDataNode.getJobCheckResultPath(jobId));
         return Strings.isNullOrEmpty(data) ? Optional.empty() : Optional.of(Boolean.parseBoolean(data));
     }
     
     @Override
     public void deleteJob(final String jobId) {
         log.info("delete job {}", jobId);
-        repository.delete(PipelineMetaDataNode.getScalingJobPath(jobId));
+        repository.delete(PipelineMetaDataNode.getJobRootPath(jobId));
     }
     
     @Override
@@ -88,7 +88,7 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
     
     @Override
     public List<Integer> getShardingItems(final String jobId) {
-        List<String> result = getChildrenKeys(PipelineMetaDataNode.getScalingJobOffsetPath(jobId));
+        List<String> result = getChildrenKeys(PipelineMetaDataNode.getJobOffsetPath(jobId));
         log.info("getShardingItems, jobId={}, offsetKeys={}", jobId, result);
         return result.stream().map(Integer::parseInt).collect(Collectors.toList());
     }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/constant/DataPipelineConstants.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/constant/DataPipelineConstants.java
index 8a7e550bf7d..0caf3b592a6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/constant/DataPipelineConstants.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/constant/DataPipelineConstants.java
@@ -29,8 +29,7 @@ public final class DataPipelineConstants {
     /**
      * Data pipeline node name.
      */
-    // TODO change to pipeline after job configuration structure completed
-    public static final String DATA_PIPELINE_NODE_NAME = "scaling";
+    public static final String DATA_PIPELINE_NODE_NAME = "pipeline";
     
     /**
      * Data pipeline root path.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
index b8d93725514..3427ccc3d1a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
@@ -25,6 +25,7 @@ import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
+import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
@@ -38,7 +39,6 @@ import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEve
 import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.regex.Pattern;
 
 /**
  * Pipeline job executor.
@@ -46,16 +46,12 @@ import java.util.regex.Pattern;
 @Slf4j
 public final class PipelineJobExecutor extends AbstractLifecycleExecutor {
     
-    private static final Pattern CONFIG_PATTERN = Pattern.compile(DataPipelineConstants.DATA_PIPELINE_ROOT + "/(j\\d{2}[0-9a-f]+)/config");
-    
-    private static final Pattern BARRIER_MATCH_PATTERN = Pattern.compile(DataPipelineConstants.DATA_PIPELINE_ROOT + "/(j\\d{2}[0-9a-f]+)/barrier/(enable|disable)/\\d+");
-    
     private final ExecutorService executor = Executors.newFixedThreadPool(20);
     
     @Override
     protected void doStart() {
         PipelineAPIFactory.getGovernanceRepositoryAPI().watch(DataPipelineConstants.DATA_PIPELINE_ROOT, event -> {
-            if (BARRIER_MATCH_PATTERN.matcher(event.getKey()).matches() && event.getType() == Type.ADDED) {
+            if (PipelineMetaDataNode.BARRIER_PATTERN.matcher(event.getKey()).matches() && event.getType() == Type.ADDED) {
                 PipelineDistributedBarrier.getInstance().checkChildrenNodeCount(event);
             }
             getJobConfigPOJO(event).ifPresent(optional -> processEvent(event, optional));
@@ -64,7 +60,7 @@ public final class PipelineJobExecutor extends AbstractLifecycleExecutor {
     
     private Optional<JobConfigurationPOJO> getJobConfigPOJO(final DataChangedEvent event) {
         try {
-            if (CONFIG_PATTERN.matcher(event.getKey()).matches()) {
+            if (PipelineMetaDataNode.CONFIG_PATTERN.matcher(event.getKey()).matches()) {
                 log.info("{} job config: {}", event.getType(), event.getKey());
                 return Optional.of(YamlEngine.unmarshal(event.getValue(), JobConfigurationPOJO.class, true));
             }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
index 9e75d46ef13..25a1f47d35f 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
@@ -21,90 +21,102 @@ import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
 
+import java.util.regex.Pattern;
+
 /**
- * Scaling meta data node.
+ * Pipeline meta data node.
  */
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public final class PipelineMetaDataNode {
     
+    private static final String JOB_PATTERN_PREFIX = DataPipelineConstants.DATA_PIPELINE_ROOT + "/jobs/(j\\d{2}[0-9a-f]+)";
+    
+    public static final Pattern CONFIG_PATTERN = Pattern.compile(JOB_PATTERN_PREFIX + "/config");
+    
+    public static final Pattern BARRIER_PATTERN = Pattern.compile(JOB_PATTERN_PREFIX + "/barrier/(enable|disable)/\\d+");
+    
     /**
-     * Get job config path.
+     * Get ElasticJob namespace.
      *
-     * @param jobId job id.
-     * @return job config path.
+     * @return namespace
      */
-    public static String getJobConfigPath(final String jobId) {
-        return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT, 
jobId, "config");
+    public static String getElasticJobNamespace() {
+        // ElasticJob will persist job to namespace
+        return getJobsPath();
+    }
+    
+    private static String getJobsPath() {
+        return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT, 
"jobs");
     }
     
     /**
-     * Get scaling root path.
+     * Get job root path.
      *
      * @param jobId job id.
      * @return root path
      */
-    public static String getScalingJobPath(final String jobId) {
-        return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT, 
jobId);
+    public static String getJobRootPath(final String jobId) {
+        return String.join("/", getJobsPath(), jobId);
     }
     
     /**
-     * Get scaling job offset path, include job id and sharding item.
+     * Get job offset item path.
      *
-     * @param jobId job id.
-     * @param shardingItem sharding item.
-     * @return job offset path.
+     * @param jobId job id
+     * @param shardingItem sharding item
+     * @return job offset path
      */
-    public static String getScalingJobOffsetPath(final String jobId, final int shardingItem) {
-        return String.join("/", getScalingJobOffsetPath(jobId), Integer.toString(shardingItem));
+    public static String getJobOffsetItemPath(final String jobId, final int shardingItem) {
+        return String.join("/", getJobOffsetPath(jobId), Integer.toString(shardingItem));
     }
     
     /**
-     * Get scaling job offset path.
+     * Get job offset path.
      *
-     * @param jobId job id.
-     * @return job offset path.
+     * @param jobId job id
+     * @return job offset path
      */
-    public static String getScalingJobOffsetPath(final String jobId) {
-        return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT, 
jobId, "offset");
+    public static String getJobOffsetPath(final String jobId) {
+        return String.join("/", getJobRootPath(jobId), "offset");
     }
     
     /**
-     * Get scaling job config path.
+     * Get job config path.
      *
      * @param jobId job id.
      * @return job config path.
      */
-    public static String getScalingJobConfigPath(final String jobId) {
-        return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT, 
jobId, "config");
+    public static String getJobConfigPath(final String jobId) {
+        return String.join("/", getJobRootPath(jobId), "config");
     }
     
     /**
-     * Get scaling job config path.
+     * Get job check result path.
      *
      * @param jobId job id.
      * @return job config path.
      */
-    public static String getScalingCheckResultPath(final String jobId) {
-        return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT, 
jobId, "check", "result");
+    public static String getJobCheckResultPath(final String jobId) {
+        return String.join("/", getJobRootPath(jobId), "check", "result");
     }
     
     /**
-     * Get scaling job barrier enable path.
+     * Get job barrier enable path.
      *
      * @param jobId job id
-     * @return job barrier path.
+     * @return job barrier enable path
      */
-    public static String getScalingJobBarrierEnablePath(final String jobId) {
-        return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT, 
jobId, "barrier", "enable");
+    public static String getJobBarrierEnablePath(final String jobId) {
+        return String.join("/", getJobRootPath(jobId), "barrier", "enable");
     }
     
     /**
-     * Get scaling job barrier disable path.
+     * Get job barrier disable path.
      *
      * @param jobId job id
-     * @return job barrier path.
+     * @return job barrier disable path
      */
-    public static String getScalingJobBarrierDisablePath(final String jobId) {
-        return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT, 
jobId, "barrier", "disable");
+    public static String getJobBarrierDisablePath(final String jobId) {
+        return String.join("/", getJobRootPath(jobId), "barrier", "disable");
     }
 }
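
Taken together, the helpers above place every job node under a shared "jobs" parent. A short usage sketch, using the job id "j01001" from the updated test below:

    PipelineMetaDataNode.getJobRootPath("j01001");            // /pipeline/jobs/j01001
    PipelineMetaDataNode.getJobConfigPath("j01001");          // /pipeline/jobs/j01001/config
    PipelineMetaDataNode.getJobOffsetItemPath("j01001", 0);   // /pipeline/jobs/j01001/offset/0
    PipelineMetaDataNode.getJobBarrierEnablePath("j01001");   // /pipeline/jobs/j01001/barrier/enable
    PipelineMetaDataNode.getJobCheckResultPath("j01001");     // /pipeline/jobs/j01001/check/result
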
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index f730c8f3758..b7bd288daac 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -76,7 +76,7 @@ public final class MigrationJob extends AbstractPipelineJob implements SimpleJob
         });
         getTasksRunnerMap().put(shardingItem, tasksRunner);
         PipelineJobProgressPersistService.addJobProgressPersistContext(getJobId(), shardingItem);
-        pipelineDistributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getScalingJobBarrierEnablePath(getJobId()), shardingItem);
+        pipelineDistributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierEnablePath(getJobId()), shardingItem);
     }
     
     private void prepare(final MigrationJobItemContext jobItemContext) {
@@ -110,7 +110,7 @@ public final class MigrationJob extends AbstractPipelineJob implements SimpleJob
             return;
         }
         log.info("stop tasks runner, jobId={}", getJobId());
-        String scalingJobBarrierDisablePath = PipelineMetaDataNode.getScalingJobBarrierDisablePath(getJobId());
+        String scalingJobBarrierDisablePath = PipelineMetaDataNode.getJobBarrierDisablePath(getJobId());
         for (PipelineTasksRunner each : getTasksRunnerMap().values()) {
             each.stop();
             pipelineDistributedBarrier.persistEphemeralChildrenNode(scalingJobBarrierDisablePath, each.getJobItemContext().getShardingItem());
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java
index 387a86dfbf3..2619d560e31 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java
@@ -24,21 +24,49 @@ import static org.junit.Assert.assertThat;
 
 public final class PipelineMetaDataNodeTest {
     
+    private final String jobId = "j01001";
+    
+    private final String jobsPath = "/pipeline/jobs";
+    
+    private final String jobRootPath = jobsPath + "/j01001";
+    
+    @Test
+    public void assertGetElasticJobNamespace() {
+        assertThat(PipelineMetaDataNode.getElasticJobNamespace(), is(jobsPath));
+    }
+    
+    @Test
+    public void assertGetJobRootPath() {
+        assertThat(PipelineMetaDataNode.getJobRootPath(jobId), is(jobRootPath));
+    }
+    
+    @Test
+    public void assertGetJobOffsetPath() {
+        assertThat(PipelineMetaDataNode.getJobOffsetPath(jobId), is(jobRootPath + "/offset"));
+    }
+    
+    @Test
+    public void assertGetJobOffsetItemPath() {
+        assertThat(PipelineMetaDataNode.getJobOffsetItemPath(jobId, 0), is(jobRootPath + "/offset/0"));
+    }
+    
     @Test
     public void assertGetJobConfigPath() {
-        String actualOffsetPath = PipelineMetaDataNode.getScalingJobOffsetPath("0130317c30317c3054317c7368617264696e675f6462");
-        assertThat(actualOffsetPath, is("/scaling/0130317c30317c3054317c7368617264696e675f6462/offset"));
-        actualOffsetPath = PipelineMetaDataNode.getScalingJobOffsetPath("0130317c30317c3054317c7368617264696e675f6462", 1);
-        assertThat(actualOffsetPath, is("/scaling/0130317c30317c3054317c7368617264696e675f6462/offset/1"));
+        assertThat(PipelineMetaDataNode.getJobConfigPath(jobId), is(jobRootPath + "/config"));
+    }
+    
+    @Test
+    public void assertGetCheckResultPath() {
+        assertThat(PipelineMetaDataNode.getJobCheckResultPath(jobId), is(jobRootPath + "/check/result"));
     }
     
     @Test
-    public void assertGetScalingJobConfigPath() {
-        assertThat(PipelineMetaDataNode.getScalingJobConfigPath("0130317c30317c3054317c7368617264696e675f6462"), is("/scaling/0130317c30317c3054317c7368617264696e675f6462/config"));
+    public void assertGetJobBarrierEnablePath() {
+        assertThat(PipelineMetaDataNode.getJobBarrierEnablePath(jobId), is(jobRootPath + "/barrier/enable"));
     }
     
     @Test
-    public void assertGetScalingCheckResultPath() {
-        assertThat(PipelineMetaDataNode.getScalingCheckResultPath("0130317c30317c3054317c7368617264696e675f6462"), is("/scaling/0130317c30317c3054317c7368617264696e675f6462/check/result"));
+    public void assertGetJobBarrierDisablePath() {
+        assertThat(PipelineMetaDataNode.getJobBarrierDisablePath(jobId), is(jobRootPath + "/barrier/disable"));
     }
 }
