This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 628ba8dfb2e Refactor pipeline job cancel and exception handling (#30335)
628ba8dfb2e is described below

commit 628ba8dfb2e172d68e43dd53363c10edba40b0d4
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Feb 28 15:42:55 2024 +0800

    Refactor pipeline job cancel and exception handling (#30335)
    
    * Add PipelineJobCanceledException; Refactor cancel()
    
    * Throw PipelineJobCanceledException when job is stopping
    
    * Rename PipelineJobCanceledException to PipelineJobCancelingException
    
    * Refactor job exception handling and resources release
    
    * Clean InventoryTaskExecuteCallback.onFailure
---
 .../table/MatchingTableInventoryChecker.java       | 30 ++++++++++--------
 .../RecordSingleTableInventoryCalculator.java      |  6 ++--
 .../exception/PipelineJobCancelingException.java   | 33 +++++++++++++++++++
 .../core/job/AbstractSeparablePipelineJob.java     | 30 ++++++++----------
 .../core/task/runner/TransmissionTasksRunner.java  | 37 +++++-----------------
 .../MigrationDataConsistencyChecker.java           | 18 +++++------
 .../migration/preparer/MigrationJobPreparer.java   | 14 ++++----
 7 files changed, 90 insertions(+), 78 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
index 5e67b92a157..6e799b958da 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
@@ -19,30 +19,30 @@ package org.apache.shardingsphere.data.pipeline.core.consistencycheck.table;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResultSwapper;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculateParameter;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculator;
+import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
 import org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
 import org.apache.shardingsphere.infra.exception.core.external.sql.type.wrapper.SQLWrapperException;
 import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
 import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 
 import java.sql.SQLException;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Objects;
-import java.util.Set;
+import java.util.Optional;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Matching table inventory checker.
@@ -53,7 +53,11 @@ public abstract class MatchingTableInventoryChecker implements TableInventoryChe
     
     private final TableInventoryCheckParameter param;
     
-    private final Set<SingleTableInventoryCalculator> calculators = new HashSet<>();
+    private final AtomicBoolean canceling = new AtomicBoolean(false);
+    
+    private volatile SingleTableInventoryCalculator sourceCalculator;
+    
+    private volatile SingleTableInventoryCalculator targetCalculator;
     
     @Override
     public TableDataConsistencyCheckResult checkSingleTableInventoryData() {
@@ -73,9 +77,9 @@ public abstract class MatchingTableInventoryChecker implements TableInventoryChe
         SingleTableInventoryCalculateParameter targetParam = new SingleTableInventoryCalculateParameter(param.getTargetDataSource(), param.getTargetTable(),
                 param.getColumnNames(), param.getUniqueKeys(), param.getProgressContext().getTargetTableCheckPositions().get(param.getTargetTable().getTableName().toString()));
         SingleTableInventoryCalculator sourceCalculator = buildSingleTableInventoryCalculator();
-        calculators.add(sourceCalculator);
+        this.sourceCalculator = sourceCalculator;
         SingleTableInventoryCalculator targetCalculator = buildSingleTableInventoryCalculator();
-        calculators.add(targetCalculator);
+        this.targetCalculator = targetCalculator;
         try {
             Iterator<SingleTableInventoryCalculatedResult> sourceCalculatedResults = waitFuture(executor.submit(() -> sourceCalculator.calculate(sourceParam))).iterator();
             Iterator<SingleTableInventoryCalculatedResult> targetCalculatedResults = waitFuture(executor.submit(() -> targetCalculator.calculate(targetParam))).iterator();
@@ -83,8 +87,8 @@ public abstract class MatchingTableInventoryChecker implements TableInventoryChe
         } finally {
             QuietlyCloser.close(sourceParam.getCalculationContext());
             QuietlyCloser.close(targetParam.getCalculationContext());
-            calculators.remove(sourceCalculator);
-            calculators.remove(targetCalculator);
+            this.sourceCalculator = null;
+            this.targetCalculator = null;
         }
     }
     
@@ -145,13 +149,13 @@ public abstract class MatchingTableInventoryChecker implements TableInventoryChe
     
     @Override
     public void cancel() {
-        for (SingleTableInventoryCalculator each : calculators) {
-            each.cancel();
-        }
+        canceling.set(true);
+        Optional.ofNullable(sourceCalculator).ifPresent(SingleTableInventoryCalculator::cancel);
+        Optional.ofNullable(targetCalculator).ifPresent(SingleTableInventoryCalculator::cancel);
     }
     
     @Override
     public boolean isCanceling() {
-        return calculators.stream().anyMatch(SingleTableInventoryCalculator::isCanceling);
+        return canceling.get();
     }
 }
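
A minimal sketch of the cancellation pattern this file now uses: an AtomicBoolean records the cancel request, and volatile fields expose whichever calculators are currently running to cancel(). The Calculator interface below is a hypothetical stand-in for SingleTableInventoryCalculator, not the actual API.

import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

public final class CancelableTableCheckerSketch {
    
    // Hypothetical stand-in for SingleTableInventoryCalculator.
    public interface Calculator {
        
        void cancel();
    }
    
    private final AtomicBoolean canceling = new AtomicBoolean(false);
    
    private volatile Calculator sourceCalculator;
    
    private volatile Calculator targetCalculator;
    
    public void cancel() {
        // Set the flag first so isCanceling() holds even before the calculators
        // are created or after they are released in the finally block.
        canceling.set(true);
        Optional.ofNullable(sourceCalculator).ifPresent(Calculator::cancel);
        Optional.ofNullable(targetCalculator).ifPresent(Calculator::cancel);
    }
    
    public boolean isCanceling() {
        // Derived from the dedicated flag, not from transient calculator state.
        return canceling.get();
    }
}

Compared with the removed HashSet of calculators, the two volatile fields avoid mutating and iterating a non-thread-safe collection from different threads, and the flag keeps isCanceling() true after the calculators are cleared.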
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
index 2e8519686ec..9efea4fc839 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calc
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.RecordSingleTableInventoryCalculatedResult;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
+import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCancelingException;
 import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.column.ColumnValueReaderEngine;
 import org.apache.shardingsphere.data.pipeline.core.query.JDBCStreamQueryBuilder;
@@ -60,7 +61,8 @@ public final class RecordSingleTableInventoryCalculator extends AbstractStreamin
             ResultSet resultSet = calculationContext.getResultSet();
             ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
             while (resultSet.next()) {
-                ShardingSpherePreconditions.checkState(!isCanceling(), () -> new PipelineTableDataConsistencyCheckLoadingFailedException(param.getSchemaName(), param.getLogicTableName()));
+                ShardingSpherePreconditions.checkState(!isCanceling(), () -> new PipelineJobCancelingException(
+                        "Calculate chunk canceled, schema name: %s, table name: %s", param.getSchemaName(), param.getLogicTableName()));
                 Map<String, Object> columnRecord = new LinkedHashMap<>();
                 for (int columnIndex = 1, columnCount = resultSetMetaData.getColumnCount(); columnIndex <= columnCount; columnIndex++) {
                     columnRecord.put(resultSetMetaData.getColumnLabel(columnIndex), columnValueReaderEngine.read(resultSet, resultSetMetaData, columnIndex));
@@ -75,7 +77,7 @@ public final class RecordSingleTableInventoryCalculator extends AbstractStreamin
                 calculationContext.close();
             }
             return records.isEmpty() ? Optional.empty() : Optional.of(new RecordSingleTableInventoryCalculatedResult(maxUniqueKeyValue, records));
-        } catch (final PipelineSQLException ex) {
+        } catch (final PipelineSQLException | PipelineJobCancelingException ex) {
             calculationContext.close();
             throw ex;
             // CHECKSTYLE:OFF
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/PipelineJobCancelingException.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/PipelineJobCancelingException.java
new file mode 100644
index 00000000000..2fbc7daefbc
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/PipelineJobCancelingException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.exception;
+
+import lombok.NoArgsConstructor;
+
+/**
+ * Pipeline job canceling exception.
+ */
+@NoArgsConstructor
+public final class PipelineJobCancelingException extends RuntimeException {
+    
+    private static final long serialVersionUID = 1L;
+    
+    public PipelineJobCancelingException(final String errorMessage, final Object... args) {
+        super(String.format(errorMessage, args));
+    }
+}
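
A brief usage sketch of the new exception, assuming the class above is on the classpath. The Lombok @NoArgsConstructor covers plain cancel signals; the varargs constructor formats its message with String.format. The schema and table values here are illustrative, not from the commit.

import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCancelingException;

public final class CancelingExceptionUsageSketch {
    
    public static void main(final String[] args) {
        // Formatted message, as the calculator change above uses;
        // "foo_schema" and "t_order" are illustrative values.
        PipelineJobCancelingException ex = new PipelineJobCancelingException(
                "Calculate chunk canceled, schema name: %s, table name: %s", "foo_schema", "t_order");
        // Prints: Calculate chunk canceled, schema name: foo_schema, table name: t_order
        System.out.println(ex.getMessage());
    }
}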
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
index 8d12a7f000b..855bd8ed717 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
@@ -22,7 +22,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
-import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
@@ -32,7 +31,6 @@ import org.apache.shardingsphere.data.pipeline.core.job.progress.config.Pipeline
 import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
-import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
 import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
 import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
@@ -86,27 +84,36 @@ public abstract class AbstractSeparablePipelineJob<T extends PipelineJobConfigur
         T jobConfig = jobConfigManager.getJobConfiguration(jobId);
         PipelineJobItemManager<P> jobItemManager = new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
         P jobItemProgress = jobItemManager.getProgress(shardingContext.getJobName(), shardingItem).orElse(null);
+        boolean started = false;
         try {
-            execute(buildJobItemContext(jobConfig, shardingItem, jobItemProgress, jobProcessContext));
+            started = execute(buildJobItemContext(jobConfig, shardingItem, jobItemProgress, jobProcessContext));
             // CHECKSTYLE:OFF
         } catch (final RuntimeException ex) {
             // CHECKSTYLE:ON
-            processFailed(jobId, shardingItem, ex);
-            throw ex;
+            if (!jobRunnerManager.isStopping()) {
+                log.error("Job execution failed, {}-{}", jobId, shardingItem, ex);
+                PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId, shardingItem, ex);
+                throw ex;
+            }
+        } finally {
+            if (started) {
+                jobRunnerManager.getTasksRunner(shardingItem).ifPresent(PipelineTasksRunner::stop);
+            }
         }
     }
     
-    private void execute(final I jobItemContext) {
+    private boolean execute(final I jobItemContext) {
         int shardingItem = jobItemContext.getShardingItem();
         PipelineTasksRunner tasksRunner = buildTasksRunner(jobItemContext);
         if (!jobRunnerManager.addTasksRunner(shardingItem, tasksRunner)) {
-            return;
+            return false;
         }
         String jobId = jobItemContext.getJobId();
         PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().clean(jobId, shardingItem);
         prepare(jobItemContext);
         log.info("Start tasks runner, jobId={}, shardingItem={}", jobId, shardingItem);
         tasksRunner.start();
+        return true;
     }
     
     protected abstract I buildJobItemContext(T jobConfig, int shardingItem, P jobItemProgress, TransmissionProcessContext jobProcessContext);
@@ -124,13 +131,4 @@ public abstract class AbstractSeparablePipelineJob<T extends PipelineJobConfigur
     }
     
     protected abstract void doPrepare(I jobItemContext) throws SQLException;
-    
-    private void processFailed(final String jobId, final int shardingItem, final Exception ex) {
-        log.error("Job execution failed, {}-{}", jobId, shardingItem, ex);
-        PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId, shardingItem, ex);
-        try {
-            new PipelineJobManager(PipelineJobIdUtils.parseJobType(jobId)).stop(jobId);
-        } catch (final PipelineJobNotFoundException ignored) {
-        }
-    }
 }
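
A simplified sketch of the new control flow in this file: execute() reports whether the tasks runner actually started, the catch block persists and rethrows only when the job is not already stopping, and the finally block releases the runner. All names here are illustrative stand-ins, not the actual ShardingSphere API.

import java.util.concurrent.atomic.AtomicBoolean;

public final class SeparableJobSketch {
    
    private final AtomicBoolean stopping = new AtomicBoolean(false);
    
    public void run(final String jobId, final int shardingItem) {
        boolean started = false;
        try {
            started = execute(shardingItem);
        } catch (final RuntimeException ex) {
            if (!stopping.get()) {
                // Genuine failure: record and rethrow. Exceptions raised while
                // the job is already stopping (e.g. cancel signals) are swallowed.
                System.err.printf("Job execution failed, %s-%d%n", jobId, shardingItem);
                throw ex;
            }
        } finally {
            if (started) {
                // Release the tasks runner exactly once, whether the job
                // finished, failed, or was canceled.
                stopTasksRunner(shardingItem);
            }
        }
    }
    
    private boolean execute(final int shardingItem) {
        // Returns false when a tasks runner was already registered for this
        // sharding item, true once the runner has been started.
        return true;
    }
    
    private void stopTasksRunner(final int shardingItem) {
    }
}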
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
index d7e341f78f8..d869bd2a2ef 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
@@ -21,19 +21,18 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
+import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCancelingException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
-import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
-import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
+import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
+import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
@@ -87,7 +86,7 @@ public class TransmissionTasksRunner implements PipelineTasksRunner {
     @Override
     public void start() {
         if (jobItemContext.isStopping()) {
-            return;
+            throw new PipelineJobCancelingException();
         }
         new PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class, PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType())
                 .getYamlJobItemProgressSwapper()).persistProgress(jobItemContext);
@@ -118,8 +117,7 @@ public class TransmissionTasksRunner implements PipelineTasksRunner {
     
     private synchronized void executeIncrementalTask() {
         if (jobItemContext.isStopping()) {
-            log.info("Stopping is true, ignore incremental task");
-            return;
+            throw new PipelineJobCancelingException();
         }
         if (incrementalTasks.isEmpty()) {
             log.info("incrementalTasks empty, ignore");
@@ -150,30 +148,18 @@ public class TransmissionTasksRunner implements PipelineTasksRunner {
         }
     }
     
-    protected void inventoryFailureCallback(final Throwable throwable) {
-        log.error("onFailure, inventory task execute failed.", throwable);
-        String jobId = jobItemContext.getJobId();
-        PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId, jobItemContext.getShardingItem(), throwable);
-        try {
-            jobManager.stop(jobId);
-        } catch (final PipelineJobNotFoundException ignored) {
-        }
-    }
-    
     private final class InventoryTaskExecuteCallback implements ExecuteCallback {
         
         @Override
         public void onSuccess() {
             if (jobItemContext.isStopping()) {
-                log.info("Inventory task onSuccess, stopping true, ignore");
-                return;
+                throw new PipelineJobCancelingException();
             }
             inventorySuccessCallback();
         }
         
         @Override
-        public void onFailure(final Throwable throwable) {
-            inventoryFailureCallback(throwable);
+        public void onFailure(final Throwable ignored) {
         }
     }
     
@@ -185,14 +171,7 @@ public class TransmissionTasksRunner implements PipelineTasksRunner {
         }
         
         @Override
-        public void onFailure(final Throwable throwable) {
-            log.error("onFailure, incremental task execute failed.", throwable);
-            String jobId = jobItemContext.getJobId();
-            PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId, jobItemContext.getShardingItem(), throwable);
-            try {
-                jobManager.stop(jobId);
-            } catch (final PipelineJobNotFoundException ignored) {
-            }
+        public void onFailure(final Throwable ignored) {
         }
     }
 }
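
A minimal sketch of the fail-fast convention adopted here: entry points throw PipelineJobCancelingException instead of returning silently when the job is stopping, leaving the job-level handler in AbstractSeparablePipelineJob to decide whether to swallow it. The JobItemContext interface is a hypothetical stand-in for TransmissionJobItemContext.

import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCancelingException;

public final class FailFastStartSketch {
    
    // Hypothetical stand-in for TransmissionJobItemContext.
    public interface JobItemContext {
        
        boolean isStopping();
    }
    
    private final JobItemContext jobItemContext;
    
    public FailFastStartSketch(final JobItemContext jobItemContext) {
        this.jobItemContext = jobItemContext;
    }
    
    public void start() {
        if (jobItemContext.isStopping()) {
            // Fail fast; the job-level catch block sees the stopping state
            // and skips error-message persistence.
            throw new PipelineJobCancelingException();
        }
        // ... persist progress, then submit inventory and incremental tasks ...
    }
}

The no-op onFailure callbacks fit the same convention: task failures are no longer turned into stop() calls inside the callbacks, presumably because errors are now handled at the job level instead.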
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index 7371a1e7b39..2d9a57595fd 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -35,7 +35,6 @@ import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTable
 import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
 import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
-import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtils;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
@@ -46,6 +45,7 @@ import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobTy
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import org.apache.shardingsphere.infra.datanode.DataNode;
 import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
+import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
 
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
@@ -53,7 +53,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
@@ -71,6 +73,8 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
     
     private final AtomicReference<TableInventoryChecker> currentTableInventoryChecker = new AtomicReference<>();
     
+    private final AtomicBoolean canceling = new AtomicBoolean(false);
+    
     public MigrationDataConsistencyChecker(final MigrationJobConfiguration jobConfig, final TransmissionProcessContext processContext,
                                            final ConsistencyCheckJobItemProgressContext progressContext) {
         this.jobConfig = jobConfig;
@@ -139,18 +143,12 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
     
     @Override
     public void cancel() {
-        TableInventoryChecker checker = currentTableInventoryChecker.get();
-        if (null != checker) {
-            checker.cancel();
-        }
+        canceling.set(true);
+        Optional.ofNullable(currentTableInventoryChecker.get()).ifPresent(TableInventoryChecker::cancel);
     }
     
     @Override
     public boolean isCanceling() {
-        TableInventoryChecker checker = currentTableInventoryChecker.get();
-        if (null == checker) {
-            return false;
-        }
-        return checker.isCanceling();
+        return canceling.get();
     }
 }
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index 67a864b71b5..817d64dc03d 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -25,6 +25,7 @@ import org.apache.shardingsphere.data.pipeline.core.checker.DataSourceCheckEngin
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCancelingException;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
@@ -35,7 +36,6 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.In
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
 import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
@@ -90,21 +90,20 @@ public final class MigrationJobPreparer {
      *
      * @param jobItemContext job item context
      * @throws SQLException SQL exception
+     * @throws PipelineJobCancelingException pipeline job canceled exception
      */
-    public void prepare(final MigrationJobItemContext jobItemContext) throws SQLException {
+    public void prepare(final MigrationJobItemContext jobItemContext) throws SQLException, PipelineJobCancelingException {
         ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(
                 jobItemContext.getTaskConfig().getDumperContext().getCommonContext().getDataSourceConfig().getClass()),
                 () -> new UnsupportedSQLOperationException("Migration inventory dumper only support StandardPipelineDataSourceConfiguration"));
         DatabaseType sourceDatabaseType = jobItemContext.getJobConfig().getSourceDatabaseType();
         new DataSourceCheckEngine(sourceDatabaseType).checkSourceDataSources(Collections.singleton(jobItemContext.getSourceDataSource()));
         if (jobItemContext.isStopping()) {
-            PipelineJobRegistry.stop(jobItemContext.getJobId());
-            return;
+            throw new PipelineJobCancelingException();
         }
         prepareAndCheckTargetWithLock(jobItemContext);
         if (jobItemContext.isStopping()) {
-            PipelineJobRegistry.stop(jobItemContext.getJobId());
-            return;
+            throw new PipelineJobCancelingException();
         }
         boolean isIncrementalSupported = DatabaseTypedSPILoader.findService(DialectIncrementalDumperCreator.class, sourceDatabaseType).isPresent();
         if (isIncrementalSupported) {
@@ -114,8 +113,7 @@ public final class MigrationJobPreparer {
         if (isIncrementalSupported) {
             initIncrementalTasks(jobItemContext);
             if (jobItemContext.isStopping()) {
-                PipelineJobRegistry.stop(jobItemContext.getJobId());
-                return;
+                throw new PipelineJobCancelingException();
             }
         }
         log.info("prepare, jobId={}, shardingItem={}, inventoryTasks={}, incrementalTasks={}",