This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 13f54689f6c Review and improve pipeline code (#28338)
13f54689f6c is described below
commit 13f54689f6cbdceef748d04992369fa63df7d8ea
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Sep 1 20:14:44 2023 +0800
Review and improve pipeline code (#28338)
* Extract CalculationContext
* Extract IncrementalDumperConfigurationCreator
* Extract TableInventoryChecker
* Extract buildSourceTargetEntry for MigrationDistSQLStatementVisitor
* Update consistency check map key
* Refactor migration E2E to support custom consistency check algorithm
props
---
.../IncrementalDumperConfigurationCreator.java | 35 +++++++++
.../CRC32MatchTableDataConsistencyChecker.java | 18 ++++-
.../DataMatchTableDataConsistencyChecker.java | 21 ++++-
...ker.java => MatchingTableInventoryChecker.java} | 8 +-
.../table/TableDataConsistencyChecker.java | 10 +--
...encyChecker.java => TableInventoryChecker.java} | 27 +------
.../table/calculator/CalculationContext.java | 91 ++++++++++++++++++++++
.../RecordSingleTableInventoryCalculator.java | 50 +-----------
.../core/MigrationDistSQLStatementVisitor.java | 17 ++--
.../migration/api/impl/MigrationJobAPI.java | 25 ++----
.../MigrationDataConsistencyChecker.java | 41 ++++++----
...ationIncrementalDumperConfigurationCreator.java | 59 ++++++++++++++
.../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java | 2 +-
.../cases/migration/AbstractMigrationE2EIT.java | 17 +++-
.../general/MySQLMigrationGeneralE2EIT.java | 13 ++--
...ava => FixtureTableDataConsistencyChecker.java} | 19 +----
...ture.java => FixtureTableInventoryChecker.java} | 25 +-----
...sistencycheck.table.TableDataConsistencyChecker | 2 +-
18 files changed, 308 insertions(+), 172 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ingest/IncrementalDumperConfigurationCreator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ingest/IncrementalDumperConfigurationCreator.java
new file mode 100644
index 00000000000..2934a8f142f
--- /dev/null
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ingest/IncrementalDumperConfigurationCreator.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.common.config.ingest;
+
+import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
+
+/**
+ * Incremental dumper configuration creator.
+ */
+public interface IncrementalDumperConfigurationCreator {
+
+ /**
+ * Create dumper configuration.
+ *
+ * @param jobDataNodeLine job data node line
+ * @return dumper configuration
+ */
+ DumperConfiguration createDumperConfiguration(JobDataNodeLine
jobDataNodeLine);
+}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/CRC32MatchTableDataConsistencyChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/CRC32MatchTableDataConsistencyChecker.java
index 11e2479a393..42f7ff0824e 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/CRC32MatchTableDataConsistencyChecker.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/CRC32MatchTableDataConsistencyChecker.java
@@ -31,11 +31,11 @@ import java.util.LinkedList;
* CRC32 match table data consistency checker.
*/
@SPIDescription("Match CRC32 of records.")
-public final class CRC32MatchTableDataConsistencyChecker extends
MatchingTableDataConsistencyChecker {
+public final class CRC32MatchTableDataConsistencyChecker implements
TableDataConsistencyChecker {
@Override
- protected SingleTableInventoryCalculator
buildSingleTableInventoryCalculator() {
- return new CRC32SingleTableInventoryCalculator();
+ public TableInventoryChecker buildTableInventoryChecker(final
TableInventoryCheckParameter param) {
+ return new CRC32MatchTableInventoryChecker(param);
}
@Override
@@ -51,4 +51,16 @@ public final class CRC32MatchTableDataConsistencyChecker
extends MatchingTableDa
public String getType() {
return "CRC32_MATCH";
}
+
+ private static final class CRC32MatchTableInventoryChecker extends
MatchingTableInventoryChecker {
+
+ CRC32MatchTableInventoryChecker(final TableInventoryCheckParameter
param) {
+ super(param);
+ }
+
+ @Override
+ protected SingleTableInventoryCalculator
buildSingleTableInventoryCalculator() {
+ return new CRC32SingleTableInventoryCalculator();
+ }
+ }
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
index f40ff4609a5..9ae4cc1cfb6 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
@@ -34,7 +34,7 @@ import java.util.Properties;
*/
@SPIDescription("Match raw data of records.")
@Slf4j
-public final class DataMatchTableDataConsistencyChecker extends
MatchingTableDataConsistencyChecker {
+public final class DataMatchTableDataConsistencyChecker implements
TableDataConsistencyChecker {
private static final String CHUNK_SIZE_KEY = "chunk-size";
@@ -65,8 +65,8 @@ public final class DataMatchTableDataConsistencyChecker
extends MatchingTableDat
}
@Override
- protected SingleTableInventoryCalculator
buildSingleTableInventoryCalculator() {
- return new RecordSingleTableInventoryCalculator(chunkSize);
+ public TableInventoryChecker buildTableInventoryChecker(final
TableInventoryCheckParameter param) {
+ return new DataMatchTableInventoryChecker(param, chunkSize);
}
@Override
@@ -78,4 +78,19 @@ public final class DataMatchTableDataConsistencyChecker
extends MatchingTableDat
public String getType() {
return "DATA_MATCH";
}
+
+ private static final class DataMatchTableInventoryChecker extends
MatchingTableInventoryChecker {
+
+ private final int chunkSize;
+
+ DataMatchTableInventoryChecker(final TableInventoryCheckParameter
param, final int chunkSize) {
+ super(param);
+ this.chunkSize = chunkSize;
+ }
+
+ @Override
+ protected SingleTableInventoryCalculator
buildSingleTableInventoryCalculator() {
+ return new RecordSingleTableInventoryCalculator(chunkSize);
+ }
+ }
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableDataConsistencyChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
similarity index 98%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableDataConsistencyChecker.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
index 496e3a75174..d941dbb0d26 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableDataConsistencyChecker.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
@@ -47,16 +47,18 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
- * Matching table data consistency checker.
+ * Matching table inventory checker.
*/
@Slf4j
@RequiredArgsConstructor
-public abstract class MatchingTableDataConsistencyChecker implements
TableDataConsistencyChecker {
+public abstract class MatchingTableInventoryChecker implements
TableInventoryChecker {
+
+ private final TableInventoryCheckParameter param;
private final Set<SingleTableInventoryCalculator> calculators = new
HashSet<>();
@Override
- public TableDataConsistencyCheckResult checkSingleTableInventoryData(final
TableInventoryCheckParameter param) {
+ public TableDataConsistencyCheckResult checkSingleTableInventoryData() {
ThreadFactory threadFactory =
ExecutorThreadFactoryBuilder.build("job-" + getJobIdDigest(param.getJobId()) +
"-check-%d");
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
try {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyChecker.java
index 2a126301f25..2faa5cc0775 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyChecker.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyChecker.java
@@ -17,8 +17,6 @@
package org.apache.shardingsphere.data.pipeline.core.consistencycheck.table;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineCancellable;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import org.apache.shardingsphere.infra.algorithm.ShardingSphereAlgorithm;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
@@ -27,15 +25,15 @@ import java.util.Collection;
/**
* Table data consistency checker.
*/
-public interface TableDataConsistencyChecker extends ShardingSphereAlgorithm,
PipelineCancellable {
+public interface TableDataConsistencyChecker extends ShardingSphereAlgorithm {
/**
- * Data consistency check for single table inventory data.
+ * Build table inventory checker.
*
* @param param check parameter
- * @return check result
+ * @return table inventory checker
*/
- TableDataConsistencyCheckResult
checkSingleTableInventoryData(TableInventoryCheckParameter param);
+ TableInventoryChecker
buildTableInventoryChecker(TableInventoryCheckParameter param);
/**
* Is break on inventory check not matched.
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryChecker.java
similarity index 63%
copy from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyChecker.java
copy to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryChecker.java
index 2a126301f25..5248d0aa14a 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyChecker.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryChecker.java
@@ -19,37 +19,16 @@ package
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineCancellable;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import org.apache.shardingsphere.infra.algorithm.ShardingSphereAlgorithm;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
-
-import java.util.Collection;
/**
- * Table data consistency checker.
+ * Table inventory checker.
*/
-public interface TableDataConsistencyChecker extends ShardingSphereAlgorithm,
PipelineCancellable {
+public interface TableInventoryChecker extends PipelineCancellable {
/**
* Data consistency check for single table inventory data.
*
- * @param param check parameter
* @return check result
*/
- TableDataConsistencyCheckResult
checkSingleTableInventoryData(TableInventoryCheckParameter param);
-
- /**
- * Is break on inventory check not matched.
- *
- * @return break or not
- */
- default boolean isBreakOnInventoryCheckNotMatched() {
- return true;
- }
-
- /**
- * Get supported database types.
- *
- * @return supported database types
- */
- Collection<DatabaseType> getSupportedDatabaseTypes();
+ TableDataConsistencyCheckResult checkSingleTableInventoryData();
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CalculationContext.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CalculationContext.java
new file mode 100644
index 00000000000..a1183457834
--- /dev/null
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CalculationContext.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Calculation context.
+ */
+@RequiredArgsConstructor
+public final class CalculationContext implements AutoCloseable {
+
+ private final AtomicReference<Connection> connection = new
AtomicReference<>();
+
+ private final AtomicReference<PreparedStatement> preparedStatement = new
AtomicReference<>();
+
+ private final AtomicReference<ResultSet> resultSet = new
AtomicReference<>();
+
+ /**
+ * Get connection.
+ *
+ * @return connection
+ */
+ public Connection getConnection() {
+ return connection.get();
+ }
+
+ /**
+ * Set connection.
+ *
+ * @param connection connection
+ */
+ public void setConnection(final Connection connection) {
+ this.connection.set(connection);
+ }
+
+ /**
+ * Get result set.
+ *
+ * @return result set
+ */
+ public ResultSet getResultSet() {
+ return resultSet.get();
+ }
+
+ /**
+ * Set prepared statement.
+ *
+ * @param preparedStatement prepared statement
+ */
+ public void setPreparedStatement(final PreparedStatement
preparedStatement) {
+ this.preparedStatement.set(preparedStatement);
+ }
+
+ /**
+ * Set result set.
+ *
+ * @param resultSet result set
+ */
+ public void setResultSet(final ResultSet resultSet) {
+ this.resultSet.set(resultSet);
+ }
+
+ @Override
+ public void close() {
+ QuietlyCloser.close(resultSet.get());
+ QuietlyCloser.close(preparedStatement.get());
+ QuietlyCloser.close(connection.get());
+ }
+}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
index abcbc0e2143..9892cdf5ea1 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
@@ -17,7 +17,6 @@
package
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator;
-import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.common.query.JDBCStreamQueryBuilder;
@@ -43,7 +42,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.concurrent.atomic.AtomicReference;
/**
* Record single table inventory calculator.
@@ -109,7 +107,8 @@ public final class RecordSingleTableInventoryCalculator
extends AbstractStreamin
private CalculationContext createCalculationContext(final
SingleTableInventoryCalculateParameter param) throws SQLException {
Connection connection = param.getDataSource().getConnection();
- CalculationContext result = new CalculationContext(connection);
+ CalculationContext result = new CalculationContext();
+ result.setConnection(connection);
param.setCalculationContext(result);
return result;
}
@@ -143,49 +142,4 @@ public final class RecordSingleTableInventoryCalculator
extends AbstractStreamin
preparedStatement.setObject(1, tableCheckPosition);
}
}
-
- @RequiredArgsConstructor
- private static final class CalculationContext implements AutoCloseable {
-
- @Getter
- private final Connection connection;
-
- private final AtomicReference<PreparedStatement> preparedStatement =
new AtomicReference<>();
-
- private final AtomicReference<ResultSet> resultSet = new
AtomicReference<>();
-
- /**
- * Get result set.
- *
- * @return result set
- */
- public ResultSet getResultSet() {
- return resultSet.get();
- }
-
- /**
- * Set prepared statement.
- *
- * @param preparedStatement prepared statement
- */
- public void setPreparedStatement(final PreparedStatement
preparedStatement) {
- this.preparedStatement.set(preparedStatement);
- }
-
- /**
- * Set result set.
- *
- * @param resultSet result set
- */
- public void setResultSet(final ResultSet resultSet) {
- this.resultSet.set(resultSet);
- }
-
- @Override
- public void close() {
- QuietlyCloser.close(resultSet.get());
- QuietlyCloser.close(preparedStatement.get());
- QuietlyCloser.close(connection);
- }
- }
}
diff --git
a/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/migration/distsql/parser/core/MigrationDistSQLStatementVisitor.java
b/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/migration/distsql/parser/core/MigrationDistSQLStatementVisitor.java
index 839a5ee9f2d..cef053312eb 100644
---
a/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/migration/distsql/parser/core/MigrationDistSQLStatementVisitor.java
+++
b/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/migration/distsql/parser/core/MigrationDistSQLStatementVisitor.java
@@ -36,11 +36,13 @@ import
org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatemen
import
org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationListContext;
import
org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationSourceStorageUnitsContext;
import
org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationStatusContext;
+import
org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.SourceTableNameContext;
import
org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StartMigrationCheckContext;
import
org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StartMigrationContext;
import
org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StopMigrationCheckContext;
import
org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StopMigrationContext;
import
org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StorageUnitDefinitionContext;
+import
org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.TargetTableNameContext;
import
org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.UnregisterMigrationSourceStorageUnitContext;
import org.apache.shardingsphere.distsql.parser.segment.AlgorithmSegment;
import org.apache.shardingsphere.distsql.parser.segment.DataSourceSegment;
@@ -82,16 +84,21 @@ public final class MigrationDistSQLStatementVisitor extends
MigrationDistSQLStat
@Override
public ASTNode visitMigrateTable(final MigrateTableContext ctx) {
- List<String> source =
Splitter.on('.').splitToList(getRequiredIdentifierValue(ctx.sourceTableName()));
- List<String> target =
Splitter.on('.').splitToList(getRequiredIdentifierValue(ctx.targetTableName()));
+ SourceTargetEntry sourceTargetEntry =
buildSourceTargetEntry(ctx.sourceTableName(), ctx.targetTableName());
+ return new
MigrateTableStatement(Collections.singletonList(sourceTargetEntry),
sourceTargetEntry.getTargetDatabaseName());
+ }
+
+ private SourceTargetEntry buildSourceTargetEntry(final
SourceTableNameContext sourceContext, final TargetTableNameContext
targetContext) {
+ List<String> source =
Splitter.on('.').splitToList(getRequiredIdentifierValue(sourceContext));
+ List<String> target =
Splitter.on('.').splitToList(getRequiredIdentifierValue(targetContext));
String sourceResourceName = source.get(0);
String sourceSchemaName = 3 == source.size() ? source.get(1) : null;
String sourceTableName = source.get(source.size() - 1);
String targetDatabaseName = target.size() > 1 ? target.get(0) : null;
String targetTableName = target.get(target.size() - 1);
- SourceTargetEntry sourceTargetEntry = new
SourceTargetEntry(targetDatabaseName, new DataNode(sourceResourceName,
sourceTableName), targetTableName);
- sourceTargetEntry.getSource().setSchemaName(sourceSchemaName);
- return new
MigrateTableStatement(Collections.singletonList(sourceTargetEntry),
targetDatabaseName);
+ SourceTargetEntry result = new SourceTargetEntry(targetDatabaseName,
new DataNode(sourceResourceName, sourceTableName), targetTableName);
+ result.getSource().setSchemaName(sourceSchemaName);
+ return result;
}
private String getRequiredIdentifierValue(final ParseTree context) {
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index b5b54d4f452..e6fb8ca124f 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -25,7 +25,6 @@ import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
import
org.apache.shardingsphere.data.pipeline.common.config.CreateTableConfiguration;
@@ -68,6 +67,7 @@ import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobTy
import
org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest.MigrationIncrementalDumperConfigurationCreator;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
@@ -262,17 +262,13 @@ public final class MigrationJobAPI extends
AbstractInventoryIncrementalJobAPIImp
@Override
public MigrationTaskConfiguration buildTaskConfiguration(final
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final
PipelineProcessConfiguration pipelineProcessConfig) {
MigrationJobConfiguration jobConfig = (MigrationJobConfiguration)
pipelineJobConfig;
- JobDataNodeLine dataNodeLine =
jobConfig.getJobShardingDataNodes().get(jobShardingItem);
- Map<ActualTableName, LogicTableName> tableNameMap =
JobDataNodeLineConvertUtils.buildTableNameMap(dataNodeLine);
- TableNameSchemaNameMapping tableNameSchemaNameMapping = new
TableNameSchemaNameMapping(jobConfig.getTargetTableSchemaMap());
- CreateTableConfiguration createTableConfig =
buildCreateTableConfiguration(jobConfig, tableNameSchemaNameMapping);
- String dataSourceName =
dataNodeLine.getEntries().get(0).getDataNodes().get(0).getDataSourceName();
- DumperConfiguration dumperConfig =
buildDumperConfiguration(jobConfig.getJobId(), dataSourceName,
jobConfig.getSources().get(dataSourceName), tableNameMap,
tableNameSchemaNameMapping);
+ DumperConfiguration dumperConfig = new
MigrationIncrementalDumperConfigurationCreator(jobConfig).createDumperConfiguration(jobConfig.getJobShardingDataNodes().get(jobShardingItem));
+ CreateTableConfiguration createTableConfig =
buildCreateTableConfiguration(jobConfig,
dumperConfig.getTableNameSchemaNameMapping());
Set<LogicTableName> targetTableNames =
jobConfig.getTargetTableNames().stream().map(LogicTableName::new).collect(Collectors.toSet());
Map<LogicTableName, Set<String>> shardingColumnsMap = new
ShardingColumnsExtractor().getShardingColumnsMap(
((ShardingSpherePipelineDataSourceConfiguration)
jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames);
- ImporterConfiguration importerConfig =
buildImporterConfiguration(jobConfig, pipelineProcessConfig,
shardingColumnsMap, tableNameSchemaNameMapping);
- MigrationTaskConfiguration result = new
MigrationTaskConfiguration(dataSourceName, createTableConfig, dumperConfig,
importerConfig);
+ ImporterConfiguration importerConfig =
buildImporterConfiguration(jobConfig, pipelineProcessConfig,
shardingColumnsMap, dumperConfig.getTableNameSchemaNameMapping());
+ MigrationTaskConfiguration result = new
MigrationTaskConfiguration(dumperConfig.getDataSourceName(), createTableConfig,
dumperConfig, importerConfig);
log.info("buildTaskConfiguration, result={}", result);
return result;
}
@@ -296,17 +292,6 @@ public final class MigrationJobAPI extends
AbstractInventoryIncrementalJobAPIImp
return result;
}
- private DumperConfiguration buildDumperConfiguration(final String jobId,
final String dataSourceName, final PipelineDataSourceConfiguration
sourceDataSource,
- final
Map<ActualTableName, LogicTableName> tableNameMap, final
TableNameSchemaNameMapping tableNameSchemaNameMapping) {
- DumperConfiguration result = new DumperConfiguration();
- result.setJobId(jobId);
- result.setDataSourceName(dataSourceName);
- result.setDataSourceConfig(sourceDataSource);
- result.setTableNameMap(tableNameMap);
- result.setTableNameSchemaNameMapping(tableNameSchemaNameMapping);
- return result;
- }
-
private ImporterConfiguration buildImporterConfiguration(final
MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration
pipelineProcessConfig,
final
Map<LogicTableName, Set<String>> shardingColumnsMap, final
TableNameSchemaNameMapping tableNameSchemaNameMapping) {
MigrationProcessContext processContext = new
MigrationProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index c01a6d404d7..f4fef5957a4 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -39,6 +39,7 @@ import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.Tabl
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyCheckerFactory;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryChecker;
import
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
@@ -46,14 +47,15 @@ import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorit
import org.apache.shardingsphere.infra.datanode.DataNode;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
-import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Objects;
import java.util.Properties;
-import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
/**
* Data consistency checker for migration job.
@@ -67,7 +69,7 @@ public final class MigrationDataConsistencyChecker implements
PipelineDataConsis
private final ConsistencyCheckJobItemProgressContext progressContext;
- private final Set<TableDataConsistencyChecker> tableCheckers = new
HashSet<>();
+ private final AtomicReference<TableInventoryChecker>
currentTableInventoryChecker = new AtomicReference<>();
public MigrationDataConsistencyChecker(final MigrationJobConfiguration
jobConfig, final InventoryIncrementalProcessContext processContext,
final
ConsistencyCheckJobItemProgressContext progressContext) {
@@ -84,13 +86,14 @@ public final class MigrationDataConsistencyChecker
implements PipelineDataConsis
progressContext.setRecordsCount(getRecordsCount());
progressContext.getTableNames().addAll(sourceTableNames);
progressContext.onProgressUpdated(new
PipelineJobProgressUpdatedParameter(0));
- Map<String, TableDataConsistencyCheckResult> result = new
LinkedHashMap<>();
+ Map<DataNode, TableDataConsistencyCheckResult> result = new
LinkedHashMap<>();
+ TableDataConsistencyChecker tableChecker =
TableDataConsistencyCheckerFactory.newInstance(algorithmType, algorithmProps);
try (PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager()) {
for (JobDataNodeLine each : jobConfig.getJobShardingDataNodes()) {
- checkTableInventoryData(each, algorithmType, algorithmProps,
result, dataSourceManager);
+ checkTableInventoryData(each, tableChecker, result,
dataSourceManager);
}
}
- return result;
+ return result.entrySet().stream().collect(Collectors.toMap(entry ->
DataNodeUtils.formatWithSchema(entry.getKey()), Entry::getValue));
}
private long getRecordsCount() {
@@ -98,15 +101,12 @@ public final class MigrationDataConsistencyChecker
implements PipelineDataConsis
return
jobProgress.values().stream().filter(Objects::nonNull).mapToLong(InventoryIncrementalJobItemProgress::getProcessedRecordsCount).sum();
}
- private void checkTableInventoryData(final JobDataNodeLine
jobDataNodeLine, final String algorithmType, final Properties algorithmProps,
- final Map<String,
TableDataConsistencyCheckResult> checkResults, final PipelineDataSourceManager
dataSourceManager) {
+ private void checkTableInventoryData(final JobDataNodeLine
jobDataNodeLine, final TableDataConsistencyChecker tableChecker,
+ final Map<DataNode,
TableDataConsistencyCheckResult> checkResultMap, final
PipelineDataSourceManager dataSourceManager) {
for (JobDataNodeEntry entry : jobDataNodeLine.getEntries()) {
for (DataNode each : entry.getDataNodes()) {
- TableDataConsistencyChecker tableChecker =
TableDataConsistencyCheckerFactory.newInstance(algorithmType, algorithmProps);
- tableCheckers.add(tableChecker);
TableDataConsistencyCheckResult checkResult =
checkSingleTableInventoryData(entry.getLogicTableName(), each, tableChecker,
dataSourceManager);
- checkResults.put(DataNodeUtils.formatWithSchema(each),
checkResult);
- tableCheckers.remove(null);
+ checkResultMap.put(each, checkResult);
if (!checkResult.isMatched() &&
tableChecker.isBreakOnInventoryCheckNotMatched()) {
log.info("Unmatched on table '{}', ignore left tables",
DataNodeUtils.formatWithSchema(each));
return;
@@ -129,18 +129,27 @@ public final class MigrationDataConsistencyChecker
implements PipelineDataConsis
sourceTable.getSchemaName().getOriginal(),
sourceTable.getTableName().getOriginal(), metaDataLoader);
TableInventoryCheckParameter param = new TableInventoryCheckParameter(
jobConfig.getJobId(), sourceDataSource, targetDataSource,
sourceTable, targetTable, columnNames, uniqueKeys, readRateLimitAlgorithm,
progressContext);
- return tableChecker.checkSingleTableInventoryData(param);
+ TableInventoryChecker tableInventoryChecker =
tableChecker.buildTableInventoryChecker(param);
+ currentTableInventoryChecker.set(tableInventoryChecker);
+ TableDataConsistencyCheckResult result =
tableInventoryChecker.checkSingleTableInventoryData();
+ currentTableInventoryChecker.set(null);
+ return result;
}
@Override
public void cancel() {
- for (TableDataConsistencyChecker each : tableCheckers) {
- each.cancel();
+ TableInventoryChecker checker = currentTableInventoryChecker.get();
+ if (null != checker) {
+ checker.cancel();
}
}
@Override
public boolean isCanceling() {
- return
tableCheckers.stream().anyMatch(TableDataConsistencyChecker::isCanceling);
+ TableInventoryChecker checker = currentTableInventoryChecker.get();
+ if (null == checker) {
+ return false;
+ }
+ return checker.isCanceling();
}
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperConfigurationCreator.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperConfigurationCreator.java
new file mode 100644
index 00000000000..4763f25aa04
--- /dev/null
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperConfigurationCreator.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest;
+
+import lombok.RequiredArgsConstructor;
+import
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
+import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import
org.apache.shardingsphere.data.pipeline.common.config.ingest.IncrementalDumperConfigurationCreator;
+import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
+import
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
+
+import java.util.Map;
+
+/**
+ * Migration incremental dumper configuration creator.
+ */
+@RequiredArgsConstructor
+public final class MigrationIncrementalDumperConfigurationCreator implements
IncrementalDumperConfigurationCreator {
+
+ private final MigrationJobConfiguration jobConfig;
+
+ @Override
+ public DumperConfiguration createDumperConfiguration(final JobDataNodeLine
jobDataNodeLine) {
+ Map<ActualTableName, LogicTableName> tableNameMap =
JobDataNodeLineConvertUtils.buildTableNameMap(jobDataNodeLine);
+ TableNameSchemaNameMapping tableNameSchemaNameMapping = new
TableNameSchemaNameMapping(jobConfig.getTargetTableSchemaMap());
+ String dataSourceName =
jobDataNodeLine.getEntries().get(0).getDataNodes().get(0).getDataSourceName();
+ return buildDumperConfiguration(jobConfig.getJobId(), dataSourceName,
jobConfig.getSources().get(dataSourceName), tableNameMap,
tableNameSchemaNameMapping);
+ }
+
+ private DumperConfiguration buildDumperConfiguration(final String jobId,
final String dataSourceName, final PipelineDataSourceConfiguration
sourceDataSource,
+ final
Map<ActualTableName, LogicTableName> tableNameMap, final
TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+ DumperConfiguration result = new DumperConfiguration();
+ result.setJobId(jobId);
+ result.setDataSourceName(dataSourceName);
+ result.setDataSourceConfig(sourceDataSource);
+ result.setTableNameMap(tableNameMap);
+ result.setTableNameSchemaNameMapping(tableNameSchemaNameMapping);
+ return result;
+ }
+}
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 52aa0d515a6..ecf7e569595 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -205,7 +205,7 @@ class CDCE2EIT {
TableInventoryCheckParameter param = new
TableInventoryCheckParameter("", sourceDataSource, targetDataSource,
schemaTableName, schemaTableName,
tableMetaData.getColumnNames(), uniqueKeys, null,
progressContext);
TableDataConsistencyChecker tableChecker =
TypedSPILoader.getService(TableDataConsistencyChecker.class, "DATA_MATCH", new
Properties());
- TableDataConsistencyCheckResult checkResult =
tableChecker.checkSingleTableInventoryData(param);
+ TableDataConsistencyCheckResult checkResult =
tableChecker.buildTableInventoryChecker(param).checkSingleTableInventoryData();
assertTrue(checkResult.isMatched());
}
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
index 0819ab249d2..ffd99fd94c6 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
@@ -35,6 +35,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -150,7 +151,11 @@ public abstract class AbstractMigrationE2EIT {
}
protected void assertCheckMigrationSuccess(final PipelineContainerComposer
containerComposer, final String jobId, final String algorithmType) throws
SQLException {
- containerComposer.proxyExecuteWithLog(String.format("CHECK MIGRATION
'%s' BY TYPE (NAME='%s')", jobId, algorithmType), 0);
+ assertCheckMigrationSuccess(containerComposer, jobId, algorithmType,
new Properties());
+ }
+
+ protected void assertCheckMigrationSuccess(final PipelineContainerComposer
containerComposer, final String jobId, final String algorithmType, final
Properties algorithmProps) throws SQLException {
+
containerComposer.proxyExecuteWithLog(buildConsistencyCheckDistSQL(jobId,
algorithmType, algorithmProps), 0);
// TODO Need to add after the stop then to start, can continue the
consistency check from the previous progress
List<Map<String, Object>> resultList = Collections.emptyList();
for (int i = 0; i < 30; i++) {
@@ -174,4 +179,14 @@ public abstract class AbstractMigrationE2EIT {
assertThat("finished_percentage is not 100",
each.get("finished_percentage").toString(), is("100"));
}
}
+
+    private String buildConsistencyCheckDistSQL(final String jobId, final String algorithmType, final Properties algorithmProps) {
+        if (null == algorithmProps || algorithmProps.isEmpty()) {
+            return String.format("CHECK MIGRATION '%s' BY TYPE (NAME='%s')", jobId, algorithmType);
+        }
+        // Render the properties first and pass them as a format argument; splicing them into the
+        // format string would make any '%' in a property key or value be parsed as a format specifier.
+        String props = algorithmProps.entrySet().stream().map(entry -> String.format("'%s'='%s'", entry.getKey(), entry.getValue())).collect(Collectors.joining(","));
+        return String.format("CHECK MIGRATION '%s' BY TYPE (NAME='%s', PROPERTIES(%s))", jobId, algorithmType, props);
+    }
}
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
index 37f9b4c6294..740f7c52395 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
@@ -43,6 +43,7 @@ import java.sql.SQLException;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.CoreMatchers.is;
@@ -87,11 +88,13 @@ class MySQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
startMigrationByJobId(containerComposer, orderJobId);
DataSource jdbcDataSource =
containerComposer.generateShardingSphereDataSourceFromProxy();
containerComposer.assertOrderRecordExist(jdbcDataSource,
"t_order", 10000);
- assertMigrationSuccessById(containerComposer, orderJobId,
"DATA_MATCH");
+ Properties algorithmProps = new Properties();
+ algorithmProps.setProperty("chunk-size", "300");
+ assertMigrationSuccessById(containerComposer, orderJobId,
"DATA_MATCH", algorithmProps);
String orderItemJobId = getJobIdByTableName(containerComposer,
"ds_0.t_order_item");
- assertMigrationSuccessById(containerComposer, orderItemJobId,
"DATA_MATCH");
+ assertMigrationSuccessById(containerComposer, orderItemJobId,
"DATA_MATCH", algorithmProps);
Awaitility.await().pollDelay(2L, TimeUnit.SECONDS).until(() ->
true);
- assertMigrationSuccessById(containerComposer, orderItemJobId,
"CRC32_MATCH");
+ assertMigrationSuccessById(containerComposer, orderItemJobId,
"CRC32_MATCH", new Properties());
for (String each : listJobId(containerComposer)) {
commitMigrationByJobId(containerComposer, each);
}
@@ -101,13 +104,13 @@ class MySQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
}
}
- private void assertMigrationSuccessById(final PipelineContainerComposer
containerComposer, final String jobId, final String algorithmType) throws
SQLException {
+ private void assertMigrationSuccessById(final PipelineContainerComposer
containerComposer, final String jobId, final String algorithmType, final
Properties algorithmProps) throws SQLException {
List<Map<String, Object>> jobStatus =
containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION
STATUS '%s'", jobId));
for (Map<String, Object> each : jobStatus) {
assertTrue(Integer.parseInt(each.get("processed_records_count").toString()) >
0);
assertThat(Integer.parseInt(each.get("inventory_finished_percentage").toString()),
is(100));
}
- assertCheckMigrationSuccess(containerComposer, jobId, algorithmType);
+ assertCheckMigrationSuccess(containerComposer, jobId, algorithmType,
algorithmProps);
}
private static boolean isEnabled() {
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/TableDataConsistencyCheckerFixture.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureTableDataConsistencyChecker.java
similarity index 70%
copy from
test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/TableDataConsistencyCheckerFixture.java
copy to
test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureTableDataConsistencyChecker.java
index b0270f10613..a3dd3e28c4a 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/TableDataConsistencyCheckerFixture.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureTableDataConsistencyChecker.java
@@ -17,11 +17,9 @@
package org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyContentCheckResult;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCountCheckResult;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryChecker;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.infra.spi.annotation.SPIDescription;
@@ -29,20 +27,11 @@ import
org.apache.shardingsphere.infra.spi.annotation.SPIDescription;
import java.util.Collection;
@SPIDescription("Fixture description.")
-public final class TableDataConsistencyCheckerFixture implements
TableDataConsistencyChecker {
+public final class FixtureTableDataConsistencyChecker implements
TableDataConsistencyChecker {
@Override
- public TableDataConsistencyCheckResult checkSingleTableInventoryData(final
TableInventoryCheckParameter param) {
- return new TableDataConsistencyCheckResult(new
TableDataConsistencyCountCheckResult(2, 2), new
TableDataConsistencyContentCheckResult(true));
- }
-
- @Override
- public void cancel() {
- }
-
- @Override
- public boolean isCanceling() {
- return false;
+ public TableInventoryChecker buildTableInventoryChecker(final
TableInventoryCheckParameter param) {
+ return new FixtureTableInventoryChecker();
}
@Override
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/TableDataConsistencyCheckerFixture.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureTableInventoryChecker.java
similarity index 66%
rename from
test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/TableDataConsistencyCheckerFixture.java
rename to
test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureTableInventoryChecker.java
index b0270f10613..0a9e1841220 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/TableDataConsistencyCheckerFixture.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureTableInventoryChecker.java
@@ -20,21 +20,9 @@ package
org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyContentCheckResult;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCountCheckResult;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
-import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
-import org.apache.shardingsphere.infra.spi.annotation.SPIDescription;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryChecker;
-import java.util.Collection;
-
-@SPIDescription("Fixture description.")
-public final class TableDataConsistencyCheckerFixture implements
TableDataConsistencyChecker {
-
- @Override
- public TableDataConsistencyCheckResult checkSingleTableInventoryData(final
TableInventoryCheckParameter param) {
- return new TableDataConsistencyCheckResult(new
TableDataConsistencyCountCheckResult(2, 2), new
TableDataConsistencyContentCheckResult(true));
- }
+public final class FixtureTableInventoryChecker implements
TableInventoryChecker {
@Override
public void cancel() {
@@ -46,12 +34,7 @@ public final class TableDataConsistencyCheckerFixture
implements TableDataConsis
}
@Override
- public Collection<DatabaseType> getSupportedDatabaseTypes() {
- return
ShardingSphereServiceLoader.getServiceInstances(DatabaseType.class);
- }
-
- @Override
- public String getType() {
- return "FIXTURE";
+ public TableDataConsistencyCheckResult checkSingleTableInventoryData() {
+ return new TableDataConsistencyCheckResult(new
TableDataConsistencyCountCheckResult(2, 2), new
TableDataConsistencyContentCheckResult(true));
}
}
diff --git
a/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker
b/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker
index f6f49dec753..b771deadee2 100644
---
a/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker
+++
b/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker
@@ -15,4 +15,4 @@
# limitations under the License.
#
-org.apache.shardingsphere.test.it.data.pipeline.core.fixture.TableDataConsistencyCheckerFixture
+org.apache.shardingsphere.test.it.data.pipeline.core.fixture.FixtureTableDataConsistencyChecker