This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 13f54689f6c Review and improve pipeline code (#28338)
13f54689f6c is described below

commit 13f54689f6cbdceef748d04992369fa63df7d8ea
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Sep 1 20:14:44 2023 +0800

    Review and improve pipeline code (#28338)
    
    * Extract CalculationContext
    
    * Extract IncrementalDumperConfigurationCreator
    
    * Extract TableInventoryChecker
    
    * Extract buildSourceTargetEntry for MigrationDistSQLStatementVisitor
    
    * Update consistency check map key
    
    * Refactor migration E2E to support customize consistency check algorithm 
props
---
 .../IncrementalDumperConfigurationCreator.java     | 35 +++++++++
 .../CRC32MatchTableDataConsistencyChecker.java     | 18 ++++-
 .../DataMatchTableDataConsistencyChecker.java      | 21 ++++-
 ...ker.java => MatchingTableInventoryChecker.java} |  8 +-
 .../table/TableDataConsistencyChecker.java         | 10 +--
 ...encyChecker.java => TableInventoryChecker.java} | 27 +------
 .../table/calculator/CalculationContext.java       | 91 ++++++++++++++++++++++
 .../RecordSingleTableInventoryCalculator.java      | 50 +-----------
 .../core/MigrationDistSQLStatementVisitor.java     | 17 ++--
 .../migration/api/impl/MigrationJobAPI.java        | 25 ++----
 .../MigrationDataConsistencyChecker.java           | 41 ++++++----
 ...ationIncrementalDumperConfigurationCreator.java | 59 ++++++++++++++
 .../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java |  2 +-
 .../cases/migration/AbstractMigrationE2EIT.java    | 17 +++-
 .../general/MySQLMigrationGeneralE2EIT.java        | 13 ++--
 ...ava => FixtureTableDataConsistencyChecker.java} | 19 +----
 ...ture.java => FixtureTableInventoryChecker.java} | 25 +-----
 ...sistencycheck.table.TableDataConsistencyChecker |  2 +-
 18 files changed, 308 insertions(+), 172 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ingest/IncrementalDumperConfigurationCreator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ingest/IncrementalDumperConfigurationCreator.java
new file mode 100644
index 00000000000..2934a8f142f
--- /dev/null
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ingest/IncrementalDumperConfigurationCreator.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.common.config.ingest;
+
+import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
+
+/**
+ * Incremental dumper configuration creator.
+ */
+public interface IncrementalDumperConfigurationCreator {
+    
+    /**
+     * Create dumper configuration.
+     *
+     * @param jobDataNodeLine job data node line
+     * @return dumper configuration
+     */
+    DumperConfiguration createDumperConfiguration(JobDataNodeLine 
jobDataNodeLine);
+}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/CRC32MatchTableDataConsistencyChecker.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/CRC32MatchTableDataConsistencyChecker.java
index 11e2479a393..42f7ff0824e 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/CRC32MatchTableDataConsistencyChecker.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/CRC32MatchTableDataConsistencyChecker.java
@@ -31,11 +31,11 @@ import java.util.LinkedList;
  * CRC32 match table data consistency checker.
  */
 @SPIDescription("Match CRC32 of records.")
-public final class CRC32MatchTableDataConsistencyChecker extends 
MatchingTableDataConsistencyChecker {
+public final class CRC32MatchTableDataConsistencyChecker implements 
TableDataConsistencyChecker {
     
     @Override
-    protected SingleTableInventoryCalculator 
buildSingleTableInventoryCalculator() {
-        return new CRC32SingleTableInventoryCalculator();
+    public TableInventoryChecker buildTableInventoryChecker(final 
TableInventoryCheckParameter param) {
+        return new CRC32MatchTableInventoryChecker(param);
     }
     
     @Override
@@ -51,4 +51,16 @@ public final class CRC32MatchTableDataConsistencyChecker 
extends MatchingTableDa
     public String getType() {
         return "CRC32_MATCH";
     }
+    
+    private static final class CRC32MatchTableInventoryChecker extends 
MatchingTableInventoryChecker {
+        
+        CRC32MatchTableInventoryChecker(final TableInventoryCheckParameter 
param) {
+            super(param);
+        }
+        
+        @Override
+        protected SingleTableInventoryCalculator 
buildSingleTableInventoryCalculator() {
+            return new CRC32SingleTableInventoryCalculator();
+        }
+    }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
index f40ff4609a5..9ae4cc1cfb6 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
@@ -34,7 +34,7 @@ import java.util.Properties;
  */
 @SPIDescription("Match raw data of records.")
 @Slf4j
-public final class DataMatchTableDataConsistencyChecker extends 
MatchingTableDataConsistencyChecker {
+public final class DataMatchTableDataConsistencyChecker implements 
TableDataConsistencyChecker {
     
     private static final String CHUNK_SIZE_KEY = "chunk-size";
     
@@ -65,8 +65,8 @@ public final class DataMatchTableDataConsistencyChecker 
extends MatchingTableDat
     }
     
     @Override
-    protected SingleTableInventoryCalculator 
buildSingleTableInventoryCalculator() {
-        return new RecordSingleTableInventoryCalculator(chunkSize);
+    public TableInventoryChecker buildTableInventoryChecker(final 
TableInventoryCheckParameter param) {
+        return new DataMatchTableInventoryChecker(param, chunkSize);
     }
     
     @Override
@@ -78,4 +78,19 @@ public final class DataMatchTableDataConsistencyChecker 
extends MatchingTableDat
     public String getType() {
         return "DATA_MATCH";
     }
+    
+    private static final class DataMatchTableInventoryChecker extends 
MatchingTableInventoryChecker {
+        
+        private final int chunkSize;
+        
+        DataMatchTableInventoryChecker(final TableInventoryCheckParameter 
param, final int chunkSize) {
+            super(param);
+            this.chunkSize = chunkSize;
+        }
+        
+        @Override
+        protected SingleTableInventoryCalculator 
buildSingleTableInventoryCalculator() {
+            return new RecordSingleTableInventoryCalculator(chunkSize);
+        }
+    }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableDataConsistencyChecker.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
similarity index 98%
rename from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableDataConsistencyChecker.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
index 496e3a75174..d941dbb0d26 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableDataConsistencyChecker.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
@@ -47,16 +47,18 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 /**
- * Matching table data consistency checker.
+ * Matching table inventory checker.
  */
 @Slf4j
 @RequiredArgsConstructor
-public abstract class MatchingTableDataConsistencyChecker implements 
TableDataConsistencyChecker {
+public abstract class MatchingTableInventoryChecker implements 
TableInventoryChecker {
+    
+    private final TableInventoryCheckParameter param;
     
     private final Set<SingleTableInventoryCalculator> calculators = new 
HashSet<>();
     
     @Override
-    public TableDataConsistencyCheckResult checkSingleTableInventoryData(final 
TableInventoryCheckParameter param) {
+    public TableDataConsistencyCheckResult checkSingleTableInventoryData() {
         ThreadFactory threadFactory = 
ExecutorThreadFactoryBuilder.build("job-" + getJobIdDigest(param.getJobId()) + 
"-check-%d");
         ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, 
TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
         try {
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyChecker.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyChecker.java
index 2a126301f25..2faa5cc0775 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyChecker.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyChecker.java
@@ -17,8 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.core.consistencycheck.table;
 
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineCancellable;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import org.apache.shardingsphere.infra.algorithm.ShardingSphereAlgorithm;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 
@@ -27,15 +25,15 @@ import java.util.Collection;
 /**
  * Table data consistency checker.
  */
-public interface TableDataConsistencyChecker extends ShardingSphereAlgorithm, 
PipelineCancellable {
+public interface TableDataConsistencyChecker extends ShardingSphereAlgorithm {
     
     /**
-     * Data consistency check for single table inventory data.
+     * Build table inventory checker.
      *
      * @param param check parameter
-     * @return check result
+     * @return table inventory checker
      */
-    TableDataConsistencyCheckResult 
checkSingleTableInventoryData(TableInventoryCheckParameter param);
+    TableInventoryChecker 
buildTableInventoryChecker(TableInventoryCheckParameter param);
     
     /**
      * Is break on inventory check not matched.
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyChecker.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryChecker.java
similarity index 63%
copy from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyChecker.java
copy to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryChecker.java
index 2a126301f25..5248d0aa14a 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyChecker.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryChecker.java
@@ -19,37 +19,16 @@ package 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table;
 
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineCancellable;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import org.apache.shardingsphere.infra.algorithm.ShardingSphereAlgorithm;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
-
-import java.util.Collection;
 
 /**
- * Table data consistency checker.
+ * Table inventory checker.
  */
-public interface TableDataConsistencyChecker extends ShardingSphereAlgorithm, 
PipelineCancellable {
+public interface TableInventoryChecker extends PipelineCancellable {
     
     /**
      * Data consistency check for single table inventory data.
      *
-     * @param param check parameter
      * @return check result
      */
-    TableDataConsistencyCheckResult 
checkSingleTableInventoryData(TableInventoryCheckParameter param);
-    
-    /**
-     * Is break on inventory check not matched.
-     *
-     * @return break or not
-     */
-    default boolean isBreakOnInventoryCheckNotMatched() {
-        return true;
-    }
-    
-    /**
-     * Get supported database types.
-     *
-     * @return supported database types
-     */
-    Collection<DatabaseType> getSupportedDatabaseTypes();
+    TableDataConsistencyCheckResult checkSingleTableInventoryData();
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CalculationContext.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CalculationContext.java
new file mode 100644
index 00000000000..a1183457834
--- /dev/null
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CalculationContext.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Calculation context.
+ */
+@RequiredArgsConstructor
+public final class CalculationContext implements AutoCloseable {
+    
+    private final AtomicReference<Connection> connection = new 
AtomicReference<>();
+    
+    private final AtomicReference<PreparedStatement> preparedStatement = new 
AtomicReference<>();
+    
+    private final AtomicReference<ResultSet> resultSet = new 
AtomicReference<>();
+    
+    /**
+     * Get connection.
+     *
+     * @return connection
+     */
+    public Connection getConnection() {
+        return connection.get();
+    }
+    
+    /**
+     * Set connection.
+     *
+     * @param connection connection
+     */
+    public void setConnection(final Connection connection) {
+        this.connection.set(connection);
+    }
+    
+    /**
+     * Get result set.
+     *
+     * @return result set
+     */
+    public ResultSet getResultSet() {
+        return resultSet.get();
+    }
+    
+    /**
+     * Set prepared statement.
+     *
+     * @param preparedStatement prepared statement
+     */
+    public void setPreparedStatement(final PreparedStatement 
preparedStatement) {
+        this.preparedStatement.set(preparedStatement);
+    }
+    
+    /**
+     * Set result set.
+     *
+     * @param resultSet result set
+     */
+    public void setResultSet(final ResultSet resultSet) {
+        this.resultSet.set(resultSet);
+    }
+    
+    @Override
+    public void close() {
+        QuietlyCloser.close(resultSet.get());
+        QuietlyCloser.close(preparedStatement.get());
+        QuietlyCloser.close(connection.get());
+    }
+}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
index abcbc0e2143..9892cdf5ea1 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
@@ -17,7 +17,6 @@
 
 package 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator;
 
-import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.common.query.JDBCStreamQueryBuilder;
@@ -43,7 +42,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Record single table inventory calculator.
@@ -109,7 +107,8 @@ public final class RecordSingleTableInventoryCalculator 
extends AbstractStreamin
     
     private CalculationContext createCalculationContext(final 
SingleTableInventoryCalculateParameter param) throws SQLException {
         Connection connection = param.getDataSource().getConnection();
-        CalculationContext result = new CalculationContext(connection);
+        CalculationContext result = new CalculationContext();
+        result.setConnection(connection);
         param.setCalculationContext(result);
         return result;
     }
@@ -143,49 +142,4 @@ public final class RecordSingleTableInventoryCalculator 
extends AbstractStreamin
             preparedStatement.setObject(1, tableCheckPosition);
         }
     }
-    
-    @RequiredArgsConstructor
-    private static final class CalculationContext implements AutoCloseable {
-        
-        @Getter
-        private final Connection connection;
-        
-        private final AtomicReference<PreparedStatement> preparedStatement = 
new AtomicReference<>();
-        
-        private final AtomicReference<ResultSet> resultSet = new 
AtomicReference<>();
-        
-        /**
-         * Get result set.
-         *
-         * @return result set
-         */
-        public ResultSet getResultSet() {
-            return resultSet.get();
-        }
-        
-        /**
-         * Set prepared statement.
-         *
-         * @param preparedStatement prepared statement
-         */
-        public void setPreparedStatement(final PreparedStatement 
preparedStatement) {
-            this.preparedStatement.set(preparedStatement);
-        }
-        
-        /**
-         * Set result set.
-         *
-         * @param resultSet result set
-         */
-        public void setResultSet(final ResultSet resultSet) {
-            this.resultSet.set(resultSet);
-        }
-        
-        @Override
-        public void close() {
-            QuietlyCloser.close(resultSet.get());
-            QuietlyCloser.close(preparedStatement.get());
-            QuietlyCloser.close(connection);
-        }
-    }
 }
diff --git 
a/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/migration/distsql/parser/core/MigrationDistSQLStatementVisitor.java
 
b/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/migration/distsql/parser/core/MigrationDistSQLStatementVisitor.java
index 839a5ee9f2d..cef053312eb 100644
--- 
a/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/migration/distsql/parser/core/MigrationDistSQLStatementVisitor.java
+++ 
b/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/migration/distsql/parser/core/MigrationDistSQLStatementVisitor.java
@@ -36,11 +36,13 @@ import 
org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatemen
 import 
org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationListContext;
 import 
org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationSourceStorageUnitsContext;
 import 
org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationStatusContext;
+import 
org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.SourceTableNameContext;
 import 
org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StartMigrationCheckContext;
 import 
org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StartMigrationContext;
 import 
org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StopMigrationCheckContext;
 import 
org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StopMigrationContext;
 import 
org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StorageUnitDefinitionContext;
+import 
org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.TargetTableNameContext;
 import 
org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.UnregisterMigrationSourceStorageUnitContext;
 import org.apache.shardingsphere.distsql.parser.segment.AlgorithmSegment;
 import org.apache.shardingsphere.distsql.parser.segment.DataSourceSegment;
@@ -82,16 +84,21 @@ public final class MigrationDistSQLStatementVisitor extends 
MigrationDistSQLStat
     
     @Override
     public ASTNode visitMigrateTable(final MigrateTableContext ctx) {
-        List<String> source = 
Splitter.on('.').splitToList(getRequiredIdentifierValue(ctx.sourceTableName()));
-        List<String> target = 
Splitter.on('.').splitToList(getRequiredIdentifierValue(ctx.targetTableName()));
+        SourceTargetEntry sourceTargetEntry = 
buildSourceTargetEntry(ctx.sourceTableName(), ctx.targetTableName());
+        return new 
MigrateTableStatement(Collections.singletonList(sourceTargetEntry), 
sourceTargetEntry.getTargetDatabaseName());
+    }
+    
+    private SourceTargetEntry buildSourceTargetEntry(final 
SourceTableNameContext sourceContext, final TargetTableNameContext 
targetContext) {
+        List<String> source = 
Splitter.on('.').splitToList(getRequiredIdentifierValue(sourceContext));
+        List<String> target = 
Splitter.on('.').splitToList(getRequiredIdentifierValue(targetContext));
         String sourceResourceName = source.get(0);
         String sourceSchemaName = 3 == source.size() ? source.get(1) : null;
         String sourceTableName = source.get(source.size() - 1);
         String targetDatabaseName = target.size() > 1 ? target.get(0) : null;
         String targetTableName = target.get(target.size() - 1);
-        SourceTargetEntry sourceTargetEntry = new 
SourceTargetEntry(targetDatabaseName, new DataNode(sourceResourceName, 
sourceTableName), targetTableName);
-        sourceTargetEntry.getSource().setSchemaName(sourceSchemaName);
-        return new 
MigrateTableStatement(Collections.singletonList(sourceTargetEntry), 
targetDatabaseName);
+        SourceTargetEntry result = new SourceTargetEntry(targetDatabaseName, 
new DataNode(sourceResourceName, sourceTableName), targetTableName);
+        result.getSource().setSchemaName(sourceSchemaName);
+        return result;
     }
     
     private String getRequiredIdentifierValue(final ParseTree context) {
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index b5b54d4f452..e6fb8ca124f 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -25,7 +25,6 @@ import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
 import 
org.apache.shardingsphere.data.pipeline.common.config.CreateTableConfiguration;
@@ -68,6 +67,7 @@ import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobTy
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest.MigrationIncrementalDumperConfigurationCreator;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
@@ -262,17 +262,13 @@ public final class MigrationJobAPI extends 
AbstractInventoryIncrementalJobAPIImp
     @Override
     public MigrationTaskConfiguration buildTaskConfiguration(final 
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final 
PipelineProcessConfiguration pipelineProcessConfig) {
         MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) 
pipelineJobConfig;
-        JobDataNodeLine dataNodeLine = 
jobConfig.getJobShardingDataNodes().get(jobShardingItem);
-        Map<ActualTableName, LogicTableName> tableNameMap = 
JobDataNodeLineConvertUtils.buildTableNameMap(dataNodeLine);
-        TableNameSchemaNameMapping tableNameSchemaNameMapping = new 
TableNameSchemaNameMapping(jobConfig.getTargetTableSchemaMap());
-        CreateTableConfiguration createTableConfig = 
buildCreateTableConfiguration(jobConfig, tableNameSchemaNameMapping);
-        String dataSourceName = 
dataNodeLine.getEntries().get(0).getDataNodes().get(0).getDataSourceName();
-        DumperConfiguration dumperConfig = 
buildDumperConfiguration(jobConfig.getJobId(), dataSourceName, 
jobConfig.getSources().get(dataSourceName), tableNameMap, 
tableNameSchemaNameMapping);
+        DumperConfiguration dumperConfig = new 
MigrationIncrementalDumperConfigurationCreator(jobConfig).createDumperConfiguration(jobConfig.getJobShardingDataNodes().get(jobShardingItem));
+        CreateTableConfiguration createTableConfig = 
buildCreateTableConfiguration(jobConfig, 
dumperConfig.getTableNameSchemaNameMapping());
         Set<LogicTableName> targetTableNames = 
jobConfig.getTargetTableNames().stream().map(LogicTableName::new).collect(Collectors.toSet());
         Map<LogicTableName, Set<String>> shardingColumnsMap = new 
ShardingColumnsExtractor().getShardingColumnsMap(
                 ((ShardingSpherePipelineDataSourceConfiguration) 
jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames);
-        ImporterConfiguration importerConfig = 
buildImporterConfiguration(jobConfig, pipelineProcessConfig, 
shardingColumnsMap, tableNameSchemaNameMapping);
-        MigrationTaskConfiguration result = new 
MigrationTaskConfiguration(dataSourceName, createTableConfig, dumperConfig, 
importerConfig);
+        ImporterConfiguration importerConfig = 
buildImporterConfiguration(jobConfig, pipelineProcessConfig, 
shardingColumnsMap, dumperConfig.getTableNameSchemaNameMapping());
+        MigrationTaskConfiguration result = new 
MigrationTaskConfiguration(dumperConfig.getDataSourceName(), createTableConfig, 
dumperConfig, importerConfig);
         log.info("buildTaskConfiguration, result={}", result);
         return result;
     }
@@ -296,17 +292,6 @@ public final class MigrationJobAPI extends 
AbstractInventoryIncrementalJobAPIImp
         return result;
     }
     
-    private DumperConfiguration buildDumperConfiguration(final String jobId, 
final String dataSourceName, final PipelineDataSourceConfiguration 
sourceDataSource,
-                                                         final 
Map<ActualTableName, LogicTableName> tableNameMap, final 
TableNameSchemaNameMapping tableNameSchemaNameMapping) {
-        DumperConfiguration result = new DumperConfiguration();
-        result.setJobId(jobId);
-        result.setDataSourceName(dataSourceName);
-        result.setDataSourceConfig(sourceDataSource);
-        result.setTableNameMap(tableNameMap);
-        result.setTableNameSchemaNameMapping(tableNameSchemaNameMapping);
-        return result;
-    }
-    
     private ImporterConfiguration buildImporterConfiguration(final 
MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration 
pipelineProcessConfig,
                                                              final 
Map<LogicTableName, Set<String>> shardingColumnsMap, final 
TableNameSchemaNameMapping tableNameSchemaNameMapping) {
         MigrationProcessContext processContext = new 
MigrationProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index c01a6d404d7..f4fef5957a4 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -39,6 +39,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.Tabl
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyCheckerFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryChecker;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
@@ -46,14 +47,15 @@ import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorit
 import org.apache.shardingsphere.infra.datanode.DataNode;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Properties;
-import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 /**
  * Data consistency checker for migration job.
@@ -67,7 +69,7 @@ public final class MigrationDataConsistencyChecker implements 
PipelineDataConsis
     
     private final ConsistencyCheckJobItemProgressContext progressContext;
     
-    private final Set<TableDataConsistencyChecker> tableCheckers = new 
HashSet<>();
+    private final AtomicReference<TableInventoryChecker> 
currentTableInventoryChecker = new AtomicReference<>();
     
     public MigrationDataConsistencyChecker(final MigrationJobConfiguration 
jobConfig, final InventoryIncrementalProcessContext processContext,
                                            final 
ConsistencyCheckJobItemProgressContext progressContext) {
@@ -84,13 +86,14 @@ public final class MigrationDataConsistencyChecker 
implements PipelineDataConsis
         progressContext.setRecordsCount(getRecordsCount());
         progressContext.getTableNames().addAll(sourceTableNames);
         progressContext.onProgressUpdated(new 
PipelineJobProgressUpdatedParameter(0));
-        Map<String, TableDataConsistencyCheckResult> result = new 
LinkedHashMap<>();
+        Map<DataNode, TableDataConsistencyCheckResult> result = new 
LinkedHashMap<>();
+        TableDataConsistencyChecker tableChecker = 
TableDataConsistencyCheckerFactory.newInstance(algorithmType, algorithmProps);
         try (PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager()) {
             for (JobDataNodeLine each : jobConfig.getJobShardingDataNodes()) {
-                checkTableInventoryData(each, algorithmType, algorithmProps, 
result, dataSourceManager);
+                checkTableInventoryData(each, tableChecker, result, 
dataSourceManager);
             }
         }
-        return result;
+        return result.entrySet().stream().collect(Collectors.toMap(entry -> 
DataNodeUtils.formatWithSchema(entry.getKey()), Entry::getValue));
     }
     
     private long getRecordsCount() {
@@ -98,15 +101,12 @@ public final class MigrationDataConsistencyChecker 
implements PipelineDataConsis
         return 
jobProgress.values().stream().filter(Objects::nonNull).mapToLong(InventoryIncrementalJobItemProgress::getProcessedRecordsCount).sum();
     }
     
-    private void checkTableInventoryData(final JobDataNodeLine 
jobDataNodeLine, final String algorithmType, final Properties algorithmProps,
-                                         final Map<String, 
TableDataConsistencyCheckResult> checkResults, final PipelineDataSourceManager 
dataSourceManager) {
+    private void checkTableInventoryData(final JobDataNodeLine 
jobDataNodeLine, final TableDataConsistencyChecker tableChecker,
+                                         final Map<DataNode, 
TableDataConsistencyCheckResult> checkResultMap, final 
PipelineDataSourceManager dataSourceManager) {
         for (JobDataNodeEntry entry : jobDataNodeLine.getEntries()) {
             for (DataNode each : entry.getDataNodes()) {
-                TableDataConsistencyChecker tableChecker = 
TableDataConsistencyCheckerFactory.newInstance(algorithmType, algorithmProps);
-                tableCheckers.add(tableChecker);
                 TableDataConsistencyCheckResult checkResult = 
checkSingleTableInventoryData(entry.getLogicTableName(), each, tableChecker, 
dataSourceManager);
-                checkResults.put(DataNodeUtils.formatWithSchema(each), 
checkResult);
-                tableCheckers.remove(null);
+                checkResultMap.put(each, checkResult);
                 if (!checkResult.isMatched() && 
tableChecker.isBreakOnInventoryCheckNotMatched()) {
                     log.info("Unmatched on table '{}', ignore left tables", 
DataNodeUtils.formatWithSchema(each));
                     return;
@@ -129,18 +129,27 @@ public final class MigrationDataConsistencyChecker 
implements PipelineDataConsis
                 sourceTable.getSchemaName().getOriginal(), 
sourceTable.getTableName().getOriginal(), metaDataLoader);
         TableInventoryCheckParameter param = new TableInventoryCheckParameter(
                 jobConfig.getJobId(), sourceDataSource, targetDataSource, 
sourceTable, targetTable, columnNames, uniqueKeys, readRateLimitAlgorithm, 
progressContext);
-        return tableChecker.checkSingleTableInventoryData(param);
+        TableInventoryChecker tableInventoryChecker = 
tableChecker.buildTableInventoryChecker(param);
+        currentTableInventoryChecker.set(tableInventoryChecker);
+        TableDataConsistencyCheckResult result = 
tableInventoryChecker.checkSingleTableInventoryData();
+        currentTableInventoryChecker.set(null);
+        return result;
     }
     
     @Override
     public void cancel() {
-        for (TableDataConsistencyChecker each : tableCheckers) {
-            each.cancel();
+        TableInventoryChecker checker = currentTableInventoryChecker.get();
+        if (null != checker) {
+            checker.cancel();
         }
     }
     
     @Override
     public boolean isCanceling() {
-        return 
tableCheckers.stream().anyMatch(TableDataConsistencyChecker::isCanceling);
+        TableInventoryChecker checker = currentTableInventoryChecker.get();
+        if (null == checker) {
+            return false;
+        }
+        return checker.isCanceling();
     }
 }
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperConfigurationCreator.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperConfigurationCreator.java
new file mode 100644
index 00000000000..4763f25aa04
--- /dev/null
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperConfigurationCreator.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest;
+
+import lombok.RequiredArgsConstructor;
+import 
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
+import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import 
org.apache.shardingsphere.data.pipeline.common.config.ingest.IncrementalDumperConfigurationCreator;
+import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
+import 
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
+
+import java.util.Map;
+
+/**
+ * Migration incremental dumper configuration creator.
+ */
+@RequiredArgsConstructor
+public final class MigrationIncrementalDumperConfigurationCreator implements 
IncrementalDumperConfigurationCreator {
+    
+    private final MigrationJobConfiguration jobConfig;
+    
+    @Override
+    public DumperConfiguration createDumperConfiguration(final JobDataNodeLine 
jobDataNodeLine) {
+        Map<ActualTableName, LogicTableName> tableNameMap = 
JobDataNodeLineConvertUtils.buildTableNameMap(jobDataNodeLine);
+        TableNameSchemaNameMapping tableNameSchemaNameMapping = new 
TableNameSchemaNameMapping(jobConfig.getTargetTableSchemaMap());
+        String dataSourceName = 
jobDataNodeLine.getEntries().get(0).getDataNodes().get(0).getDataSourceName();
+        return buildDumperConfiguration(jobConfig.getJobId(), dataSourceName, 
jobConfig.getSources().get(dataSourceName), tableNameMap, 
tableNameSchemaNameMapping);
+    }
+    
+    private DumperConfiguration buildDumperConfiguration(final String jobId, 
final String dataSourceName, final PipelineDataSourceConfiguration 
sourceDataSource,
+                                                         final 
Map<ActualTableName, LogicTableName> tableNameMap, final 
TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+        DumperConfiguration result = new DumperConfiguration();
+        result.setJobId(jobId);
+        result.setDataSourceName(dataSourceName);
+        result.setDataSourceConfig(sourceDataSource);
+        result.setTableNameMap(tableNameMap);
+        result.setTableNameSchemaNameMapping(tableNameSchemaNameMapping);
+        return result;
+    }
+}
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 52aa0d515a6..ecf7e569595 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -205,7 +205,7 @@ class CDCE2EIT {
         TableInventoryCheckParameter param = new 
TableInventoryCheckParameter("", sourceDataSource, targetDataSource, 
schemaTableName, schemaTableName,
                 tableMetaData.getColumnNames(), uniqueKeys, null, 
progressContext);
         TableDataConsistencyChecker tableChecker = 
TypedSPILoader.getService(TableDataConsistencyChecker.class, "DATA_MATCH", new 
Properties());
-        TableDataConsistencyCheckResult checkResult = 
tableChecker.checkSingleTableInventoryData(param);
+        TableDataConsistencyCheckResult checkResult = 
tableChecker.buildTableInventoryChecker(param).checkSingleTableInventoryData();
         assertTrue(checkResult.isMatched());
     }
     
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
index 0819ab249d2..ffd99fd94c6 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
@@ -35,6 +35,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -150,7 +151,11 @@ public abstract class AbstractMigrationE2EIT {
     }
     
     protected void assertCheckMigrationSuccess(final PipelineContainerComposer 
containerComposer, final String jobId, final String algorithmType) throws 
SQLException {
-        containerComposer.proxyExecuteWithLog(String.format("CHECK MIGRATION 
'%s' BY TYPE (NAME='%s')", jobId, algorithmType), 0);
+        assertCheckMigrationSuccess(containerComposer, jobId, algorithmType, 
new Properties());
+    }
+    
+    protected void assertCheckMigrationSuccess(final PipelineContainerComposer 
containerComposer, final String jobId, final String algorithmType, final 
Properties algorithmProps) throws SQLException {
+        
containerComposer.proxyExecuteWithLog(buildConsistencyCheckDistSQL(jobId, 
algorithmType, algorithmProps), 0);
         // TODO Need to add after the stop then to start, can continue the 
consistency check from the previous progress
         List<Map<String, Object>> resultList = Collections.emptyList();
         for (int i = 0; i < 30; i++) {
@@ -174,4 +179,14 @@ public abstract class AbstractMigrationE2EIT {
             assertThat("finished_percentage is not 100", 
each.get("finished_percentage").toString(), is("100"));
         }
     }
+    
+    private String buildConsistencyCheckDistSQL(final String jobId, final 
String algorithmType, final Properties algorithmProps) {
+        if (null == algorithmProps || algorithmProps.isEmpty()) {
+            return String.format("CHECK MIGRATION '%s' BY TYPE (NAME='%s')", 
jobId, algorithmType);
+        }
+        String sql = "CHECK MIGRATION '%s' BY TYPE (NAME='%s', PROPERTIES("
+                + algorithmProps.entrySet().stream().map(entry -> 
String.format("'%s'='%s'", entry.getKey(), 
entry.getValue())).collect(Collectors.joining(","))
+                + "))";
+        return String.format(sql, jobId, algorithmType);
+    }
 }
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
index 37f9b4c6294..740f7c52395 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
@@ -43,6 +43,7 @@ import java.sql.SQLException;
 import java.time.LocalDateTime;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -87,11 +88,13 @@ class MySQLMigrationGeneralE2EIT extends 
AbstractMigrationE2EIT {
             startMigrationByJobId(containerComposer, orderJobId);
             DataSource jdbcDataSource = 
containerComposer.generateShardingSphereDataSourceFromProxy();
             containerComposer.assertOrderRecordExist(jdbcDataSource, 
"t_order", 10000);
-            assertMigrationSuccessById(containerComposer, orderJobId, 
"DATA_MATCH");
+            Properties algorithmProps = new Properties();
+            algorithmProps.setProperty("chunk-size", "300");
+            assertMigrationSuccessById(containerComposer, orderJobId, 
"DATA_MATCH", algorithmProps);
             String orderItemJobId = getJobIdByTableName(containerComposer, 
"ds_0.t_order_item");
-            assertMigrationSuccessById(containerComposer, orderItemJobId, 
"DATA_MATCH");
+            assertMigrationSuccessById(containerComposer, orderItemJobId, 
"DATA_MATCH", algorithmProps);
             Awaitility.await().pollDelay(2L, TimeUnit.SECONDS).until(() -> 
true);
-            assertMigrationSuccessById(containerComposer, orderItemJobId, 
"CRC32_MATCH");
+            assertMigrationSuccessById(containerComposer, orderItemJobId, 
"CRC32_MATCH", new Properties());
             for (String each : listJobId(containerComposer)) {
                 commitMigrationByJobId(containerComposer, each);
             }
@@ -101,13 +104,13 @@ class MySQLMigrationGeneralE2EIT extends 
AbstractMigrationE2EIT {
         }
     }
     
-    private void assertMigrationSuccessById(final PipelineContainerComposer 
containerComposer, final String jobId, final String algorithmType) throws 
SQLException {
+    private void assertMigrationSuccessById(final PipelineContainerComposer 
containerComposer, final String jobId, final String algorithmType, final 
Properties algorithmProps) throws SQLException {
         List<Map<String, Object>> jobStatus = 
containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION 
STATUS '%s'", jobId));
         for (Map<String, Object> each : jobStatus) {
             
assertTrue(Integer.parseInt(each.get("processed_records_count").toString()) > 
0);
             
assertThat(Integer.parseInt(each.get("inventory_finished_percentage").toString()),
 is(100));
         }
-        assertCheckMigrationSuccess(containerComposer, jobId, algorithmType);
+        assertCheckMigrationSuccess(containerComposer, jobId, algorithmType, 
algorithmProps);
     }
     
     private static boolean isEnabled() {
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/TableDataConsistencyCheckerFixture.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureTableDataConsistencyChecker.java
similarity index 70%
copy from 
test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/TableDataConsistencyCheckerFixture.java
copy to 
test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureTableDataConsistencyChecker.java
index b0270f10613..a3dd3e28c4a 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/TableDataConsistencyCheckerFixture.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureTableDataConsistencyChecker.java
@@ -17,11 +17,9 @@
 
 package org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
 
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyContentCheckResult;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCountCheckResult;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryChecker;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
 import org.apache.shardingsphere.infra.spi.annotation.SPIDescription;
@@ -29,20 +27,11 @@ import 
org.apache.shardingsphere.infra.spi.annotation.SPIDescription;
 import java.util.Collection;
 
 @SPIDescription("Fixture description.")
-public final class TableDataConsistencyCheckerFixture implements 
TableDataConsistencyChecker {
+public final class FixtureTableDataConsistencyChecker implements 
TableDataConsistencyChecker {
     
     @Override
-    public TableDataConsistencyCheckResult checkSingleTableInventoryData(final 
TableInventoryCheckParameter param) {
-        return new TableDataConsistencyCheckResult(new 
TableDataConsistencyCountCheckResult(2, 2), new 
TableDataConsistencyContentCheckResult(true));
-    }
-    
-    @Override
-    public void cancel() {
-    }
-    
-    @Override
-    public boolean isCanceling() {
-        return false;
+    public TableInventoryChecker buildTableInventoryChecker(final 
TableInventoryCheckParameter param) {
+        return new FixtureTableInventoryChecker();
     }
     
     @Override
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/TableDataConsistencyCheckerFixture.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureTableInventoryChecker.java
similarity index 66%
rename from 
test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/TableDataConsistencyCheckerFixture.java
rename to 
test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureTableInventoryChecker.java
index b0270f10613..0a9e1841220 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/TableDataConsistencyCheckerFixture.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureTableInventoryChecker.java
@@ -20,21 +20,9 @@ package 
org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyContentCheckResult;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCountCheckResult;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
-import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
-import org.apache.shardingsphere.infra.spi.annotation.SPIDescription;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryChecker;
 
-import java.util.Collection;
-
-@SPIDescription("Fixture description.")
-public final class TableDataConsistencyCheckerFixture implements 
TableDataConsistencyChecker {
-    
-    @Override
-    public TableDataConsistencyCheckResult checkSingleTableInventoryData(final 
TableInventoryCheckParameter param) {
-        return new TableDataConsistencyCheckResult(new 
TableDataConsistencyCountCheckResult(2, 2), new 
TableDataConsistencyContentCheckResult(true));
-    }
+public final class FixtureTableInventoryChecker implements 
TableInventoryChecker {
     
     @Override
     public void cancel() {
@@ -46,12 +34,7 @@ public final class TableDataConsistencyCheckerFixture 
implements TableDataConsis
     }
     
     @Override
-    public Collection<DatabaseType> getSupportedDatabaseTypes() {
-        return 
ShardingSphereServiceLoader.getServiceInstances(DatabaseType.class);
-    }
-    
-    @Override
-    public String getType() {
-        return "FIXTURE";
+    public TableDataConsistencyCheckResult checkSingleTableInventoryData() {
+        return new TableDataConsistencyCheckResult(new 
TableDataConsistencyCountCheckResult(2, 2), new 
TableDataConsistencyContentCheckResult(true));
     }
 }
diff --git 
a/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker
 
b/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker
index f6f49dec753..b771deadee2 100644
--- 
a/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker
+++ 
b/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.test.it.data.pipeline.core.fixture.TableDataConsistencyCheckerFixture
+org.apache.shardingsphere.test.it.data.pipeline.core.fixture.FixtureTableDataConsistencyChecker

Reply via email to