This is an automated email from the ASF dual-hosted git repository.
azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 628ba8dfb2e Refactor pipeline job cancel and exception handling (#30335)
628ba8dfb2e is described below
commit 628ba8dfb2e172d68e43dd53363c10edba40b0d4
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Feb 28 15:42:55 2024 +0800
Refactor pipeline job cancel and exception handling (#30335)
* Add PipelineJobCanceledException; Refactor cancel()
* Throw PipelineJobCanceledException when job is stopping
* Rename PipelineJobCanceledException to PipelineJobCancelingException
* Refactor job exception handling and resources release
* Clean InventoryTaskExecuteCallback.onFailure
---
.../table/MatchingTableInventoryChecker.java | 30 ++++++++++--------
.../RecordSingleTableInventoryCalculator.java | 6 ++--
.../exception/PipelineJobCancelingException.java | 33 +++++++++++++++++++
.../core/job/AbstractSeparablePipelineJob.java | 30 ++++++++----------
.../core/task/runner/TransmissionTasksRunner.java | 37 +++++-----------------
.../MigrationDataConsistencyChecker.java | 18 +++++------
.../migration/preparer/MigrationJobPreparer.java | 14 ++++----
7 files changed, 90 insertions(+), 78 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
index 5e67b92a157..6e799b958da 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
@@ -19,30 +19,30 @@ package
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
-import
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResultSwapper;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculateParameter;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculator;
+import
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
import
org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
import
org.apache.shardingsphere.infra.exception.core.external.sql.type.wrapper.SQLWrapperException;
import
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
import java.sql.SQLException;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.Objects;
-import java.util.Set;
+import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* Matching table inventory checker.
@@ -53,7 +53,11 @@ public abstract class MatchingTableInventoryChecker
implements TableInventoryChe
private final TableInventoryCheckParameter param;
- private final Set<SingleTableInventoryCalculator> calculators = new
HashSet<>();
+ private final AtomicBoolean canceling = new AtomicBoolean(false);
+
+ private volatile SingleTableInventoryCalculator sourceCalculator;
+
+ private volatile SingleTableInventoryCalculator targetCalculator;
@Override
public TableDataConsistencyCheckResult checkSingleTableInventoryData() {
@@ -73,9 +77,9 @@ public abstract class MatchingTableInventoryChecker
implements TableInventoryChe
SingleTableInventoryCalculateParameter targetParam = new
SingleTableInventoryCalculateParameter(param.getTargetDataSource(),
param.getTargetTable(),
param.getColumnNames(), param.getUniqueKeys(),
param.getProgressContext().getTargetTableCheckPositions().get(param.getTargetTable().getTableName().toString()));
SingleTableInventoryCalculator sourceCalculator =
buildSingleTableInventoryCalculator();
- calculators.add(sourceCalculator);
+ this.sourceCalculator = sourceCalculator;
SingleTableInventoryCalculator targetCalculator =
buildSingleTableInventoryCalculator();
- calculators.add(targetCalculator);
+ this.targetCalculator = targetCalculator;
try {
Iterator<SingleTableInventoryCalculatedResult>
sourceCalculatedResults = waitFuture(executor.submit(() ->
sourceCalculator.calculate(sourceParam))).iterator();
Iterator<SingleTableInventoryCalculatedResult>
targetCalculatedResults = waitFuture(executor.submit(() ->
targetCalculator.calculate(targetParam))).iterator();
@@ -83,8 +87,8 @@ public abstract class MatchingTableInventoryChecker
implements TableInventoryChe
} finally {
QuietlyCloser.close(sourceParam.getCalculationContext());
QuietlyCloser.close(targetParam.getCalculationContext());
- calculators.remove(sourceCalculator);
- calculators.remove(targetCalculator);
+ this.sourceCalculator = null;
+ this.targetCalculator = null;
}
}
@@ -145,13 +149,13 @@ public abstract class MatchingTableInventoryChecker
implements TableInventoryChe
@Override
public void cancel() {
- for (SingleTableInventoryCalculator each : calculators) {
- each.cancel();
- }
+ canceling.set(true);
+
Optional.ofNullable(sourceCalculator).ifPresent(SingleTableInventoryCalculator::cancel);
+
Optional.ofNullable(targetCalculator).ifPresent(SingleTableInventoryCalculator::cancel);
}
@Override
public boolean isCanceling() {
- return
calculators.stream().anyMatch(SingleTableInventoryCalculator::isCanceling);
+ return canceling.get();
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
index 2e8519686ec..9efea4fc839 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
@@ -20,6 +20,7 @@ package
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calc
import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.RecordSingleTableInventoryCalculatedResult;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
+import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCancelingException;
import
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.column.ColumnValueReaderEngine;
import
org.apache.shardingsphere.data.pipeline.core.query.JDBCStreamQueryBuilder;
@@ -60,7 +61,8 @@ public final class RecordSingleTableInventoryCalculator
extends AbstractStreamin
ResultSet resultSet = calculationContext.getResultSet();
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
while (resultSet.next()) {
- ShardingSpherePreconditions.checkState(!isCanceling(), () ->
new
PipelineTableDataConsistencyCheckLoadingFailedException(param.getSchemaName(),
param.getLogicTableName()));
+ ShardingSpherePreconditions.checkState(!isCanceling(), () ->
new PipelineJobCancelingException(
+ "Calculate chunk canceled, schema name: %s, table
name: %s", param.getSchemaName(), param.getLogicTableName()));
Map<String, Object> columnRecord = new LinkedHashMap<>();
for (int columnIndex = 1, columnCount =
resultSetMetaData.getColumnCount(); columnIndex <= columnCount; columnIndex++) {
columnRecord.put(resultSetMetaData.getColumnLabel(columnIndex),
columnValueReaderEngine.read(resultSet, resultSetMetaData, columnIndex));
@@ -75,7 +77,7 @@ public final class RecordSingleTableInventoryCalculator
extends AbstractStreamin
calculationContext.close();
}
return records.isEmpty() ? Optional.empty() : Optional.of(new
RecordSingleTableInventoryCalculatedResult(maxUniqueKeyValue, records));
- } catch (final PipelineSQLException ex) {
+ } catch (final PipelineSQLException | PipelineJobCancelingException
ex) {
calculationContext.close();
throw ex;
// CHECKSTYLE:OFF
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/PipelineJobCancelingException.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/PipelineJobCancelingException.java
new file mode 100644
index 00000000000..2fbc7daefbc
--- /dev/null
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/PipelineJobCancelingException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.exception;
+
+import lombok.NoArgsConstructor;
+
+/**
+ * Pipeline job canceling exception.
+ */
+@NoArgsConstructor
+public final class PipelineJobCancelingException extends RuntimeException {
+
+ private static final long serialVersionUID = 1L;
+
+ public PipelineJobCancelingException(final String errorMessage, final
Object... args) {
+ super(String.format(errorMessage, args));
+ }
+}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
index 8d12a7f000b..855bd8ed717 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
@@ -22,7 +22,6 @@ import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
-import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
@@ -32,7 +31,6 @@ import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.Pipeline
import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
@@ -86,27 +84,36 @@ public abstract class AbstractSeparablePipelineJob<T
extends PipelineJobConfigur
T jobConfig = jobConfigManager.getJobConfiguration(jobId);
PipelineJobItemManager<P> jobItemManager = new
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
P jobItemProgress =
jobItemManager.getProgress(shardingContext.getJobName(),
shardingItem).orElse(null);
+ boolean started = false;
try {
- execute(buildJobItemContext(jobConfig, shardingItem,
jobItemProgress, jobProcessContext));
+ started = execute(buildJobItemContext(jobConfig, shardingItem,
jobItemProgress, jobProcessContext));
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
- processFailed(jobId, shardingItem, ex);
- throw ex;
+ if (!jobRunnerManager.isStopping()) {
+ log.error("Job execution failed, {}-{}", jobId, shardingItem,
ex);
+
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId,
shardingItem, ex);
+ throw ex;
+ }
+ } finally {
+ if (started) {
+
jobRunnerManager.getTasksRunner(shardingItem).ifPresent(PipelineTasksRunner::stop);
+ }
}
}
- private void execute(final I jobItemContext) {
+ private boolean execute(final I jobItemContext) {
int shardingItem = jobItemContext.getShardingItem();
PipelineTasksRunner tasksRunner = buildTasksRunner(jobItemContext);
if (!jobRunnerManager.addTasksRunner(shardingItem, tasksRunner)) {
- return;
+ return false;
}
String jobId = jobItemContext.getJobId();
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().clean(jobId,
shardingItem);
prepare(jobItemContext);
log.info("Start tasks runner, jobId={}, shardingItem={}", jobId,
shardingItem);
tasksRunner.start();
+ return true;
}
protected abstract I buildJobItemContext(T jobConfig, int shardingItem, P
jobItemProgress, TransmissionProcessContext jobProcessContext);
@@ -124,13 +131,4 @@ public abstract class AbstractSeparablePipelineJob<T
extends PipelineJobConfigur
}
protected abstract void doPrepare(I jobItemContext) throws SQLException;
-
- private void processFailed(final String jobId, final int shardingItem,
final Exception ex) {
- log.error("Job execution failed, {}-{}", jobId, shardingItem, ex);
-
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId,
shardingItem, ex);
- try {
- new
PipelineJobManager(PipelineJobIdUtils.parseJobType(jobId)).stop(jobId);
- } catch (final PipelineJobNotFoundException ignored) {
- }
- }
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
index d7e341f78f8..d869bd2a2ef 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
@@ -21,19 +21,18 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
+import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCancelingException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
-import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
-import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
-import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
+import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
@@ -87,7 +86,7 @@ public class TransmissionTasksRunner implements
PipelineTasksRunner {
@Override
public void start() {
if (jobItemContext.isStopping()) {
- return;
+ throw new PipelineJobCancelingException();
}
new
PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class,
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType())
.getYamlJobItemProgressSwapper()).persistProgress(jobItemContext);
@@ -118,8 +117,7 @@ public class TransmissionTasksRunner implements
PipelineTasksRunner {
private synchronized void executeIncrementalTask() {
if (jobItemContext.isStopping()) {
- log.info("Stopping is true, ignore incremental task");
- return;
+ throw new PipelineJobCancelingException();
}
if (incrementalTasks.isEmpty()) {
log.info("incrementalTasks empty, ignore");
@@ -150,30 +148,18 @@ public class TransmissionTasksRunner implements
PipelineTasksRunner {
}
}
- protected void inventoryFailureCallback(final Throwable throwable) {
- log.error("onFailure, inventory task execute failed.", throwable);
- String jobId = jobItemContext.getJobId();
-
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId,
jobItemContext.getShardingItem(), throwable);
- try {
- jobManager.stop(jobId);
- } catch (final PipelineJobNotFoundException ignored) {
- }
- }
-
private final class InventoryTaskExecuteCallback implements
ExecuteCallback {
@Override
public void onSuccess() {
if (jobItemContext.isStopping()) {
- log.info("Inventory task onSuccess, stopping true, ignore");
- return;
+ throw new PipelineJobCancelingException();
}
inventorySuccessCallback();
}
@Override
- public void onFailure(final Throwable throwable) {
- inventoryFailureCallback(throwable);
+ public void onFailure(final Throwable ignored) {
}
}
@@ -185,14 +171,7 @@ public class TransmissionTasksRunner implements
PipelineTasksRunner {
}
@Override
- public void onFailure(final Throwable throwable) {
- log.error("onFailure, incremental task execute failed.",
throwable);
- String jobId = jobItemContext.getJobId();
-
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId,
jobItemContext.getShardingItem(), throwable);
- try {
- jobManager.stop(jobId);
- } catch (final PipelineJobNotFoundException ignored) {
- }
+ public void onFailure(final Throwable ignored) {
}
}
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index 7371a1e7b39..2d9a57595fd 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -35,7 +35,6 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTable
import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
import
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
-import
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtils;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
@@ -46,6 +45,7 @@ import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobTy
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import org.apache.shardingsphere.infra.datanode.DataNode;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
+import
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
import java.util.LinkedHashMap;
import java.util.LinkedList;
@@ -53,7 +53,9 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
+import java.util.Optional;
import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -71,6 +73,8 @@ public final class MigrationDataConsistencyChecker implements
PipelineDataConsis
private final AtomicReference<TableInventoryChecker>
currentTableInventoryChecker = new AtomicReference<>();
+ private final AtomicBoolean canceling = new AtomicBoolean(false);
+
public MigrationDataConsistencyChecker(final MigrationJobConfiguration
jobConfig, final TransmissionProcessContext processContext,
final
ConsistencyCheckJobItemProgressContext progressContext) {
this.jobConfig = jobConfig;
@@ -139,18 +143,12 @@ public final class MigrationDataConsistencyChecker
implements PipelineDataConsis
@Override
public void cancel() {
- TableInventoryChecker checker = currentTableInventoryChecker.get();
- if (null != checker) {
- checker.cancel();
- }
+ canceling.set(true);
+
Optional.ofNullable(currentTableInventoryChecker.get()).ifPresent(TableInventoryChecker::cancel);
}
@Override
public boolean isCanceling() {
- TableInventoryChecker checker = currentTableInventoryChecker.get();
- if (null == checker) {
- return false;
- }
- return checker.isCanceling();
+ return canceling.get();
}
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index 67a864b71b5..817d64dc03d 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -25,6 +25,7 @@ import
org.apache.shardingsphere.data.pipeline.core.checker.DataSourceCheckEngin
import
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCancelingException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
@@ -35,7 +36,6 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.In
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
@@ -90,21 +90,20 @@ public final class MigrationJobPreparer {
*
* @param jobItemContext job item context
* @throws SQLException SQL exception
+ * @throws PipelineJobCancelingException pipeline job canceled exception
*/
- public void prepare(final MigrationJobItemContext jobItemContext) throws
SQLException {
+ public void prepare(final MigrationJobItemContext jobItemContext) throws
SQLException, PipelineJobCancelingException {
ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(
jobItemContext.getTaskConfig().getDumperContext().getCommonContext().getDataSourceConfig().getClass()),
() -> new UnsupportedSQLOperationException("Migration
inventory dumper only support StandardPipelineDataSourceConfiguration"));
DatabaseType sourceDatabaseType =
jobItemContext.getJobConfig().getSourceDatabaseType();
new
DataSourceCheckEngine(sourceDatabaseType).checkSourceDataSources(Collections.singleton(jobItemContext.getSourceDataSource()));
if (jobItemContext.isStopping()) {
- PipelineJobRegistry.stop(jobItemContext.getJobId());
- return;
+ throw new PipelineJobCancelingException();
}
prepareAndCheckTargetWithLock(jobItemContext);
if (jobItemContext.isStopping()) {
- PipelineJobRegistry.stop(jobItemContext.getJobId());
- return;
+ throw new PipelineJobCancelingException();
}
boolean isIncrementalSupported =
DatabaseTypedSPILoader.findService(DialectIncrementalDumperCreator.class,
sourceDatabaseType).isPresent();
if (isIncrementalSupported) {
@@ -114,8 +113,7 @@ public final class MigrationJobPreparer {
if (isIncrementalSupported) {
initIncrementalTasks(jobItemContext);
if (jobItemContext.isStopping()) {
- PipelineJobRegistry.stop(jobItemContext.getJobId());
- return;
+ throw new PipelineJobCancelingException();
}
}
log.info("prepare, jobId={}, shardingItem={}, inventoryTasks={},
incrementalTasks={}",