sandynz commented on code in PR #20222:
URL: https://github.com/apache/shardingsphere/pull/20222#discussion_r946698956


##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java:
##########
@@ -44,6 +44,9 @@ public abstract class AbstractPipelineJob implements 
PipelineJob {
     @Setter
     private OneOffJobBootstrap oneOffJobBootstrap;
     
+    @Setter
+    private int shardingItem;

Review Comment:
   It's better not to add a `shardingItem` field to the job class, since there might be 
several sharding items in a job.



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java:
##########
@@ -46,11 +48,18 @@ public final class PipelineJobExecutor extends 
AbstractLifecycleExecutor {
     
     private static final Pattern CONFIG_PATTERN = 
Pattern.compile(DataPipelineConstants.DATA_PIPELINE_ROOT + 
"/(j\\d{2}[0-9a-f]+)/config");
     
+    private static final Pattern BARRIER_MATCH_PATTERN = 
Pattern.compile(DataPipelineConstants.DATA_PIPELINE_ROOT + 
"/(j\\d{2}[0-9a-f]+)/barrier/(enable|distable)/\\d+");

Review Comment:
   It seems `distable` is misspelled; should it be `disable`?



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java:
##########
@@ -85,23 +87,31 @@ private String convertJobConfigurationToText(final 
PipelineJobConfiguration jobC
     @Override
     public void startDisabledJob(final String jobId) {
         log.info("Start disabled pipeline job {}", jobId);
+        
MigrationDistributedCountDownLatch.getInstance().removeParentNode(PipelineMetaDataNode.getScalingJobBarrierDisablePath(jobId));
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
         if (!jobConfigPOJO.isDisabled()) {
             throw new PipelineVerifyFailedException("Job is already started.");
         }
         jobConfigPOJO.setDisabled(false);
         jobConfigPOJO.getProps().remove("stop_time");
         
PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
+        String barrierPath = 
PipelineMetaDataNode.getScalingJobBarrierEnablePath(jobId);
+        MigrationDistributedCountDownLatch.getInstance().register(barrierPath, 
jobConfigPOJO.getShardingTotalCount());
+        MigrationDistributedCountDownLatch.getInstance().await(barrierPath, 
10, TimeUnit.SECONDS);

Review Comment:
   Are there any resources that need to be cleaned up when the wait times out?



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/MigrationDistributedCountDownLatch.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.util;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+@Slf4j
+public final class MigrationDistributedCountDownLatch {
+    
+    private static final MigrationDistributedCountDownLatch INSTANCE = new 
MigrationDistributedCountDownLatch();
+    
+    private static final Pattern BARRIER_MATCH_PATTERN = 
Pattern.compile(DataPipelineConstants.DATA_PIPELINE_ROOT + 
"/(j\\d{2}[0-9a-f]+)/barrier/(enable|distable)/\\d+");
+    
+    private final ClusterPersistRepository clusterPersistRepository;
+    
+    private final Map<String, InnerCountDownLatchHolder> countDownLatchMap = 
new ConcurrentHashMap<>();
+    
+    private MigrationDistributedCountDownLatch() {
+        clusterPersistRepository = (ClusterPersistRepository) 
PipelineContext.getContextManager().getMetaDataContexts().getPersistService().getRepository();
+    }
+    
+    /**
+     * Get instance.
+     *
+     * @return instance
+     */
+    public static MigrationDistributedCountDownLatch getInstance() {
+        return INSTANCE;
+    }
+    
+    /**
+     * Register count down latch.
+     *
+     * @param parentPath parent path
+     * @param totalCount total count
+     */
+    public void register(final String parentPath, final int totalCount) {
+        clusterPersistRepository.persist(parentPath, "");
+        countDownLatchMap.computeIfAbsent(parentPath, k -> new 
InnerCountDownLatchHolder(totalCount, new CountDownLatch(1)));
+    }
+    
+    /**
+     * Persist ephemeral children node.
+     *
+     * @param parentPath parent path
+     * @param shardingItem sharding item
+     */
+    public void persistEphemeralChildrenNode(final String parentPath, final 
int shardingItem) {
+        String key = String.join("/", parentPath, 
Integer.toString(shardingItem));
+        clusterPersistRepository.delete(key);
+        clusterPersistRepository.persistEphemeral(key, "");
+    }
+    
+    /**
+     * Persist ephemeral children node.
+     *
+     * @param parentPath parent path
+     */
+    public void removeParentNode(final String parentPath) {
+        clusterPersistRepository.delete(String.join("/", parentPath));
+        countDownLatchMap.remove(parentPath);
+    }
+    
+    /**
+     * Await unitl all children node is ready.
+     *
+     * @param parentPath parent path
+     * @param timeout timeout
+     * @param timeUnit time unit
+     * @return
+     * true if the count reached zero and false if the waiting time elapsed 
before the count reached zero
+     */
+    public boolean await(final String parentPath, final long timeout, final 
TimeUnit timeUnit) {
+        InnerCountDownLatchHolder holder = countDownLatchMap.get(parentPath);
+        if (holder == null) {
+            return false;
+        }
+        try {
+            boolean awaitResult = holder.getCountDownLatch().await(timeout, 
timeUnit);
+            if (!awaitResult) {
+                log.info("await timeout, parent path: {}, timeout: {}, time 
unit: {}", parentPath, timeout, timeUnit);
+            }
+            return awaitResult;
+        } catch (final InterruptedException ignored) {
+        }
+        return false;
+    }
+    
+    /**
+     * Check child node count equal shardingCount.
+     *
+     * @param event event
+     */
+    public void checkChildrenNodeCount(final DataChangedEvent event) {
+        if (StringUtils.isBlank(event.getKey())) {
+            return;
+        }
+        if (!BARRIER_MATCH_PATTERN.matcher(event.getKey()).matches()) {
+            return;
+        }

Review Comment:
   BARRIER_MATCH_PATTERN already exists in the job executor; could we remove it here? 
Otherwise we need to maintain 2 copies of the pattern, which is error-prone.



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/MigrationDistributedCountDownLatch.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.util;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+@Slf4j
+public final class MigrationDistributedCountDownLatch {

Review Comment:
   1, Class javadoc is required
   
   2, Could we rename the class for common usage? There might be more types of 
jobs
   



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java:
##########
@@ -85,23 +87,31 @@ private String convertJobConfigurationToText(final 
PipelineJobConfiguration jobC
     @Override
     public void startDisabledJob(final String jobId) {
         log.info("Start disabled pipeline job {}", jobId);
+        
MigrationDistributedCountDownLatch.getInstance().removeParentNode(PipelineMetaDataNode.getScalingJobBarrierDisablePath(jobId));

Review Comment:
   Could we extract `getInstance()` into a local variable and share it within the method? 
The same applies to the stop method.



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java:
##########
@@ -85,23 +87,31 @@ private String convertJobConfigurationToText(final 
PipelineJobConfiguration jobC
     @Override
     public void startDisabledJob(final String jobId) {
         log.info("Start disabled pipeline job {}", jobId);
+        
MigrationDistributedCountDownLatch.getInstance().removeParentNode(PipelineMetaDataNode.getScalingJobBarrierDisablePath(jobId));
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
         if (!jobConfigPOJO.isDisabled()) {
             throw new PipelineVerifyFailedException("Job is already started.");
         }
         jobConfigPOJO.setDisabled(false);
         jobConfigPOJO.getProps().remove("stop_time");
         
PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
+        String barrierPath = 
PipelineMetaDataNode.getScalingJobBarrierEnablePath(jobId);
+        MigrationDistributedCountDownLatch.getInstance().register(barrierPath, 
jobConfigPOJO.getShardingTotalCount());
+        MigrationDistributedCountDownLatch.getInstance().await(barrierPath, 
10, TimeUnit.SECONDS);

Review Comment:
   Could we wait on a shorter time? e.g. 3 seconds or 5 seconds?



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/MigrationDistributedCountDownLatch.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.util;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+@Slf4j
+public final class MigrationDistributedCountDownLatch {
+    
+    private static final MigrationDistributedCountDownLatch INSTANCE = new 
MigrationDistributedCountDownLatch();
+    
+    private static final Pattern BARRIER_MATCH_PATTERN = 
Pattern.compile(DataPipelineConstants.DATA_PIPELINE_ROOT + 
"/(j\\d{2}[0-9a-f]+)/barrier/(enable|distable)/\\d+");
+    
+    private final ClusterPersistRepository clusterPersistRepository;
+    
+    private final Map<String, InnerCountDownLatchHolder> countDownLatchMap = 
new ConcurrentHashMap<>();
+    
+    private MigrationDistributedCountDownLatch() {
+        clusterPersistRepository = (ClusterPersistRepository) 
PipelineContext.getContextManager().getMetaDataContexts().getPersistService().getRepository();
+    }
+    
+    /**
+     * Get instance.
+     *
+     * @return instance
+     */
+    public static MigrationDistributedCountDownLatch getInstance() {
+        return INSTANCE;
+    }
+    
+    /**
+     * Register count down latch.
+     *
+     * @param parentPath parent path
+     * @param totalCount total count
+     */
+    public void register(final String parentPath, final int totalCount) {
+        clusterPersistRepository.persist(parentPath, "");
+        countDownLatchMap.computeIfAbsent(parentPath, k -> new 
InnerCountDownLatchHolder(totalCount, new CountDownLatch(1)));
+    }
+    
+    /**
+     * Persist ephemeral children node.
+     *
+     * @param parentPath parent path
+     * @param shardingItem sharding item
+     */
+    public void persistEphemeralChildrenNode(final String parentPath, final 
int shardingItem) {
+        String key = String.join("/", parentPath, 
Integer.toString(shardingItem));
+        clusterPersistRepository.delete(key);
+        clusterPersistRepository.persistEphemeral(key, "");
+    }
+    
+    /**
+     * Persist ephemeral children node.
+     *
+     * @param parentPath parent path
+     */
+    public void removeParentNode(final String parentPath) {
+        clusterPersistRepository.delete(String.join("/", parentPath));
+        countDownLatchMap.remove(parentPath);
+    }
+    
+    /**
+     * Await unitl all children node is ready.
+     *
+     * @param parentPath parent path
+     * @param timeout timeout
+     * @param timeUnit time unit
+     * @return
+     * true if the count reached zero and false if the waiting time elapsed 
before the count reached zero
+     */
+    public boolean await(final String parentPath, final long timeout, final 
TimeUnit timeUnit) {
+        InnerCountDownLatchHolder holder = countDownLatchMap.get(parentPath);
+        if (holder == null) {
+            return false;
+        }
+        try {
+            boolean awaitResult = holder.getCountDownLatch().await(timeout, 
timeUnit);

Review Comment:
   `awaitResult` could be `result`



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/MigrationDistributedCountDownLatch.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.util;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+@Slf4j
+public final class MigrationDistributedCountDownLatch {
+    
+    private static final MigrationDistributedCountDownLatch INSTANCE = new 
MigrationDistributedCountDownLatch();
+    
+    private static final Pattern BARRIER_MATCH_PATTERN = 
Pattern.compile(DataPipelineConstants.DATA_PIPELINE_ROOT + 
"/(j\\d{2}[0-9a-f]+)/barrier/(enable|distable)/\\d+");
+    
+    private final ClusterPersistRepository clusterPersistRepository;
+    
+    private final Map<String, InnerCountDownLatchHolder> countDownLatchMap = 
new ConcurrentHashMap<>();
+    
+    private MigrationDistributedCountDownLatch() {
+        clusterPersistRepository = (ClusterPersistRepository) 
PipelineContext.getContextManager().getMetaDataContexts().getPersistService().getRepository();
+    }
+    
+    /**
+     * Get instance.
+     *
+     * @return instance
+     */
+    public static MigrationDistributedCountDownLatch getInstance() {
+        return INSTANCE;
+    }
+    
+    /**
+     * Register count down latch.
+     *
+     * @param parentPath parent path
+     * @param totalCount total count
+     */
+    public void register(final String parentPath, final int totalCount) {
+        clusterPersistRepository.persist(parentPath, "");
+        countDownLatchMap.computeIfAbsent(parentPath, k -> new 
InnerCountDownLatchHolder(totalCount, new CountDownLatch(1)));
+    }
+    
+    /**
+     * Persist ephemeral children node.
+     *
+     * @param parentPath parent path
+     * @param shardingItem sharding item
+     */
+    public void persistEphemeralChildrenNode(final String parentPath, final 
int shardingItem) {
+        String key = String.join("/", parentPath, 
Integer.toString(shardingItem));
+        clusterPersistRepository.delete(key);
+        clusterPersistRepository.persistEphemeral(key, "");
+    }
+    
+    /**
+     * Persist ephemeral children node.
+     *
+     * @param parentPath parent path
+     */
+    public void removeParentNode(final String parentPath) {
+        clusterPersistRepository.delete(String.join("/", parentPath));
+        countDownLatchMap.remove(parentPath);
+    }
+    
+    /**
+     * Await unitl all children node is ready.
+     *
+     * @param parentPath parent path
+     * @param timeout timeout
+     * @param timeUnit time unit
+     * @return
+     * true if the count reached zero and false if the waiting time elapsed 
before the count reached zero
+     */
+    public boolean await(final String parentPath, final long timeout, final 
TimeUnit timeUnit) {
+        InnerCountDownLatchHolder holder = countDownLatchMap.get(parentPath);
+        if (holder == null) {
+            return false;
+        }

Review Comment:
   1, `holder == null` could be `null == holder`
   
   2, It's better to log before returning false
   



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java:
##########
@@ -97,6 +101,7 @@ private void prepare(final MigrationJobItemContext 
jobItemContext) {
     public void stop() {
         setStopping(true);
         dataSourceManager.close();
+        
MigrationDistributedCountDownLatch.getInstance().persistEphemeralChildrenNode(PipelineMetaDataNode.getScalingJobBarrierDisablePath(getJobId()),
 getShardingItem());

Review Comment:
   `getShardingItem()` doesn't match `stop()` method (it means stop job, not 
only one item)



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/MigrationDistributedCountDownLatch.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.util;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+@Slf4j
+public final class MigrationDistributedCountDownLatch {
+    
+    private static final MigrationDistributedCountDownLatch INSTANCE = new 
MigrationDistributedCountDownLatch();
+    
+    private static final Pattern BARRIER_MATCH_PATTERN = 
Pattern.compile(DataPipelineConstants.DATA_PIPELINE_ROOT + 
"/(j\\d{2}[0-9a-f]+)/barrier/(enable|distable)/\\d+");
+    
+    private final ClusterPersistRepository clusterPersistRepository;
+    

Review Comment:
   `clusterPersistRepository` could be `repository`, it might support other 
registry center later



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/MigrationDistributedCountDownLatch.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.util;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+@Slf4j
+public final class MigrationDistributedCountDownLatch {
+    
+    private static final MigrationDistributedCountDownLatch INSTANCE = new 
MigrationDistributedCountDownLatch();
+    
+    private static final Pattern BARRIER_MATCH_PATTERN = 
Pattern.compile(DataPipelineConstants.DATA_PIPELINE_ROOT + 
"/(j\\d{2}[0-9a-f]+)/barrier/(enable|distable)/\\d+");
+    
+    private final ClusterPersistRepository clusterPersistRepository;
+    
+    private final Map<String, InnerCountDownLatchHolder> countDownLatchMap = 
new ConcurrentHashMap<>();
+    
+    private MigrationDistributedCountDownLatch() {
+        clusterPersistRepository = (ClusterPersistRepository) 
PipelineContext.getContextManager().getMetaDataContexts().getPersistService().getRepository();
+    }
+    
+    /**
+     * Get instance.
+     *
+     * @return instance
+     */
+    public static MigrationDistributedCountDownLatch getInstance() {
+        return INSTANCE;
+    }
+    
+    /**
+     * Register count down latch.
+     *
+     * @param parentPath parent path
+     * @param totalCount total count
+     */
+    public void register(final String parentPath, final int totalCount) {
+        clusterPersistRepository.persist(parentPath, "");
+        countDownLatchMap.computeIfAbsent(parentPath, k -> new 
InnerCountDownLatchHolder(totalCount, new CountDownLatch(1)));
+    }
+    
+    /**
+     * Persist ephemeral children node.
+     *
+     * @param parentPath parent path
+     * @param shardingItem sharding item
+     */
+    public void persistEphemeralChildrenNode(final String parentPath, final 
int shardingItem) {
+        String key = String.join("/", parentPath, 
Integer.toString(shardingItem));
+        clusterPersistRepository.delete(key);
+        clusterPersistRepository.persistEphemeral(key, "");
+    }
+    
+    /**
+     * Persist ephemeral children node.
+     *
+     * @param parentPath parent path
+     */
+    public void removeParentNode(final String parentPath) {
+        clusterPersistRepository.delete(String.join("/", parentPath));
+        countDownLatchMap.remove(parentPath);
+    }
+    
+    /**
+     * Await unitl all children node is ready.
+     *
+     * @param parentPath parent path
+     * @param timeout timeout
+     * @param timeUnit time unit
+     * @return
+     * true if the count reached zero and false if the waiting time elapsed 
before the count reached zero
+     */
+    public boolean await(final String parentPath, final long timeout, final 
TimeUnit timeUnit) {
+        InnerCountDownLatchHolder holder = countDownLatchMap.get(parentPath);
+        if (holder == null) {
+            return false;
+        }
+        try {
+            boolean awaitResult = holder.getCountDownLatch().await(timeout, 
timeUnit);
+            if (!awaitResult) {
+                log.info("await timeout, parent path: {}, timeout: {}, time 
unit: {}", parentPath, timeout, timeUnit);
+            }
+            return awaitResult;
+        } catch (final InterruptedException ignored) {
+        }
+        return false;
+    }
+    
+    /**
+     * Check child node count equal shardingCount.
+     *
+     * @param event event
+     */
+    public void checkChildrenNodeCount(final DataChangedEvent event) {
+        if (StringUtils.isBlank(event.getKey())) {
+            return;
+        }
+        if (!BARRIER_MATCH_PATTERN.matcher(event.getKey()).matches()) {
+            return;
+        }
+        String parentPath = event.getKey().substring(0, 
event.getKey().lastIndexOf("/"));
+        InnerCountDownLatchHolder holder = countDownLatchMap.get(parentPath);
+        if (holder == null) {
+            return;
+        }
+        List<String> childrenKeys = 
clusterPersistRepository.getChildrenKeys(parentPath);
+        if (childrenKeys.size() == holder.getTotalCount()) {
+            holder.getCountDownLatch().countDown();
+        }

Review Comment:
   1, It's better to print `childrenKeys` for better troubleshooting, and also 
total count
   
   2, And also when count matches
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to