This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new b6b3ce5e7ca Upgrade junit 5 on MigrationE2EIT (#24615)
b6b3ce5e7ca is described below
commit b6b3ce5e7ca72f2e890ea17773c0b7b218467021
Author: Liang Zhang <[email protected]>
AuthorDate: Wed Mar 15 14:56:28 2023 +0800
Upgrade junit 5 on MigrationE2EIT (#24615)
* Upgrade junit 5 on CDCE2EIT
* Upgrade junit 5 on MigrationE2EIT
* Fix checkstyle
---
.../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java | 117 +++++-----
.../cases/migration/AbstractMigrationE2EIT.java | 43 ++--
.../general/MySQLMigrationGeneralE2EIT.java | 118 +++++-----
.../general/PostgreSQLMigrationGeneralE2EIT.java | 148 ++++++------
.../migration/general/RulesMigrationE2EIT.java | 97 ++++----
.../primarykey/IndexesMigrationE2EIT.java | 252 +++++++++++----------
.../primarykey/MariaDBMigrationE2EIT.java | 89 ++++----
.../primarykey/TextPrimaryKeyMigrationE2EIT.java | 100 ++++----
8 files changed, 492 insertions(+), 472 deletions(-)
diff --git
a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 7d11825653a..c375ef0fb1b 100644
---
a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++
b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -48,7 +48,6 @@ import
org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.Pipeline
import
org.apache.shardingsphere.test.e2e.data.pipeline.util.DataSourceExecuteUtil;
import
org.apache.shardingsphere.test.e2e.env.container.atomic.constants.ProxyContainerConstants;
import
org.apache.shardingsphere.test.e2e.env.container.atomic.util.StorageContainerUtil;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.condition.EnabledIf;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
@@ -93,76 +92,68 @@ public final class CDCE2EIT {
private static final String SOURCE_TABLE_NAME = "t_order";
- private final PipelineContainerComposer containerComposer;
-
private final ExecutorService executor =
Executors.newSingleThreadExecutor();
- public CDCE2EIT(final PipelineTestParameter testParam) {
- containerComposer = new PipelineContainerComposer(testParam, new
CDCJobType());
- }
-
- @AfterEach
- public void tearDown() {
- containerComposer.close();
- }
-
@ParameterizedTest(name = "{0}")
@EnabledIf("isEnabled")
@ArgumentsSource(TestCaseArgumentsProvider.class)
- public void assertCDCDataImportSuccess() throws SQLException,
InterruptedException {
- // make sure the program time zone same with the database server at CI.
- TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
- for (String each : Arrays.asList(PipelineContainerComposer.DS_0,
PipelineContainerComposer.DS_1)) {
- containerComposer.registerStorageUnit(each);
- }
- createOrderTableRule();
- try (Connection connection =
containerComposer.getProxyDataSource().getConnection()) {
- initSchemaAndTable(connection, 2);
- }
- DataSource jdbcDataSource =
containerComposer.generateShardingSphereDataSourceFromProxy();
- Pair<List<Object[]>, List<Object[]>> dataPair =
PipelineCaseHelper.generateFullInsertData(containerComposer.getDatabaseType(),
20);
- log.info("init data begin: {}", LocalDateTime.now());
- DataSourceExecuteUtil.execute(jdbcDataSource,
containerComposer.getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_NAME),
dataPair.getLeft());
- log.info("init data end: {}", LocalDateTime.now());
- try (
- Connection connection =
DriverManager.getConnection(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4,
false),
- containerComposer.getUsername(),
containerComposer.getPassword())) {
- initSchemaAndTable(connection, 0);
- }
- startCDCClient();
- Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1,
TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW
STREAMING LIST").isEmpty());
- String jobId = containerComposer.queryForListWithLog("SHOW STREAMING
LIST").get(0).get("id").toString();
- containerComposer.waitIncrementTaskFinished(String.format("SHOW
STREAMING STATUS '%s'", jobId));
- containerComposer.startIncrementTask(new
E2EIncrementalTask(jdbcDataSource, SOURCE_TABLE_NAME, new
SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 20));
- containerComposer.getIncreaseTaskThread().join(10000L);
- List<Map<String, Object>> actualProxyList;
- try (Connection connection = jdbcDataSource.getConnection()) {
- ResultSet resultSet =
connection.createStatement().executeQuery(String.format("SELECT * FROM %s ORDER
BY order_id ASC", getOrderTableNameWithSchema()));
- actualProxyList =
containerComposer.transformResultSetToList(resultSet);
+ public void assertCDCDataImportSuccess(final PipelineTestParameter
testParam) throws SQLException, InterruptedException {
+ try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam, new CDCJobType())) {
+ // make sure the program time zone same with the database server
at CI.
+ TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
+ for (String each : Arrays.asList(PipelineContainerComposer.DS_0,
PipelineContainerComposer.DS_1)) {
+ containerComposer.registerStorageUnit(each);
+ }
+ createOrderTableRule(containerComposer);
+ try (Connection connection =
containerComposer.getProxyDataSource().getConnection()) {
+ initSchemaAndTable(containerComposer, connection, 2);
+ }
+ DataSource jdbcDataSource =
containerComposer.generateShardingSphereDataSourceFromProxy();
+ Pair<List<Object[]>, List<Object[]>> dataPair =
PipelineCaseHelper.generateFullInsertData(containerComposer.getDatabaseType(),
20);
+ log.info("init data begin: {}", LocalDateTime.now());
+ DataSourceExecuteUtil.execute(jdbcDataSource,
containerComposer.getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_NAME),
dataPair.getLeft());
+ log.info("init data end: {}", LocalDateTime.now());
+ try (
+ Connection connection =
DriverManager.getConnection(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4,
false),
+ containerComposer.getUsername(),
containerComposer.getPassword())) {
+ initSchemaAndTable(containerComposer, connection, 0);
+ }
+ startCDCClient(containerComposer);
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1,
TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW
STREAMING LIST").isEmpty());
+ String jobId = containerComposer.queryForListWithLog("SHOW
STREAMING LIST").get(0).get("id").toString();
+ containerComposer.waitIncrementTaskFinished(String.format("SHOW
STREAMING STATUS '%s'", jobId));
+ containerComposer.startIncrementTask(new
E2EIncrementalTask(jdbcDataSource, SOURCE_TABLE_NAME, new
SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 20));
+ containerComposer.getIncreaseTaskThread().join(10000L);
+ List<Map<String, Object>> actualProxyList;
+ try (Connection connection = jdbcDataSource.getConnection()) {
+ ResultSet resultSet =
connection.createStatement().executeQuery(String.format("SELECT * FROM %s ORDER
BY order_id ASC", getOrderTableNameWithSchema(containerComposer)));
+ actualProxyList =
containerComposer.transformResultSetToList(resultSet);
+ }
+ Awaitility.await().atMost(20, TimeUnit.SECONDS).pollInterval(2,
TimeUnit.SECONDS)
+ .until(() -> listOrderRecords(containerComposer,
getOrderTableNameWithSchema(containerComposer)).size() ==
actualProxyList.size());
+ SchemaTableName schemaTableName =
containerComposer.getDatabaseType().isSchemaAvailable()
+ ? new SchemaTableName(new
SchemaName(PipelineContainerComposer.SCHEMA_NAME), new
TableName(SOURCE_TABLE_NAME))
+ : new SchemaTableName(new SchemaName(null), new
TableName(SOURCE_TABLE_NAME));
+ PipelineDataSourceWrapper targetDataSource = new
PipelineDataSourceWrapper(StorageContainerUtil.generateDataSource(
+
containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4,
false),
+ containerComposer.getUsername(),
containerComposer.getPassword()), containerComposer.getDatabaseType());
+ PipelineDataSourceWrapper sourceDataSource = new
PipelineDataSourceWrapper(jdbcDataSource, containerComposer.getDatabaseType());
+ StandardPipelineTableMetaDataLoader metaDataLoader = new
StandardPipelineTableMetaDataLoader(targetDataSource);
+ PipelineTableMetaData tableMetaData =
metaDataLoader.getTableMetaData(PipelineContainerComposer.SCHEMA_NAME,
"t_order");
+ PipelineColumnMetaData primaryKeyMetaData =
tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0));
+ ConsistencyCheckJobItemProgressContext progressContext = new
ConsistencyCheckJobItemProgressContext("", 0);
+ SingleTableInventoryDataConsistencyChecker checker = new
SingleTableInventoryDataConsistencyChecker("", sourceDataSource,
targetDataSource, schemaTableName, schemaTableName,
+ tableMetaData.getColumnNames(), primaryKeyMetaData, null,
progressContext);
+ DataConsistencyCheckResult checkResult = checker.check(new
DataMatchDataConsistencyCalculateAlgorithm());
+ assertTrue(checkResult.isMatched());
}
- Awaitility.await().atMost(20, TimeUnit.SECONDS).pollInterval(2,
TimeUnit.SECONDS).until(() ->
listOrderRecords(getOrderTableNameWithSchema()).size() ==
actualProxyList.size());
- SchemaTableName schemaTableName =
containerComposer.getDatabaseType().isSchemaAvailable()
- ? new SchemaTableName(new
SchemaName(PipelineContainerComposer.SCHEMA_NAME), new
TableName(SOURCE_TABLE_NAME))
- : new SchemaTableName(new SchemaName(null), new
TableName(SOURCE_TABLE_NAME));
- PipelineDataSourceWrapper targetDataSource = new
PipelineDataSourceWrapper(StorageContainerUtil.generateDataSource(
-
containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4,
false),
- containerComposer.getUsername(),
containerComposer.getPassword()), containerComposer.getDatabaseType());
- PipelineDataSourceWrapper sourceDataSource = new
PipelineDataSourceWrapper(jdbcDataSource, containerComposer.getDatabaseType());
- StandardPipelineTableMetaDataLoader metaDataLoader = new
StandardPipelineTableMetaDataLoader(targetDataSource);
- PipelineTableMetaData tableMetaData =
metaDataLoader.getTableMetaData(PipelineContainerComposer.SCHEMA_NAME,
"t_order");
- PipelineColumnMetaData primaryKeyMetaData =
tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0));
- ConsistencyCheckJobItemProgressContext progressContext = new
ConsistencyCheckJobItemProgressContext("", 0);
- SingleTableInventoryDataConsistencyChecker checker = new
SingleTableInventoryDataConsistencyChecker("", sourceDataSource,
targetDataSource, schemaTableName, schemaTableName,
- tableMetaData.getColumnNames(), primaryKeyMetaData, null,
progressContext);
- DataConsistencyCheckResult checkResult = checker.check(new
DataMatchDataConsistencyCalculateAlgorithm());
- assertTrue(checkResult.isMatched());
}
- private void createOrderTableRule() throws SQLException {
+ private void createOrderTableRule(final PipelineContainerComposer
containerComposer) throws SQLException {
containerComposer.proxyExecuteWithLog(CREATE_SHARDING_RULE_SQL, 2);
}
- private void initSchemaAndTable(final Connection connection, final int
sleepSeconds) throws SQLException {
+ private void initSchemaAndTable(final PipelineContainerComposer
containerComposer, final Connection connection, final int sleepSeconds) throws
SQLException {
containerComposer.createSchema(connection, sleepSeconds);
String sql =
containerComposer.getExtraSQLCommand().getCreateTableOrder(SOURCE_TABLE_NAME);
log.info("create table sql: {}", sql);
@@ -172,7 +163,7 @@ public final class CDCE2EIT {
}
}
- private void startCDCClient() {
+ private void startCDCClient(final PipelineContainerComposer
containerComposer) {
ImportDataSourceParameter importDataSourceParam = new
ImportDataSourceParameter(containerComposer.appendExtraParameter(
containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4,
false, 0)), containerComposer.getUsername(), containerComposer.getPassword());
StartCDCClientParameter parameter = new
StartCDCClientParameter(importDataSourceParam);
@@ -193,7 +184,7 @@ public final class CDCE2EIT {
});
}
- private List<Map<String, Object>> listOrderRecords(final String
tableNameWithSchema) throws SQLException {
+ private List<Map<String, Object>> listOrderRecords(final
PipelineContainerComposer containerComposer, final String tableNameWithSchema)
throws SQLException {
try (
Connection connection = DriverManager.getConnection(
containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4,
false), containerComposer.getUsername(), containerComposer.getPassword())) {
@@ -202,7 +193,7 @@ public final class CDCE2EIT {
}
}
- private String getOrderTableNameWithSchema() {
+ private String getOrderTableNameWithSchema(final PipelineContainerComposer
containerComposer) {
return containerComposer.getDatabaseType().isSchemaAvailable() ?
String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME) :
SOURCE_TABLE_NAME;
}
diff --git
a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
index 09528985b38..cdab5a8d7b8 100644
---
a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
+++
b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
@@ -21,14 +21,11 @@ import com.google.common.base.Strings;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
-import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
import
org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerComposer;
import
org.apache.shardingsphere.test.e2e.data.pipeline.command.MigrationDistSQLCommand;
import
org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironment;
import
org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
-import
org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
import
org.apache.shardingsphere.test.e2e.env.container.atomic.util.DatabaseTypeUtil;
-import org.junit.After;
import org.opengauss.util.PSQLException;
import javax.xml.bind.JAXB;
@@ -49,21 +46,13 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@Slf4j
public abstract class AbstractMigrationE2EIT {
- private final PipelineContainerComposer containerComposer;
-
private final MigrationDistSQLCommand migrationDistSQL;
- public AbstractMigrationE2EIT(final PipelineTestParameter testParam, final
JobType jobType) {
- containerComposer = new PipelineContainerComposer(testParam, jobType);
+ public AbstractMigrationE2EIT() {
migrationDistSQL =
JAXB.unmarshal(Objects.requireNonNull(AbstractMigrationE2EIT.class.getClassLoader().getResource("env/common/migration-command.xml")),
MigrationDistSQLCommand.class);
}
- @After
- public final void tearDown() {
- containerComposer.close();
- }
-
- protected void addMigrationSourceResource() throws SQLException {
+ protected void addMigrationSourceResource(final PipelineContainerComposer
containerComposer) throws SQLException {
if (PipelineEnvTypeEnum.NATIVE ==
PipelineE2EEnvironment.getInstance().getItEnvType()) {
try {
containerComposer.proxyExecuteWithLog("UNREGISTER MIGRATION
SOURCE STORAGE UNIT ds_0", 2);
@@ -77,7 +66,7 @@ public abstract class AbstractMigrationE2EIT {
containerComposer.addResource(addSourceResource);
}
- protected void addMigrationTargetResource() throws SQLException {
+ protected void addMigrationTargetResource(final PipelineContainerComposer
containerComposer) throws SQLException {
String addTargetResource =
migrationDistSQL.getRegisterMigrationTargetStorageUnitTemplate().replace("${user}",
containerComposer.getUsername())
.replace("${password}", containerComposer.getPassword())
.replace("${ds2}",
containerComposer.appendExtraParameter(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_2,
true)))
@@ -88,7 +77,7 @@ public abstract class AbstractMigrationE2EIT {
assertThat(resources.size(), is(3));
}
- protected void createSourceSchema(final String schemaName) throws
SQLException {
+ protected void createSourceSchema(final PipelineContainerComposer
containerComposer, final String schemaName) throws SQLException {
if
(DatabaseTypeUtil.isPostgreSQL(containerComposer.getDatabaseType())) {
containerComposer.sourceExecuteWithLog(String.format("CREATE
SCHEMA IF NOT EXISTS %s", schemaName));
return;
@@ -107,53 +96,53 @@ public abstract class AbstractMigrationE2EIT {
}
}
- protected void createTargetOrderTableRule() throws SQLException {
+ protected void createTargetOrderTableRule(final PipelineContainerComposer
containerComposer) throws SQLException {
containerComposer.proxyExecuteWithLog(migrationDistSQL.getCreateTargetOrderTableRule(),
2);
}
- protected void createTargetOrderTableEncryptRule() throws SQLException {
+ protected void createTargetOrderTableEncryptRule(final
PipelineContainerComposer containerComposer) throws SQLException {
containerComposer.proxyExecuteWithLog(migrationDistSQL.getCreateTargetOrderTableEncryptRule(),
2);
}
- protected void createTargetOrderItemTableRule() throws SQLException {
+ protected void createTargetOrderItemTableRule(final
PipelineContainerComposer containerComposer) throws SQLException {
containerComposer.proxyExecuteWithLog(migrationDistSQL.getCreateTargetOrderItemTableRule(),
2);
}
- protected void startMigration(final String sourceTableName, final String
targetTableName) throws SQLException {
+ protected void startMigration(final PipelineContainerComposer
containerComposer, final String sourceTableName, final String targetTableName)
throws SQLException {
containerComposer.proxyExecuteWithLog(migrationDistSQL.getMigrationSingleTable(sourceTableName,
targetTableName), 5);
}
- protected void startMigrationWithSchema(final String sourceTableName,
final String targetTableName) throws SQLException {
+ protected void startMigrationWithSchema(final PipelineContainerComposer
containerComposer, final String sourceTableName, final String targetTableName)
throws SQLException {
containerComposer.proxyExecuteWithLog(migrationDistSQL.getMigrationSingleTableWithSchema(sourceTableName,
targetTableName), 5);
}
- protected void addMigrationProcessConfig() throws SQLException {
+ protected void addMigrationProcessConfig(final PipelineContainerComposer
containerComposer) throws SQLException {
containerComposer.proxyExecuteWithLog(migrationDistSQL.getAlterMigrationRule(),
0);
}
- protected void stopMigrationByJobId(final String jobId) throws
SQLException {
+ protected void stopMigrationByJobId(final PipelineContainerComposer
containerComposer, final String jobId) throws SQLException {
containerComposer.proxyExecuteWithLog(String.format("STOP MIGRATION
'%s'", jobId), 1);
}
- protected void startMigrationByJobId(final String jobId) throws
SQLException {
+ protected void startMigrationByJobId(final PipelineContainerComposer
containerComposer, final String jobId) throws SQLException {
containerComposer.proxyExecuteWithLog(String.format("START MIGRATION
'%s'", jobId), 1);
}
- protected void commitMigrationByJobId(final String jobId) throws
SQLException {
+ protected void commitMigrationByJobId(final PipelineContainerComposer
containerComposer, final String jobId) throws SQLException {
containerComposer.proxyExecuteWithLog(String.format("COMMIT MIGRATION
'%s'", jobId), 1);
}
- protected List<String> listJobId() {
+ protected List<String> listJobId(final PipelineContainerComposer
containerComposer) {
List<Map<String, Object>> jobList =
containerComposer.queryForListWithLog("SHOW MIGRATION LIST");
return jobList.stream().map(a ->
a.get("id").toString()).collect(Collectors.toList());
}
- protected String getJobIdByTableName(final String tableName) {
+ protected String getJobIdByTableName(final PipelineContainerComposer
containerComposer, final String tableName) {
List<Map<String, Object>> jobList =
containerComposer.queryForListWithLog("SHOW MIGRATION LIST");
return jobList.stream().filter(a ->
a.get("tables").toString().equals(tableName)).findFirst().orElseThrow(() -> new
RuntimeException("not find " + tableName + " table")).get("id").toString();
}
- protected void assertCheckMigrationSuccess(final String jobId, final
String algorithmType) throws SQLException {
+ protected void assertCheckMigrationSuccess(final PipelineContainerComposer
containerComposer, final String jobId, final String algorithmType) throws
SQLException {
containerComposer.proxyExecuteWithLog(String.format("CHECK MIGRATION
'%s' BY TYPE (NAME='%s')", jobId, algorithmType), 0);
// TODO Need to add after the stop then to start, can continue the
consistency check from the previous progress
List<Map<String, Object>> resultList = Collections.emptyList();
diff --git
a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
index 7097a152d65..61d9663d5f3 100644
---
a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
+++
b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
@@ -31,10 +31,12 @@ import
org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTyp
import
org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
import
org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
import
org.apache.shardingsphere.test.e2e.data.pipeline.util.DataSourceExecuteUtil;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
+import org.junit.jupiter.api.condition.EnabledIf;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.ArgumentsProvider;
+import org.junit.jupiter.params.provider.ArgumentsSource;
import java.sql.SQLException;
import java.time.LocalDateTime;
@@ -43,12 +45,12 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
-@RunWith(Parameterized.class)
@Slf4j
public final class MySQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
@@ -56,64 +58,68 @@ public final class MySQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
private static final String TARGET_TABLE_NAME = "t_order";
- public MySQLMigrationGeneralE2EIT(final PipelineTestParameter testParam) {
- super(testParam, new MigrationJobType());
+ @ParameterizedTest(name = "{0}")
+ @EnabledIf("isEnabled")
+ @ArgumentsSource(TestCaseArgumentsProvider.class)
+ public void assertMigrationSuccess(final PipelineTestParameter testParam)
throws SQLException, InterruptedException {
+ try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam, new MigrationJobType())) {
+ addMigrationProcessConfig(containerComposer);
+ containerComposer.createSourceOrderTable(SOURCE_TABLE_NAME);
+ containerComposer.createSourceOrderItemTable();
+ addMigrationSourceResource(containerComposer);
+ addMigrationTargetResource(containerComposer);
+ createTargetOrderTableRule(containerComposer);
+ createTargetOrderTableEncryptRule(containerComposer);
+ createTargetOrderItemTableRule(containerComposer);
+ Pair<List<Object[]>, List<Object[]>> dataPair =
PipelineCaseHelper.generateFullInsertData(containerComposer.getDatabaseType(),
PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
+ log.info("init data begin: {}", LocalDateTime.now());
+
DataSourceExecuteUtil.execute(containerComposer.getSourceDataSource(),
containerComposer.getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_NAME),
dataPair.getLeft());
+
DataSourceExecuteUtil.execute(containerComposer.getSourceDataSource(),
containerComposer.getExtraSQLCommand().getFullInsertOrderItem(),
dataPair.getRight());
+ log.info("init data end: {}", LocalDateTime.now());
+ startMigration(containerComposer, SOURCE_TABLE_NAME,
TARGET_TABLE_NAME);
+ startMigration(containerComposer, "t_order_item", "t_order_item");
+ String orderJobId = getJobIdByTableName(containerComposer, "ds_0."
+ SOURCE_TABLE_NAME);
+ containerComposer.waitJobPrepareSuccess(String.format("SHOW
MIGRATION STATUS '%s'", orderJobId));
+ containerComposer.startIncrementTask(
+ new
E2EIncrementalTask(containerComposer.getSourceDataSource(), SOURCE_TABLE_NAME,
new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 30));
+ assertMigrationSuccessById(containerComposer, orderJobId,
"DATA_MATCH");
+ String orderItemJobId = getJobIdByTableName(containerComposer,
"ds_0.t_order_item");
+ assertMigrationSuccessById(containerComposer, orderItemJobId,
"DATA_MATCH");
+ ThreadUtil.sleep(2, TimeUnit.SECONDS);
+ assertMigrationSuccessById(containerComposer, orderItemJobId,
"CRC32_MATCH");
+ for (String each : listJobId(containerComposer)) {
+ commitMigrationByJobId(containerComposer, each);
+ }
+ List<String> lastJobIds = listJobId(containerComposer);
+ assertTrue(lastJobIds.isEmpty());
+ containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 2);
+
containerComposer.assertGreaterThanOrderTableInitRows(PipelineContainerComposer.TABLE_INIT_ROW_COUNT,
"");
+ }
}
- @Parameters(name = "{0}")
- public static Collection<PipelineTestParameter> getTestParameters() {
- Collection<PipelineTestParameter> result = new LinkedList<>();
- if (PipelineE2EEnvironment.getInstance().getItEnvType() ==
PipelineEnvTypeEnum.NONE) {
- return result;
- }
- MySQLDatabaseType databaseType = new MySQLDatabaseType();
- for (String each :
PipelineE2EEnvironment.getInstance().listStorageContainerImages(databaseType)) {
- result.add(new PipelineTestParameter(databaseType, each,
"env/scenario/general/mysql.xml"));
+ private void assertMigrationSuccessById(final PipelineContainerComposer
containerComposer, final String jobId, final String algorithmType) throws
SQLException, InterruptedException {
+ List<Map<String, Object>> jobStatus =
containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION
STATUS '%s'", jobId));
+ for (Map<String, Object> each : jobStatus) {
+
assertTrue(Integer.parseInt(each.get("processed_records_count").toString()) >
0);
+
assertThat(Integer.parseInt(each.get("inventory_finished_percentage").toString()),
is(100));
}
- return result;
+ assertCheckMigrationSuccess(containerComposer, jobId, algorithmType);
}
- @Test
- public void assertMigrationSuccess() throws SQLException,
InterruptedException {
- addMigrationProcessConfig();
- getContainerComposer().createSourceOrderTable(SOURCE_TABLE_NAME);
- getContainerComposer().createSourceOrderItemTable();
- addMigrationSourceResource();
- addMigrationTargetResource();
- createTargetOrderTableRule();
- createTargetOrderTableEncryptRule();
- createTargetOrderItemTableRule();
- Pair<List<Object[]>, List<Object[]>> dataPair =
PipelineCaseHelper.generateFullInsertData(getContainerComposer().getDatabaseType(),
PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
- log.info("init data begin: {}", LocalDateTime.now());
-
DataSourceExecuteUtil.execute(getContainerComposer().getSourceDataSource(),
getContainerComposer().getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_NAME),
dataPair.getLeft());
-
DataSourceExecuteUtil.execute(getContainerComposer().getSourceDataSource(),
getContainerComposer().getExtraSQLCommand().getFullInsertOrderItem(),
dataPair.getRight());
- log.info("init data end: {}", LocalDateTime.now());
- startMigration(SOURCE_TABLE_NAME, TARGET_TABLE_NAME);
- startMigration("t_order_item", "t_order_item");
- String orderJobId = getJobIdByTableName("ds_0." + SOURCE_TABLE_NAME);
- getContainerComposer().waitJobPrepareSuccess(String.format("SHOW
MIGRATION STATUS '%s'", orderJobId));
- getContainerComposer().startIncrementTask(
- new
E2EIncrementalTask(getContainerComposer().getSourceDataSource(),
SOURCE_TABLE_NAME, new SnowflakeKeyGenerateAlgorithm(),
getContainerComposer().getDatabaseType(), 30));
- assertMigrationSuccessById(orderJobId, "DATA_MATCH");
- String orderItemJobId = getJobIdByTableName("ds_0.t_order_item");
- assertMigrationSuccessById(orderItemJobId, "DATA_MATCH");
- ThreadUtil.sleep(2, TimeUnit.SECONDS);
- assertMigrationSuccessById(orderItemJobId, "CRC32_MATCH");
- for (String each : listJobId()) {
- commitMigrationByJobId(each);
- }
- List<String> lastJobIds = listJobId();
- assertTrue(lastJobIds.isEmpty());
- getContainerComposer().proxyExecuteWithLog("REFRESH TABLE METADATA",
2);
-
getContainerComposer().assertGreaterThanOrderTableInitRows(PipelineContainerComposer.TABLE_INIT_ROW_COUNT,
"");
+ private static boolean isEnabled() {
+ return PipelineEnvTypeEnum.NONE !=
PipelineE2EEnvironment.getInstance().getItEnvType() &&
!PipelineE2EEnvironment.getInstance().listStorageContainerImages(new
MySQLDatabaseType()).isEmpty();
}
- private void assertMigrationSuccessById(final String jobId, final String
algorithmType) throws SQLException, InterruptedException {
- List<Map<String, Object>> jobStatus =
getContainerComposer().waitIncrementTaskFinished(String.format("SHOW MIGRATION
STATUS '%s'", jobId));
- for (Map<String, Object> each : jobStatus) {
-
assertTrue(Integer.parseInt(each.get("processed_records_count").toString()) >
0);
-
assertThat(Integer.parseInt(each.get("inventory_finished_percentage").toString()),
is(100));
+ private static class TestCaseArgumentsProvider implements
ArgumentsProvider {
+
+ @Override
+ public Stream<? extends Arguments> provideArguments(final
ExtensionContext extensionContext) {
+ Collection<Arguments> result = new LinkedList<>();
+ MySQLDatabaseType databaseType = new MySQLDatabaseType();
+ for (String each :
PipelineE2EEnvironment.getInstance().listStorageContainerImages(databaseType)) {
+ result.add(Arguments.of(new
PipelineTestParameter(databaseType, each, "env/scenario/general/mysql.xml")));
+ }
+ return result.stream();
}
- assertCheckMigrationSuccess(jobId, algorithmType);
}
}
diff --git
a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index d966f9c2eaf..e9f4b5a60f8 100644
---
a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++
b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -31,10 +31,12 @@ import
org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTyp
import
org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
import
org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
import
org.apache.shardingsphere.test.e2e.data.pipeline.util.DataSourceExecuteUtil;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
+import org.junit.jupiter.api.condition.EnabledIf;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.ArgumentsProvider;
+import org.junit.jupiter.params.provider.ArgumentsSource;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import java.sql.SQLException;
@@ -43,88 +45,94 @@ import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertTrue;
-@RunWith(Parameterized.class)
@Slf4j
public final class PostgreSQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
private static final String SOURCE_TABLE_NAME = "t_order_copy";
private static final String TARGET_TABLE_NAME = "t_order";
-
- public PostgreSQLMigrationGeneralE2EIT(final PipelineTestParameter
testParam) {
- super(testParam, new MigrationJobType());
- }
-
- @Parameters(name = "{0}")
- public static Collection<PipelineTestParameter> getTestParameters() {
- Collection<PipelineTestParameter> result = new LinkedList<>();
- if (PipelineE2EEnvironment.getInstance().getItEnvType() ==
PipelineEnvTypeEnum.NONE) {
- return result;
- }
- for (String each :
PipelineE2EEnvironment.getInstance().listStorageContainerImages(new
PostgreSQLDatabaseType())) {
- result.add(new PipelineTestParameter(new PostgreSQLDatabaseType(),
each, "env/scenario/general/postgresql.xml"));
+
+ @ParameterizedTest(name = "{0}")
+ @EnabledIf("isEnabled")
+ @ArgumentsSource(TestCaseArgumentsProvider.class)
+ public void assertMigrationSuccess(final PipelineTestParameter testParam)
throws SQLException, InterruptedException {
+ try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam, new MigrationJobType())) {
+ addMigrationProcessConfig(containerComposer);
+ createSourceSchema(containerComposer,
PipelineContainerComposer.SCHEMA_NAME);
+ containerComposer.createSourceOrderTable(SOURCE_TABLE_NAME);
+ containerComposer.createSourceOrderItemTable();
+
containerComposer.createSourceTableIndexList(PipelineContainerComposer.SCHEMA_NAME,
SOURCE_TABLE_NAME);
+
containerComposer.createSourceCommentOnList(PipelineContainerComposer.SCHEMA_NAME,
SOURCE_TABLE_NAME);
+ addMigrationSourceResource(containerComposer);
+ addMigrationTargetResource(containerComposer);
+ createTargetOrderTableRule(containerComposer);
+ createTargetOrderItemTableRule(containerComposer);
+ Pair<List<Object[]>, List<Object[]>> dataPair =
PipelineCaseHelper.generateFullInsertData(containerComposer.getDatabaseType(),
PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
+ log.info("init data begin: {}", LocalDateTime.now());
+
DataSourceExecuteUtil.execute(containerComposer.getSourceDataSource(),
containerComposer.getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_NAME),
dataPair.getLeft());
+
DataSourceExecuteUtil.execute(containerComposer.getSourceDataSource(),
containerComposer.getExtraSQLCommand().getFullInsertOrderItem(),
dataPair.getRight());
+ log.info("init data end: {}", LocalDateTime.now());
+ startMigrationWithSchema(containerComposer, SOURCE_TABLE_NAME,
"t_order");
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1,
TimeUnit.SECONDS).until(() -> listJobId(containerComposer).size() > 0);
+ String jobId = getJobIdByTableName(containerComposer, "ds_0.test."
+ SOURCE_TABLE_NAME);
+ containerComposer.waitIncrementTaskFinished(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
+ containerComposer.startIncrementTask(new E2EIncrementalTask(
+ containerComposer.getSourceDataSource(), String.join(".",
PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME),
+ new SnowflakeKeyGenerateAlgorithm(),
containerComposer.getDatabaseType(), 20));
+ checkOrderMigration(containerComposer, jobId);
+ checkOrderItemMigration(containerComposer);
+ for (String each : listJobId(containerComposer)) {
+ commitMigrationByJobId(containerComposer, each);
+ }
+ List<String> lastJobIds = listJobId(containerComposer);
+ assertTrue(lastJobIds.isEmpty());
+ containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 2);
+
containerComposer.assertGreaterThanOrderTableInitRows(PipelineContainerComposer.TABLE_INIT_ROW_COUNT
+ 1, PipelineContainerComposer.SCHEMA_NAME);
}
- for (String each :
PipelineE2EEnvironment.getInstance().listStorageContainerImages(new
OpenGaussDatabaseType())) {
- result.add(new PipelineTestParameter(new OpenGaussDatabaseType(),
each, "env/scenario/general/postgresql.xml"));
- }
- return result;
}
- @Test
- public void assertMigrationSuccess() throws SQLException,
InterruptedException {
- addMigrationProcessConfig();
- createSourceSchema(PipelineContainerComposer.SCHEMA_NAME);
- getContainerComposer().createSourceOrderTable(SOURCE_TABLE_NAME);
- getContainerComposer().createSourceOrderItemTable();
-
getContainerComposer().createSourceTableIndexList(PipelineContainerComposer.SCHEMA_NAME,
SOURCE_TABLE_NAME);
-
getContainerComposer().createSourceCommentOnList(PipelineContainerComposer.SCHEMA_NAME,
SOURCE_TABLE_NAME);
- addMigrationSourceResource();
- addMigrationTargetResource();
- createTargetOrderTableRule();
- createTargetOrderItemTableRule();
- Pair<List<Object[]>, List<Object[]>> dataPair =
PipelineCaseHelper.generateFullInsertData(getContainerComposer().getDatabaseType(),
PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
- log.info("init data begin: {}", LocalDateTime.now());
-
DataSourceExecuteUtil.execute(getContainerComposer().getSourceDataSource(),
getContainerComposer().getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_NAME),
dataPair.getLeft());
-
DataSourceExecuteUtil.execute(getContainerComposer().getSourceDataSource(),
getContainerComposer().getExtraSQLCommand().getFullInsertOrderItem(),
dataPair.getRight());
- log.info("init data end: {}", LocalDateTime.now());
- startMigrationWithSchema(SOURCE_TABLE_NAME, "t_order");
- Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1,
TimeUnit.SECONDS).until(() -> listJobId().size() > 0);
- String jobId = getJobIdByTableName("ds_0.test." + SOURCE_TABLE_NAME);
- getContainerComposer().waitIncrementTaskFinished(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
- getContainerComposer().startIncrementTask(new E2EIncrementalTask(
- getContainerComposer().getSourceDataSource(), String.join(".",
PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME),
- new SnowflakeKeyGenerateAlgorithm(),
getContainerComposer().getDatabaseType(), 20));
- checkOrderMigration(jobId);
- checkOrderItemMigration();
- for (String each : listJobId()) {
- commitMigrationByJobId(each);
- }
- List<String> lastJobIds = listJobId();
- assertTrue(lastJobIds.isEmpty());
- getContainerComposer().proxyExecuteWithLog("REFRESH TABLE METADATA",
2);
-
getContainerComposer().assertGreaterThanOrderTableInitRows(PipelineContainerComposer.TABLE_INIT_ROW_COUNT
+ 1, PipelineContainerComposer.SCHEMA_NAME);
- }
-
- private void checkOrderMigration(final String jobId) throws SQLException,
InterruptedException {
- getContainerComposer().waitIncrementTaskFinished(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
- stopMigrationByJobId(jobId);
+ private void checkOrderMigration(final PipelineContainerComposer
containerComposer, final String jobId) throws SQLException,
InterruptedException {
+ containerComposer.waitIncrementTaskFinished(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
+ stopMigrationByJobId(containerComposer, jobId);
long recordId = new SnowflakeKeyGenerateAlgorithm().generateKey();
- getContainerComposer().sourceExecuteWithLog(String.format("INSERT INTO
%s (order_id,user_id,status) VALUES (%s, %s, '%s')",
+ containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s
(order_id,user_id,status) VALUES (%s, %s, '%s')",
String.join(".", PipelineContainerComposer.SCHEMA_NAME,
SOURCE_TABLE_NAME), recordId, 1, "afterStop"));
- startMigrationByJobId(jobId);
+ startMigrationByJobId(containerComposer, jobId);
// must refresh firstly, otherwise proxy can't get schema and table
info
- getContainerComposer().proxyExecuteWithLog("REFRESH TABLE METADATA;",
2);
- getContainerComposer().assertProxyOrderRecordExist(String.join(".",
PipelineContainerComposer.SCHEMA_NAME, TARGET_TABLE_NAME), recordId);
- assertCheckMigrationSuccess(jobId, "DATA_MATCH");
+ containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA;", 2);
+ containerComposer.assertProxyOrderRecordExist(String.join(".",
PipelineContainerComposer.SCHEMA_NAME, TARGET_TABLE_NAME), recordId);
+ assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
+ }
+
+ private void checkOrderItemMigration(final PipelineContainerComposer
containerComposer) throws SQLException, InterruptedException {
+ startMigrationWithSchema(containerComposer, "t_order_item",
"t_order_item");
+ String jobId = getJobIdByTableName(containerComposer,
"ds_0.test.t_order_item");
+ containerComposer.waitIncrementTaskFinished(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
+ assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
}
- private void checkOrderItemMigration() throws SQLException,
InterruptedException {
- startMigrationWithSchema("t_order_item", "t_order_item");
- String jobId = getJobIdByTableName("ds_0.test.t_order_item");
- getContainerComposer().waitIncrementTaskFinished(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
- assertCheckMigrationSuccess(jobId, "DATA_MATCH");
+ private static boolean isEnabled() {
+ return PipelineEnvTypeEnum.NONE !=
PipelineE2EEnvironment.getInstance().getItEnvType()
+ &&
(!PipelineE2EEnvironment.getInstance().listStorageContainerImages(new
PostgreSQLDatabaseType()).isEmpty()
+ ||
!PipelineE2EEnvironment.getInstance().listStorageContainerImages(new
OpenGaussDatabaseType()).isEmpty());
+ }
+
+ private static class TestCaseArgumentsProvider implements
ArgumentsProvider {
+
+ @Override
+ public Stream<? extends Arguments> provideArguments(final
ExtensionContext extensionContext) {
+ Collection<Arguments> result = new LinkedList<>();
+ for (String each :
PipelineE2EEnvironment.getInstance().listStorageContainerImages(new
PostgreSQLDatabaseType())) {
+ result.add(Arguments.of(new PipelineTestParameter(new
PostgreSQLDatabaseType(), each, "env/scenario/general/postgresql.xml")));
+ }
+ for (String each :
PipelineE2EEnvironment.getInstance().listStorageContainerImages(new
OpenGaussDatabaseType())) {
+ result.add(Arguments.of(new PipelineTestParameter(new
OpenGaussDatabaseType(), each, "env/scenario/general/postgresql.xml")));
+ }
+ return result.stream();
+ }
}
}
diff --git
a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java
b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java
index afb97fe244e..02939ab4967 100644
---
a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java
+++
b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java
@@ -17,7 +17,6 @@
package
org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.general;
-import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import
org.apache.shardingsphere.sharding.algorithm.keygen.UUIDKeyGenerateAlgorithm;
@@ -27,16 +26,17 @@ import
org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironme
import
org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
import
org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
import
org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
+import org.junit.jupiter.api.condition.EnabledIf;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.ArgumentsProvider;
+import org.junit.jupiter.params.provider.ArgumentsSource;
import java.sql.Connection;
-import java.util.Collection;
-import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
+import java.util.stream.Stream;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
@@ -46,62 +46,63 @@ import static org.hamcrest.Matchers.is;
* 1) no any rule.
* 2) only encrypt rule.
*/
-@RunWith(Parameterized.class)
-@Slf4j
public final class RulesMigrationE2EIT extends AbstractMigrationE2EIT {
private static final String SOURCE_TABLE_NAME = "t_order";
private static final String TARGET_TABLE_NAME = "t_order";
- public RulesMigrationE2EIT(final PipelineTestParameter testParam) {
- super(testParam, new MigrationJobType());
- }
-
- @Parameters(name = "{0}")
- public static Collection<PipelineTestParameter> getTestParameters() {
- Collection<PipelineTestParameter> result = new LinkedList<>();
- if (PipelineE2EEnvironment.getInstance().getItEnvType() ==
PipelineEnvTypeEnum.NONE) {
- return result;
- }
- List<String> versions =
PipelineE2EEnvironment.getInstance().listStorageContainerImages(new
MySQLDatabaseType());
- if (versions.isEmpty()) {
- return result;
+ @ParameterizedTest(name = "{0}")
+ @EnabledIf("isEnabled")
+ @ArgumentsSource(TestCaseArgumentsProvider.class)
+ public void assertNoRuleMigrationSuccess(final PipelineTestParameter
testParam) throws Exception {
+ try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam, new MigrationJobType())) {
+ assertMigrationSuccess(containerComposer, null);
}
- result.add(new PipelineTestParameter(new MySQLDatabaseType(),
versions.get(0), "env/scenario/primary_key/text_primary_key/mysql.xml"));
- return result;
}
- @Test
- public void assertNoRuleMigrationSuccess() throws Exception {
- assertMigrationSuccess(null);
- }
-
- @Test
- public void assertOnlyEncryptRuleMigrationSuccess() throws Exception {
- assertMigrationSuccess(() -> {
- createTargetOrderTableEncryptRule();
- return null;
- });
+ @ParameterizedTest(name = "{0}")
+ @EnabledIf("isEnabled")
+ @ArgumentsSource(TestCaseArgumentsProvider.class)
+ public void assertOnlyEncryptRuleMigrationSuccess(final
PipelineTestParameter testParam) throws Exception {
+ try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam, new MigrationJobType())) {
+ assertMigrationSuccess(containerComposer, () -> {
+ createTargetOrderTableEncryptRule(containerComposer);
+ return null;
+ });
+ }
}
- private void assertMigrationSuccess(final Callable<Void> addRuleFn) throws
Exception {
- getContainerComposer().createSourceOrderTable(SOURCE_TABLE_NAME);
- try (Connection connection =
getContainerComposer().getSourceDataSource().getConnection()) {
+ private void assertMigrationSuccess(final PipelineContainerComposer
containerComposer, final Callable<Void> addRuleFn) throws Exception {
+ containerComposer.createSourceOrderTable(SOURCE_TABLE_NAME);
+ try (Connection connection =
containerComposer.getSourceDataSource().getConnection()) {
PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, new
UUIDKeyGenerateAlgorithm(), SOURCE_TABLE_NAME,
PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
}
- addMigrationSourceResource();
- addMigrationTargetResource();
+ addMigrationSourceResource(containerComposer);
+ addMigrationTargetResource(containerComposer);
if (null != addRuleFn) {
addRuleFn.call();
}
- startMigration(SOURCE_TABLE_NAME, TARGET_TABLE_NAME);
- String jobId = listJobId().get(0);
- getContainerComposer().waitJobPrepareSuccess(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
- getContainerComposer().waitIncrementTaskFinished(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
- assertCheckMigrationSuccess(jobId, "DATA_MATCH");
- commitMigrationByJobId(jobId);
- getContainerComposer().proxyExecuteWithLog("REFRESH TABLE METADATA",
1);
-
assertThat(getContainerComposer().getTargetTableRecordsCount(SOURCE_TABLE_NAME),
is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT));
+ startMigration(containerComposer, SOURCE_TABLE_NAME,
TARGET_TABLE_NAME);
+ String jobId = listJobId(containerComposer).get(0);
+ containerComposer.waitJobPrepareSuccess(String.format("SHOW MIGRATION
STATUS '%s'", jobId));
+ containerComposer.waitIncrementTaskFinished(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
+ assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
+ commitMigrationByJobId(containerComposer, jobId);
+ containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
+
assertThat(containerComposer.getTargetTableRecordsCount(SOURCE_TABLE_NAME),
is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT));
+ }
+
+ private static boolean isEnabled() {
+ return PipelineEnvTypeEnum.NONE !=
PipelineE2EEnvironment.getInstance().getItEnvType() &&
!PipelineE2EEnvironment.getInstance().listStorageContainerImages(new
MySQLDatabaseType()).isEmpty();
+ }
+
+ private static class TestCaseArgumentsProvider implements
ArgumentsProvider {
+
+ @Override
+ public Stream<? extends Arguments> provideArguments(final
ExtensionContext extensionContext) {
+ List<String> versions =
PipelineE2EEnvironment.getInstance().listStorageContainerImages(new
MySQLDatabaseType());
+ return Stream.of(Arguments.of(new PipelineTestParameter(new
MySQLDatabaseType(), versions.get(0),
"env/scenario/primary_key/text_primary_key/mysql.xml")));
+ }
}
}
diff --git
a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index 8accc74d08f..7076afdc448 100644
---
a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++
b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -17,7 +17,6 @@
package
org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.primarykey;
-import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.binary.Hex;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
@@ -31,10 +30,12 @@ import
org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironme
import
org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
import
org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
import
org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
+import org.junit.jupiter.api.condition.EnabledIf;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.ArgumentsProvider;
+import org.junit.jupiter.params.provider.ArgumentsSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -43,6 +44,7 @@ import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
+import java.util.stream.Stream;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
@@ -55,8 +57,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
* 3) multiple columns primary key, first column type is VARCHAR.
* 4) multiple columns unique key, first column type is BIGINT.
*/
-@RunWith(Parameterized.class)
-@Slf4j
public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
private static final String ORDER_TABLE_SHARDING_RULE_FORMAT = "CREATE
SHARDING TABLE RULE t_order(\n"
@@ -69,52 +69,35 @@ public final class IndexesMigrationE2EIT extends
AbstractMigrationE2EIT {
private static final String TARGET_TABLE_NAME = "t_order";
- public IndexesMigrationE2EIT(final PipelineTestParameter testParam) {
- super(testParam, new MigrationJobType());
- }
-
- @Parameters(name = "{0}")
- public static Collection<PipelineTestParameter> getTestParameters() {
- Collection<PipelineTestParameter> result = new LinkedList<>();
- if (PipelineE2EEnvironment.getInstance().getItEnvType() ==
PipelineEnvTypeEnum.NONE) {
- return result;
- }
- List<String> mysqlVersion =
PipelineE2EEnvironment.getInstance().listStorageContainerImages(new
MySQLDatabaseType());
- if (!mysqlVersion.isEmpty()) {
- result.add(new PipelineTestParameter(new MySQLDatabaseType(),
mysqlVersion.get(0), "env/common/none.xml"));
- }
- List<String> postgresqlVersion =
PipelineE2EEnvironment.getInstance().listStorageContainerImages(new
PostgreSQLDatabaseType());
- if (!postgresqlVersion.isEmpty()) {
- result.add(new PipelineTestParameter(new PostgreSQLDatabaseType(),
postgresqlVersion.get(0), "env/common/none.xml"));
- }
- return result;
- }
-
- @Test
- public void assertNoUniqueKeyMigrationSuccess() throws Exception {
- String sql;
- String consistencyCheckAlgorithmType;
- if (getContainerComposer().getDatabaseType() instanceof
MySQLDatabaseType) {
- sql = "CREATE TABLE `%s` (`order_id` VARCHAR(64) NOT NULL,
`user_id` INT NOT NULL, `status` varchar(255)) ENGINE=InnoDB DEFAULT
CHARSET=utf8mb4";
- // DATA_MATCH doesn't supported, could not order by records
- consistencyCheckAlgorithmType = "CRC32_MATCH";
- } else if (getContainerComposer().getDatabaseType() instanceof
PostgreSQLDatabaseType) {
- sql = "CREATE TABLE %s (order_id varchar(255) NOT NULL,user_id int
NOT NULL,status varchar(255) NULL)";
- consistencyCheckAlgorithmType = null;
- } else {
- return;
+ @ParameterizedTest(name = "{0}")
+ @EnabledIf("isEnabled")
+ @ArgumentsSource(TestCaseArgumentsProvider.class)
+ public void assertNoUniqueKeyMigrationSuccess(final PipelineTestParameter
testParam) throws Exception {
+ try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam, new MigrationJobType())) {
+ String sql;
+ String consistencyCheckAlgorithmType;
+ if (containerComposer.getDatabaseType() instanceof
MySQLDatabaseType) {
+ sql = "CREATE TABLE `%s` (`order_id` VARCHAR(64) NOT NULL,
`user_id` INT NOT NULL, `status` varchar(255)) ENGINE=InnoDB DEFAULT
CHARSET=utf8mb4";
+ // DATA_MATCH doesn't supported, could not order by records
+ consistencyCheckAlgorithmType = "CRC32_MATCH";
+ } else if (containerComposer.getDatabaseType() instanceof
PostgreSQLDatabaseType) {
+ sql = "CREATE TABLE %s (order_id varchar(255) NOT NULL,user_id
int NOT NULL,status varchar(255) NULL)";
+ consistencyCheckAlgorithmType = null;
+ } else {
+ return;
+ }
+ KeyGenerateAlgorithm keyGenerateAlgorithm = new
UUIDKeyGenerateAlgorithm();
+ Object uniqueKey = keyGenerateAlgorithm.generateKey();
+ assertMigrationSuccess(containerComposer, sql, "user_id",
keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
+ insertOneOrder(containerComposer, uniqueKey);
+ containerComposer.assertProxyOrderRecordExist("t_order",
uniqueKey);
+ return null;
+ });
}
- KeyGenerateAlgorithm keyGenerateAlgorithm = new
UUIDKeyGenerateAlgorithm();
- Object uniqueKey = keyGenerateAlgorithm.generateKey();
- assertMigrationSuccess(sql, "user_id", keyGenerateAlgorithm,
consistencyCheckAlgorithmType, () -> {
- insertOneOrder(uniqueKey);
- getContainerComposer().assertProxyOrderRecordExist("t_order",
uniqueKey);
- return null;
- });
}
- private void insertOneOrder(final Object uniqueKey) throws SQLException {
- try (PreparedStatement preparedStatement =
getContainerComposer().getSourceDataSource().getConnection().prepareStatement("INSERT
INTO t_order (order_id,user_id,status) VALUES (?,?,?)")) {
+ private void insertOneOrder(final PipelineContainerComposer
containerComposer, final Object uniqueKey) throws SQLException {
+ try (PreparedStatement preparedStatement =
containerComposer.getSourceDataSource().getConnection().prepareStatement("INSERT
INTO t_order (order_id,user_id,status) VALUES (?,?,?)")) {
preparedStatement.setObject(1, uniqueKey);
preparedStatement.setObject(2, 1);
preparedStatement.setObject(3, "OK");
@@ -123,88 +106,123 @@ public final class IndexesMigrationE2EIT extends
AbstractMigrationE2EIT {
}
}
- @Test
- public void assertMultiPrimaryKeyMigrationSuccess() throws Exception {
- String sql;
- String consistencyCheckAlgorithmType;
- if (getContainerComposer().getDatabaseType() instanceof
MySQLDatabaseType) {
- sql = "CREATE TABLE `%s` (`order_id` VARCHAR(64) NOT NULL,
`user_id` INT NOT NULL, `status` varchar(255), PRIMARY KEY
(`order_id`,`user_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
- consistencyCheckAlgorithmType = "CRC32_MATCH";
- } else {
- return;
+ @ParameterizedTest(name = "{0}")
+ @EnabledIf("isEnabled")
+ @ArgumentsSource(TestCaseArgumentsProvider.class)
+ public void assertMultiPrimaryKeyMigrationSuccess(final
PipelineTestParameter testParam) throws Exception {
+ try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam, new MigrationJobType())) {
+ String sql;
+ String consistencyCheckAlgorithmType;
+ if (containerComposer.getDatabaseType() instanceof
MySQLDatabaseType) {
+ sql = "CREATE TABLE `%s` (`order_id` VARCHAR(64) NOT NULL,
`user_id` INT NOT NULL, `status` varchar(255), PRIMARY KEY
(`order_id`,`user_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
+ consistencyCheckAlgorithmType = "CRC32_MATCH";
+ } else {
+ return;
+ }
+ KeyGenerateAlgorithm keyGenerateAlgorithm = new
UUIDKeyGenerateAlgorithm();
+ Object uniqueKey = keyGenerateAlgorithm.generateKey();
+ assertMigrationSuccess(containerComposer, sql, "user_id",
keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
+ insertOneOrder(containerComposer, uniqueKey);
+ containerComposer.assertProxyOrderRecordExist("t_order",
uniqueKey);
+ return null;
+ });
}
- KeyGenerateAlgorithm keyGenerateAlgorithm = new
UUIDKeyGenerateAlgorithm();
- Object uniqueKey = keyGenerateAlgorithm.generateKey();
- assertMigrationSuccess(sql, "user_id", keyGenerateAlgorithm,
consistencyCheckAlgorithmType, () -> {
- insertOneOrder(uniqueKey);
- getContainerComposer().assertProxyOrderRecordExist("t_order",
uniqueKey);
- return null;
- });
}
- @Test
- public void assertMultiUniqueKeyMigrationSuccess() throws Exception {
- String sql;
- String consistencyCheckAlgorithmType;
- if (getContainerComposer().getDatabaseType() instanceof
MySQLDatabaseType) {
- sql = "CREATE TABLE `%s` (`order_id` BIGINT NOT NULL, `user_id`
INT NOT NULL, `status` varchar(255), UNIQUE KEY (`order_id`,`user_id`))
ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
- consistencyCheckAlgorithmType = "DATA_MATCH";
- } else {
- return;
+ @ParameterizedTest(name = "{0}")
+ @EnabledIf("isEnabled")
+ @ArgumentsSource(TestCaseArgumentsProvider.class)
+ public void assertMultiUniqueKeyMigrationSuccess(final
PipelineTestParameter testParam) throws Exception {
+ try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam, new MigrationJobType())) {
+ String sql;
+ String consistencyCheckAlgorithmType;
+ if (containerComposer.getDatabaseType() instanceof
MySQLDatabaseType) {
+ sql = "CREATE TABLE `%s` (`order_id` BIGINT NOT NULL,
`user_id` INT NOT NULL, `status` varchar(255), UNIQUE KEY
(`order_id`,`user_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
+ consistencyCheckAlgorithmType = "DATA_MATCH";
+ } else {
+ return;
+ }
+ KeyGenerateAlgorithm keyGenerateAlgorithm = new
SnowflakeKeyGenerateAlgorithm();
+ Object uniqueKey = keyGenerateAlgorithm.generateKey();
+ assertMigrationSuccess(containerComposer, sql, "user_id",
keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
+ insertOneOrder(containerComposer, uniqueKey);
+ containerComposer.assertProxyOrderRecordExist("t_order",
uniqueKey);
+ return null;
+ });
}
- KeyGenerateAlgorithm keyGenerateAlgorithm = new
SnowflakeKeyGenerateAlgorithm();
- Object uniqueKey = keyGenerateAlgorithm.generateKey();
- assertMigrationSuccess(sql, "user_id", keyGenerateAlgorithm,
consistencyCheckAlgorithmType, () -> {
- insertOneOrder(uniqueKey);
- getContainerComposer().assertProxyOrderRecordExist("t_order",
uniqueKey);
- return null;
- });
}
- @Test
- public void assertSpecialTypeSingleColumnUniqueKeyMigrationSuccess()
throws Exception {
- String sql;
- String consistencyCheckAlgorithmType;
- if (getContainerComposer().getDatabaseType() instanceof
MySQLDatabaseType) {
- sql = "CREATE TABLE `%s` (`order_id` VARBINARY(64) NOT NULL,
`user_id` INT NOT NULL, `status` varchar(255), PRIMARY KEY (`order_id`))
ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
- // DATA_MATCH doesn't supported: Order by value must implements
Comparable
- consistencyCheckAlgorithmType = "CRC32_MATCH";
- } else {
- return;
+ @ParameterizedTest(name = "{0}")
+ @EnabledIf("isEnabled")
+ @ArgumentsSource(TestCaseArgumentsProvider.class)
+ public void assertSpecialTypeSingleColumnUniqueKeyMigrationSuccess(final
PipelineTestParameter testParam) throws Exception {
+ try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam, new MigrationJobType())) {
+ String sql;
+ String consistencyCheckAlgorithmType;
+ if (containerComposer.getDatabaseType() instanceof
MySQLDatabaseType) {
+ sql = "CREATE TABLE `%s` (`order_id` VARBINARY(64) NOT NULL,
`user_id` INT NOT NULL, `status` varchar(255), PRIMARY KEY (`order_id`))
ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
+ // DATA_MATCH doesn't supported: Order by value must
implements Comparable
+ consistencyCheckAlgorithmType = "CRC32_MATCH";
+ } else {
+ return;
+ }
+ KeyGenerateAlgorithm keyGenerateAlgorithm = new
UUIDKeyGenerateAlgorithm();
+ // TODO Insert binary string in VARBINARY column. But
KeyGenerateAlgorithm.generateKey() require returning Comparable, and byte[] is
not Comparable
+ byte[] uniqueKey = new byte[]{-1, 0, 1};
+ assertMigrationSuccess(containerComposer, sql, "order_id",
keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
+ insertOneOrder(containerComposer, uniqueKey);
+ // TODO Select by byte[] from proxy doesn't work, so unhex
function is used for now
+
containerComposer.assertProxyOrderRecordExist(String.format("SELECT 1 FROM
t_order WHERE order_id=UNHEX('%s')", Hex.encodeHexString(uniqueKey)));
+ return null;
+ });
}
- KeyGenerateAlgorithm keyGenerateAlgorithm = new
UUIDKeyGenerateAlgorithm();
- // TODO Insert binary string in VARBINARY column. But
KeyGenerateAlgorithm.generateKey() require returning Comparable, and byte[] is
not Comparable
- byte[] uniqueKey = new byte[]{-1, 0, 1};
- assertMigrationSuccess(sql, "order_id", keyGenerateAlgorithm,
consistencyCheckAlgorithmType, () -> {
- insertOneOrder(uniqueKey);
- // TODO Select by byte[] from proxy doesn't work, so unhex
function is used for now
-
getContainerComposer().assertProxyOrderRecordExist(String.format("SELECT 1 FROM
t_order WHERE order_id=UNHEX('%s')", Hex.encodeHexString(uniqueKey)));
- return null;
- });
}
- private void assertMigrationSuccess(final String sqlPattern, final String
shardingColumn, final KeyGenerateAlgorithm keyGenerateAlgorithm,
+ private void assertMigrationSuccess(final PipelineContainerComposer
containerComposer, final String sqlPattern, final String shardingColumn, final
KeyGenerateAlgorithm keyGenerateAlgorithm,
final String
consistencyCheckAlgorithmType, final Callable<Void> incrementalTaskFn) throws
Exception {
- getContainerComposer().sourceExecuteWithLog(String.format(sqlPattern,
SOURCE_TABLE_NAME));
- try (Connection connection =
getContainerComposer().getSourceDataSource().getConnection()) {
+ containerComposer.sourceExecuteWithLog(String.format(sqlPattern,
SOURCE_TABLE_NAME));
+ try (Connection connection =
containerComposer.getSourceDataSource().getConnection()) {
PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection,
keyGenerateAlgorithm, SOURCE_TABLE_NAME,
PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
}
- addMigrationProcessConfig();
- addMigrationSourceResource();
- addMigrationTargetResource();
-
getContainerComposer().proxyExecuteWithLog(String.format(ORDER_TABLE_SHARDING_RULE_FORMAT,
shardingColumn), 2);
- startMigration(SOURCE_TABLE_NAME, TARGET_TABLE_NAME);
- String jobId = listJobId().get(0);
- getContainerComposer().waitJobPrepareSuccess(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
+ addMigrationProcessConfig(containerComposer);
+ addMigrationSourceResource(containerComposer);
+ addMigrationTargetResource(containerComposer);
+
containerComposer.proxyExecuteWithLog(String.format(ORDER_TABLE_SHARDING_RULE_FORMAT,
shardingColumn), 2);
+ startMigration(containerComposer, SOURCE_TABLE_NAME,
TARGET_TABLE_NAME);
+ String jobId = listJobId(containerComposer).get(0);
+ containerComposer.waitJobPrepareSuccess(String.format("SHOW MIGRATION
STATUS '%s'", jobId));
incrementalTaskFn.call();
- getContainerComposer().waitIncrementTaskFinished(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
+ containerComposer.waitIncrementTaskFinished(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
if (null != consistencyCheckAlgorithmType) {
- assertCheckMigrationSuccess(jobId, consistencyCheckAlgorithmType);
+ assertCheckMigrationSuccess(containerComposer, jobId,
consistencyCheckAlgorithmType);
}
- commitMigrationByJobId(jobId);
- getContainerComposer().proxyExecuteWithLog("REFRESH TABLE METADATA",
1);
-
assertThat(getContainerComposer().getTargetTableRecordsCount(SOURCE_TABLE_NAME),
is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1));
- List<String> lastJobIds = listJobId();
+ commitMigrationByJobId(containerComposer, jobId);
+ containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
+
assertThat(containerComposer.getTargetTableRecordsCount(SOURCE_TABLE_NAME),
is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1));
+ List<String> lastJobIds = listJobId(containerComposer);
assertTrue(lastJobIds.isEmpty());
}
+
+ private static boolean isEnabled() {
+ return PipelineEnvTypeEnum.NONE !=
PipelineE2EEnvironment.getInstance().getItEnvType()
+ &&
(!PipelineE2EEnvironment.getInstance().listStorageContainerImages(new
MySQLDatabaseType()).isEmpty()
+ ||
!PipelineE2EEnvironment.getInstance().listStorageContainerImages(new
PostgreSQLDatabaseType()).isEmpty());
+ }
+
+ private static class TestCaseArgumentsProvider implements
ArgumentsProvider {
+
+ @Override
+ public Stream<? extends Arguments> provideArguments(final
ExtensionContext extensionContext) {
+ Collection<Arguments> result = new LinkedList<>();
+ List<String> mysqlVersion =
PipelineE2EEnvironment.getInstance().listStorageContainerImages(new
MySQLDatabaseType());
+ if (!mysqlVersion.isEmpty()) {
+ result.add(Arguments.of(new PipelineTestParameter(new
MySQLDatabaseType(), mysqlVersion.get(0), "env/common/none.xml")));
+ }
+ List<String> postgresqlVersion =
PipelineE2EEnvironment.getInstance().listStorageContainerImages(new
PostgreSQLDatabaseType());
+ if (!postgresqlVersion.isEmpty()) {
+ result.add(Arguments.of(new PipelineTestParameter(new
PostgreSQLDatabaseType(), postgresqlVersion.get(0), "env/common/none.xml")));
+ }
+ return result.stream();
+ }
+ }
}
diff --git
a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
index 2ed54d39654..35bf66cd1c6 100644
---
a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
+++
b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
@@ -28,22 +28,22 @@ import
org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironme
import
org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
import
org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
import
org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
+import org.junit.jupiter.api.condition.EnabledIf;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.ArgumentsProvider;
+import org.junit.jupiter.params.provider.ArgumentsSource;
import java.sql.Connection;
import java.sql.SQLException;
-import java.util.Collection;
-import java.util.LinkedList;
import java.util.List;
+import java.util.stream.Stream;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertTrue;
-@RunWith(Parameterized.class)
@Slf4j
public final class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
@@ -51,48 +51,47 @@ public final class MariaDBMigrationE2EIT extends
AbstractMigrationE2EIT {
private static final String TARGET_TABLE_NAME = "t_order";
- public MariaDBMigrationE2EIT(final PipelineTestParameter testParam) {
- super(testParam, new MigrationJobType());
+ @ParameterizedTest(name = "{0}")
+ @EnabledIf("isEnabled")
+ @ArgumentsSource(TestCaseArgumentsProvider.class)
+ public void assertMigrationSuccess(final PipelineTestParameter testParam)
throws SQLException, InterruptedException {
+ try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam, new MigrationJobType())) {
+ String sqlPattern = "CREATE TABLE `%s` (`order_id` VARCHAR(64) NOT
NULL, `user_id` INT NOT NULL, `status` varchar(255), PRIMARY KEY (`order_id`))
ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
+ containerComposer.sourceExecuteWithLog(String.format(sqlPattern,
SOURCE_TABLE_NAME));
+ try (Connection connection =
containerComposer.getSourceDataSource().getConnection()) {
+ KeyGenerateAlgorithm generateAlgorithm = new
UUIDKeyGenerateAlgorithm();
+
PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection,
generateAlgorithm, SOURCE_TABLE_NAME,
PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
+ }
+ addMigrationProcessConfig(containerComposer);
+ addMigrationSourceResource(containerComposer);
+ addMigrationTargetResource(containerComposer);
+ createTargetOrderTableRule(containerComposer);
+ startMigration(containerComposer, SOURCE_TABLE_NAME,
TARGET_TABLE_NAME);
+ String jobId = listJobId(containerComposer).get(0);
+ containerComposer.waitJobPrepareSuccess(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
+ containerComposer.sourceExecuteWithLog("INSERT INTO t_order
(order_id, user_id, status) VALUES ('a1', 1, 'OK')");
+ containerComposer.assertProxyOrderRecordExist("t_order", "a1");
+ containerComposer.waitIncrementTaskFinished(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
+ assertCheckMigrationSuccess(containerComposer, jobId,
"CRC32_MATCH");
+ commitMigrationByJobId(containerComposer, jobId);
+ containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
+
assertThat(containerComposer.getTargetTableRecordsCount(SOURCE_TABLE_NAME),
is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1));
+ List<String> lastJobIds = listJobId(containerComposer);
+ assertTrue(lastJobIds.isEmpty());
+ }
}
- @Parameters(name = "{0}")
- public static Collection<PipelineTestParameter> getTestParameters() {
- Collection<PipelineTestParameter> result = new LinkedList<>();
- if (PipelineE2EEnvironment.getInstance().getItEnvType() ==
PipelineEnvTypeEnum.NONE) {
- return result;
- }
- List<String> versions =
PipelineE2EEnvironment.getInstance().listStorageContainerImages(new
MySQLDatabaseType());
- if (versions.isEmpty()) {
- return result;
- }
- // TODO use MariaDBDatabaseType
- result.add(new PipelineTestParameter(new MySQLDatabaseType(),
versions.get(0), "env/common/none.xml"));
- return result;
+ private static boolean isEnabled() {
+ return PipelineEnvTypeEnum.NONE !=
PipelineE2EEnvironment.getInstance().getItEnvType() &&
!PipelineE2EEnvironment.getInstance().listStorageContainerImages(new
MySQLDatabaseType()).isEmpty();
}
- @Test
- public void assertMigrationSuccess() throws SQLException,
InterruptedException {
- String sqlPattern = "CREATE TABLE `%s` (`order_id` VARCHAR(64) NOT
NULL, `user_id` INT NOT NULL, `status` varchar(255), PRIMARY KEY (`order_id`))
ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
- getContainerComposer().sourceExecuteWithLog(String.format(sqlPattern,
SOURCE_TABLE_NAME));
- try (Connection connection =
getContainerComposer().getSourceDataSource().getConnection()) {
- KeyGenerateAlgorithm generateAlgorithm = new
UUIDKeyGenerateAlgorithm();
-
PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection,
generateAlgorithm, SOURCE_TABLE_NAME,
PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
+ private static class TestCaseArgumentsProvider implements
ArgumentsProvider {
+
+ @Override
+ public Stream<? extends Arguments> provideArguments(final
ExtensionContext extensionContext) {
+ List<String> versions =
PipelineE2EEnvironment.getInstance().listStorageContainerImages(new
MySQLDatabaseType());
+ // TODO use MariaDBDatabaseType
+ return Stream.of(Arguments.of(new PipelineTestParameter(new
MySQLDatabaseType(), versions.get(0), "env/common/none.xml")));
}
- addMigrationProcessConfig();
- addMigrationSourceResource();
- addMigrationTargetResource();
- createTargetOrderTableRule();
- startMigration(SOURCE_TABLE_NAME, TARGET_TABLE_NAME);
- String jobId = listJobId().get(0);
- getContainerComposer().waitJobPrepareSuccess(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
- getContainerComposer().sourceExecuteWithLog("INSERT INTO t_order
(order_id, user_id, status) VALUES ('a1', 1, 'OK')");
- getContainerComposer().assertProxyOrderRecordExist("t_order", "a1");
- getContainerComposer().waitIncrementTaskFinished(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
- assertCheckMigrationSuccess(jobId, "CRC32_MATCH");
- commitMigrationByJobId(jobId);
- getContainerComposer().proxyExecuteWithLog("REFRESH TABLE METADATA",
1);
-
assertThat(getContainerComposer().getTargetTableRecordsCount(SOURCE_TABLE_NAME),
is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1));
- List<String> lastJobIds = listJobId();
- assertTrue(lastJobIds.isEmpty());
}
}
diff --git
a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
index 16561dfdf53..6030c172822 100644
---
a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
+++
b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
@@ -17,7 +17,6 @@
package
org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.primarykey;
-import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import
org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
@@ -30,69 +29,78 @@ import
org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTyp
import
org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
import
org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
import
org.apache.shardingsphere.test.e2e.env.container.atomic.util.DatabaseTypeUtil;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
+import org.junit.jupiter.api.condition.EnabledIf;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.ArgumentsProvider;
+import org.junit.jupiter.params.provider.ArgumentsSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
+import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertTrue;
-@RunWith(Parameterized.class)
-@Slf4j
public class TextPrimaryKeyMigrationE2EIT extends AbstractMigrationE2EIT {
private static final String TARGET_TABLE_NAME = "t_order";
- public TextPrimaryKeyMigrationE2EIT(final PipelineTestParameter testParam)
{
- super(testParam, new MigrationJobType());
+ @ParameterizedTest(name = "{0}")
+ @EnabledIf("isEnabled")
+ @ArgumentsSource(TestCaseArgumentsProvider.class)
+ public void assertTextPrimaryMigrationSuccess(final PipelineTestParameter
testParam) throws SQLException, InterruptedException {
+ try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam, new MigrationJobType())) {
+
containerComposer.createSourceOrderTable(getSourceTableName(containerComposer));
+ try (Connection connection =
containerComposer.getSourceDataSource().getConnection()) {
+ UUIDKeyGenerateAlgorithm keyGenerateAlgorithm = new
UUIDKeyGenerateAlgorithm();
+
PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection,
keyGenerateAlgorithm, getSourceTableName(containerComposer),
PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
+ }
+ addMigrationProcessConfig(containerComposer);
+ addMigrationSourceResource(containerComposer);
+ addMigrationTargetResource(containerComposer);
+ createTargetOrderTableRule(containerComposer);
+ startMigration(containerComposer,
getSourceTableName(containerComposer), TARGET_TABLE_NAME);
+ String jobId = listJobId(containerComposer).get(0);
+ containerComposer.sourceExecuteWithLog(
+ String.format("INSERT INTO %s (order_id,user_id,status)
VALUES (%s, %s, '%s')", getSourceTableName(containerComposer), "1000000000", 1,
"afterStop"));
+ containerComposer.waitIncrementTaskFinished(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
+ assertCheckMigrationSuccess(containerComposer, jobId,
"DATA_MATCH");
+ commitMigrationByJobId(containerComposer, jobId);
+ List<String> lastJobIds = listJobId(containerComposer);
+ assertTrue(lastJobIds.isEmpty());
+ }
}
- @Parameters(name = "{0}")
- public static Collection<PipelineTestParameter> getTestParameters() {
- Collection<PipelineTestParameter> result = new LinkedList<>();
- if (PipelineE2EEnvironment.getInstance().getItEnvType() ==
PipelineEnvTypeEnum.NONE) {
- return result;
- }
- for (String version :
PipelineE2EEnvironment.getInstance().listStorageContainerImages(new
MySQLDatabaseType())) {
- result.add(new PipelineTestParameter(new MySQLDatabaseType(),
version, "env/scenario/primary_key/text_primary_key/mysql.xml"));
- }
- for (String version :
PipelineE2EEnvironment.getInstance().listStorageContainerImages(new
PostgreSQLDatabaseType())) {
- result.add(new PipelineTestParameter(new PostgreSQLDatabaseType(),
version, "env/scenario/primary_key/text_primary_key/postgresql.xml"));
- }
- for (String version :
PipelineE2EEnvironment.getInstance().listStorageContainerImages(new
OpenGaussDatabaseType())) {
- result.add(new PipelineTestParameter(new OpenGaussDatabaseType(),
version, "env/scenario/primary_key/text_primary_key/postgresql.xml"));
- }
- return result;
+ private String getSourceTableName(final PipelineContainerComposer
containerComposer) {
+ return DatabaseTypeUtil.isMySQL(containerComposer.getDatabaseType()) ?
"T_ORDER" : "t_order";
}
- @Test
- public void assertTextPrimaryMigrationSuccess() throws SQLException,
InterruptedException {
- getContainerComposer().createSourceOrderTable(getSourceTableName());
- try (Connection connection =
getContainerComposer().getSourceDataSource().getConnection()) {
- UUIDKeyGenerateAlgorithm keyGenerateAlgorithm = new
UUIDKeyGenerateAlgorithm();
-
PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection,
keyGenerateAlgorithm, getSourceTableName(),
PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
- }
- addMigrationProcessConfig();
- addMigrationSourceResource();
- addMigrationTargetResource();
- createTargetOrderTableRule();
- startMigration(getSourceTableName(), TARGET_TABLE_NAME);
- String jobId = listJobId().get(0);
- getContainerComposer().sourceExecuteWithLog(String.format("INSERT INTO
%s (order_id,user_id,status) VALUES (%s, %s, '%s')", getSourceTableName(),
"1000000000", 1, "afterStop"));
- getContainerComposer().waitIncrementTaskFinished(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
- assertCheckMigrationSuccess(jobId, "DATA_MATCH");
- commitMigrationByJobId(jobId);
- List<String> lastJobIds = listJobId();
- assertTrue(lastJobIds.isEmpty());
+ private static boolean isEnabled() {
+ return PipelineEnvTypeEnum.NONE !=
PipelineE2EEnvironment.getInstance().getItEnvType()
+ &&
(!PipelineE2EEnvironment.getInstance().listStorageContainerImages(new
MySQLDatabaseType()).isEmpty()
+ ||
!PipelineE2EEnvironment.getInstance().listStorageContainerImages(new
PostgreSQLDatabaseType()).isEmpty()
+ ||
!PipelineE2EEnvironment.getInstance().listStorageContainerImages(new
OpenGaussDatabaseType()).isEmpty());
}
- private String getSourceTableName() {
- return
DatabaseTypeUtil.isMySQL(getContainerComposer().getDatabaseType()) ? "T_ORDER"
: "t_order";
+ private static class TestCaseArgumentsProvider implements
ArgumentsProvider {
+
+ @Override
+ public Stream<? extends Arguments> provideArguments(final
ExtensionContext extensionContext) {
+ Collection<Arguments> result = new LinkedList<>();
+ for (String version :
PipelineE2EEnvironment.getInstance().listStorageContainerImages(new
MySQLDatabaseType())) {
+ result.add(Arguments.of(new PipelineTestParameter(new
MySQLDatabaseType(), version,
"env/scenario/primary_key/text_primary_key/mysql.xml")));
+ }
+ for (String version :
PipelineE2EEnvironment.getInstance().listStorageContainerImages(new
PostgreSQLDatabaseType())) {
+ result.add(Arguments.of(new PipelineTestParameter(new
PostgreSQLDatabaseType(), version,
"env/scenario/primary_key/text_primary_key/postgresql.xml")));
+ }
+ for (String version :
PipelineE2EEnvironment.getInstance().listStorageContainerImages(new
OpenGaussDatabaseType())) {
+ result.add(Arguments.of(new PipelineTestParameter(new
OpenGaussDatabaseType(), version,
"env/scenario/primary_key/text_primary_key/postgresql.xml")));
+ }
+ return result.stream();
+ }
}
}