This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new b6b3ce5e7ca Upgrade junit 5 on MigrationE2EIT (#24615)
b6b3ce5e7ca is described below

commit b6b3ce5e7ca72f2e890ea17773c0b7b218467021
Author: Liang Zhang <[email protected]>
AuthorDate: Wed Mar 15 14:56:28 2023 +0800

    Upgrade junit 5 on MigrationE2EIT (#24615)
    
    * Upgrade junit 5 on CDCE2EIT
    
    * Upgrade junit 5 on MigrationE2EIT
    
    * Fix checkstyle
---
 .../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java | 117 +++++-----
 .../cases/migration/AbstractMigrationE2EIT.java    |  43 ++--
 .../general/MySQLMigrationGeneralE2EIT.java        | 118 +++++-----
 .../general/PostgreSQLMigrationGeneralE2EIT.java   | 148 ++++++------
 .../migration/general/RulesMigrationE2EIT.java     |  97 ++++----
 .../primarykey/IndexesMigrationE2EIT.java          | 252 +++++++++++----------
 .../primarykey/MariaDBMigrationE2EIT.java          |  89 ++++----
 .../primarykey/TextPrimaryKeyMigrationE2EIT.java   | 100 ++++----
 8 files changed, 492 insertions(+), 472 deletions(-)
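
Every class in this diff follows the same JUnit 4 to JUnit 5 conversion: the @RunWith(Parameterized.class) runner, constructor injection, and static @Parameters factory give way to @ParameterizedTest with an @ArgumentsSource provider; the environment check moves into an @EnabledIf predicate; and the composer field plus @After/@AfterEach tear-down is replaced by a per-invocation try-with-resources block. A minimal, self-contained sketch of the target shape (class and parameter names here are illustrative, not taken from this commit):

    import java.util.stream.Stream;
    
    import org.junit.jupiter.api.condition.EnabledIf;
    import org.junit.jupiter.api.extension.ExtensionContext;
    import org.junit.jupiter.params.ParameterizedTest;
    import org.junit.jupiter.params.provider.Arguments;
    import org.junit.jupiter.params.provider.ArgumentsProvider;
    import org.junit.jupiter.params.provider.ArgumentsSource;
    
    final class ExampleE2EIT {
        
        // Stand-in for PipelineContainerComposer: a per-invocation resource to release.
        private static final class Composer implements AutoCloseable {
            
            Composer(final String param) {
                // start containers, open connections, ...
            }
            
            @Override
            public void close() {
                // runs even when the test body throws, replacing @AfterEach tearDown()
            }
        }
        
        @ParameterizedTest(name = "{0}")                  // replaces @RunWith(Parameterized.class) + @Test
        @EnabledIf("isEnabled")                           // replaces the early return inside the @Parameters factory
        @ArgumentsSource(TestCaseArgumentsProvider.class) // replaces the static @Parameters method
        void assertCase(final String testParam) throws Exception {
            // the parameter arrives as a method argument instead of through the constructor
            try (Composer composer = new Composer(testParam)) {
                // test body uses the composer; no instance field is needed
            }
        }
        
        private static boolean isEnabled() {
            return true; // the real classes consult PipelineE2EEnvironment here
        }
        
        private static class TestCaseArgumentsProvider implements ArgumentsProvider {
            
            @Override
            public Stream<? extends Arguments> provideArguments(final ExtensionContext extensionContext) {
                return Stream.of(Arguments.of("case-1"), Arguments.of("case-2"));
            }
        }
    }

JUnit 5 resolves parameters per test method rather than through the constructor, so the composer can no longer be built in a constructor; scoping it to the method body with try-with-resources also guarantees cleanup when an assertion fails.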

diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 7d11825653a..c375ef0fb1b 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -48,7 +48,6 @@ import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.Pipeline
 import org.apache.shardingsphere.test.e2e.data.pipeline.util.DataSourceExecuteUtil;
 import org.apache.shardingsphere.test.e2e.env.container.atomic.constants.ProxyContainerConstants;
 import org.apache.shardingsphere.test.e2e.env.container.atomic.util.StorageContainerUtil;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.condition.EnabledIf;
 import org.junit.jupiter.api.extension.ExtensionContext;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -93,76 +92,68 @@ public final class CDCE2EIT {
     
     private static final String SOURCE_TABLE_NAME = "t_order";
     
-    private final PipelineContainerComposer containerComposer;
-    
     private final ExecutorService executor = Executors.newSingleThreadExecutor();
     
-    public CDCE2EIT(final PipelineTestParameter testParam) {
-        containerComposer = new PipelineContainerComposer(testParam, new CDCJobType());
-    }
-    
-    @AfterEach
-    public void tearDown() {
-        containerComposer.close();
-    }
-    
     @ParameterizedTest(name = "{0}")
     @EnabledIf("isEnabled")
     @ArgumentsSource(TestCaseArgumentsProvider.class)
-    public void assertCDCDataImportSuccess() throws SQLException, InterruptedException {
-        // make sure the program time zone same with the database server at CI.
-        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
-        for (String each : Arrays.asList(PipelineContainerComposer.DS_0, PipelineContainerComposer.DS_1)) {
-            containerComposer.registerStorageUnit(each);
-        }
-        createOrderTableRule();
-        try (Connection connection = containerComposer.getProxyDataSource().getConnection()) {
-            initSchemaAndTable(connection, 2);
-        }
-        DataSource jdbcDataSource = containerComposer.generateShardingSphereDataSourceFromProxy();
-        Pair<List<Object[]>, List<Object[]>> dataPair = PipelineCaseHelper.generateFullInsertData(containerComposer.getDatabaseType(), 20);
-        log.info("init data begin: {}", LocalDateTime.now());
-        DataSourceExecuteUtil.execute(jdbcDataSource, containerComposer.getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_NAME), dataPair.getLeft());
-        log.info("init data end: {}", LocalDateTime.now());
-        try (
-                Connection connection = DriverManager.getConnection(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, false),
-                        containerComposer.getUsername(), containerComposer.getPassword())) {
-            initSchemaAndTable(connection, 0);
-        }
-        startCDCClient();
-        Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW STREAMING LIST").isEmpty());
-        String jobId = containerComposer.queryForListWithLog("SHOW STREAMING LIST").get(0).get("id").toString();
-        containerComposer.waitIncrementTaskFinished(String.format("SHOW STREAMING STATUS '%s'", jobId));
-        containerComposer.startIncrementTask(new E2EIncrementalTask(jdbcDataSource, SOURCE_TABLE_NAME, new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 20));
-        containerComposer.getIncreaseTaskThread().join(10000L);
-        List<Map<String, Object>> actualProxyList;
-        try (Connection connection = jdbcDataSource.getConnection()) {
-            ResultSet resultSet = connection.createStatement().executeQuery(String.format("SELECT * FROM %s ORDER BY order_id ASC", getOrderTableNameWithSchema()));
-            actualProxyList = containerComposer.transformResultSetToList(resultSet);
+    public void assertCDCDataImportSuccess(final PipelineTestParameter testParam) throws SQLException, InterruptedException {
+        try (PipelineContainerComposer containerComposer = new PipelineContainerComposer(testParam, new CDCJobType())) {
+            // make sure the program time zone same with the database server at CI.
+            TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
+            for (String each : Arrays.asList(PipelineContainerComposer.DS_0, PipelineContainerComposer.DS_1)) {
+                containerComposer.registerStorageUnit(each);
+            }
+            createOrderTableRule(containerComposer);
+            try (Connection connection = containerComposer.getProxyDataSource().getConnection()) {
+                initSchemaAndTable(containerComposer, connection, 2);
+            }
+            DataSource jdbcDataSource = containerComposer.generateShardingSphereDataSourceFromProxy();
+            Pair<List<Object[]>, List<Object[]>> dataPair = PipelineCaseHelper.generateFullInsertData(containerComposer.getDatabaseType(), 20);
+            log.info("init data begin: {}", LocalDateTime.now());
+            DataSourceExecuteUtil.execute(jdbcDataSource, containerComposer.getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_NAME), dataPair.getLeft());
+            log.info("init data end: {}", LocalDateTime.now());
+            try (
+                    Connection connection = DriverManager.getConnection(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, false),
+                            containerComposer.getUsername(), containerComposer.getPassword())) {
+                initSchemaAndTable(containerComposer, connection, 0);
+            }
+            startCDCClient(containerComposer);
+            Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW STREAMING LIST").isEmpty());
+            String jobId = containerComposer.queryForListWithLog("SHOW STREAMING LIST").get(0).get("id").toString();
+            containerComposer.waitIncrementTaskFinished(String.format("SHOW STREAMING STATUS '%s'", jobId));
+            containerComposer.startIncrementTask(new E2EIncrementalTask(jdbcDataSource, SOURCE_TABLE_NAME, new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 20));
+            containerComposer.getIncreaseTaskThread().join(10000L);
+            List<Map<String, Object>> actualProxyList;
+            try (Connection connection = jdbcDataSource.getConnection()) {
+                ResultSet resultSet = connection.createStatement().executeQuery(String.format("SELECT * FROM %s ORDER BY order_id ASC", getOrderTableNameWithSchema(containerComposer)));
+                actualProxyList = containerComposer.transformResultSetToList(resultSet);
+            }
+            Awaitility.await().atMost(20, TimeUnit.SECONDS).pollInterval(2, TimeUnit.SECONDS)
+                    .until(() -> listOrderRecords(containerComposer, getOrderTableNameWithSchema(containerComposer)).size() == actualProxyList.size());
+            SchemaTableName schemaTableName = containerComposer.getDatabaseType().isSchemaAvailable()
+                    ? new SchemaTableName(new SchemaName(PipelineContainerComposer.SCHEMA_NAME), new TableName(SOURCE_TABLE_NAME))
+                    : new SchemaTableName(new SchemaName(null), new TableName(SOURCE_TABLE_NAME));
+            PipelineDataSourceWrapper targetDataSource = new PipelineDataSourceWrapper(StorageContainerUtil.generateDataSource(
+                    containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, false),
+                    containerComposer.getUsername(), containerComposer.getPassword()), containerComposer.getDatabaseType());
+            PipelineDataSourceWrapper sourceDataSource = new PipelineDataSourceWrapper(jdbcDataSource, containerComposer.getDatabaseType());
+            StandardPipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(targetDataSource);
+            PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(PipelineContainerComposer.SCHEMA_NAME, "t_order");
+            PipelineColumnMetaData primaryKeyMetaData = tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0));
+            ConsistencyCheckJobItemProgressContext progressContext = new ConsistencyCheckJobItemProgressContext("", 0);
+            SingleTableInventoryDataConsistencyChecker checker = new SingleTableInventoryDataConsistencyChecker("", sourceDataSource, targetDataSource, schemaTableName, schemaTableName,
+                    tableMetaData.getColumnNames(), primaryKeyMetaData, null, progressContext);
+            DataConsistencyCheckResult checkResult = checker.check(new DataMatchDataConsistencyCalculateAlgorithm());
+            assertTrue(checkResult.isMatched());
         }
-        Awaitility.await().atMost(20, TimeUnit.SECONDS).pollInterval(2, TimeUnit.SECONDS).until(() -> listOrderRecords(getOrderTableNameWithSchema()).size() == actualProxyList.size());
-        SchemaTableName schemaTableName = containerComposer.getDatabaseType().isSchemaAvailable()
-                ? new SchemaTableName(new SchemaName(PipelineContainerComposer.SCHEMA_NAME), new TableName(SOURCE_TABLE_NAME))
-                : new SchemaTableName(new SchemaName(null), new TableName(SOURCE_TABLE_NAME));
-        PipelineDataSourceWrapper targetDataSource = new PipelineDataSourceWrapper(StorageContainerUtil.generateDataSource(
-                containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, false),
-                containerComposer.getUsername(), containerComposer.getPassword()), containerComposer.getDatabaseType());
-        PipelineDataSourceWrapper sourceDataSource = new PipelineDataSourceWrapper(jdbcDataSource, containerComposer.getDatabaseType());
-        StandardPipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(targetDataSource);
-        PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(PipelineContainerComposer.SCHEMA_NAME, "t_order");
-        PipelineColumnMetaData primaryKeyMetaData = tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0));
-        ConsistencyCheckJobItemProgressContext progressContext = new ConsistencyCheckJobItemProgressContext("", 0);
-        SingleTableInventoryDataConsistencyChecker checker = new SingleTableInventoryDataConsistencyChecker("", sourceDataSource, targetDataSource, schemaTableName, schemaTableName,
-                tableMetaData.getColumnNames(), primaryKeyMetaData, null, progressContext);
-        DataConsistencyCheckResult checkResult = checker.check(new DataMatchDataConsistencyCalculateAlgorithm());
-        assertTrue(checkResult.isMatched());
     }
     
-    private void createOrderTableRule() throws SQLException {
+    private void createOrderTableRule(final PipelineContainerComposer containerComposer) throws SQLException {
         containerComposer.proxyExecuteWithLog(CREATE_SHARDING_RULE_SQL, 2);
     }
     
-    private void initSchemaAndTable(final Connection connection, final int sleepSeconds) throws SQLException {
+    private void initSchemaAndTable(final PipelineContainerComposer containerComposer, final Connection connection, final int sleepSeconds) throws SQLException {
         containerComposer.createSchema(connection, sleepSeconds);
         String sql = containerComposer.getExtraSQLCommand().getCreateTableOrder(SOURCE_TABLE_NAME);
         log.info("create table sql: {}", sql);
@@ -172,7 +163,7 @@ public final class CDCE2EIT {
         }
     }
     
-    private void startCDCClient() {
+    private void startCDCClient(final PipelineContainerComposer containerComposer) {
         ImportDataSourceParameter importDataSourceParam = new ImportDataSourceParameter(containerComposer.appendExtraParameter(
                 containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, false, 0)), containerComposer.getUsername(), containerComposer.getPassword());
         StartCDCClientParameter parameter = new StartCDCClientParameter(importDataSourceParam);
@@ -193,7 +184,7 @@ public final class CDCE2EIT {
         });
     }
     
-    private List<Map<String, Object>> listOrderRecords(final String tableNameWithSchema) throws SQLException {
+    private List<Map<String, Object>> listOrderRecords(final PipelineContainerComposer containerComposer, final String tableNameWithSchema) throws SQLException {
         try (
                 Connection connection = DriverManager.getConnection(
                         containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, false), containerComposer.getUsername(), containerComposer.getPassword())) {
@@ -202,7 +193,7 @@ public final class CDCE2EIT {
         }
     }
     
-    private String getOrderTableNameWithSchema() {
+    private String getOrderTableNameWithSchema(final PipelineContainerComposer containerComposer) {
         return containerComposer.getDatabaseType().isSchemaAvailable() ? String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME) : SOURCE_TABLE_NAME;
     }
     
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
index 09528985b38..cdab5a8d7b8 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
@@ -21,14 +21,11 @@ import com.google.common.base.Strings;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
-import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerComposer;
 import org.apache.shardingsphere.test.e2e.data.pipeline.command.MigrationDistSQLCommand;
 import org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironment;
 import org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
-import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
 import org.apache.shardingsphere.test.e2e.env.container.atomic.util.DatabaseTypeUtil;
-import org.junit.After;
 import org.opengauss.util.PSQLException;
 
 import javax.xml.bind.JAXB;
@@ -49,21 +46,13 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 @Slf4j
 public abstract class AbstractMigrationE2EIT {
     
-    private final PipelineContainerComposer containerComposer;
-    
     private final MigrationDistSQLCommand migrationDistSQL;
     
-    public AbstractMigrationE2EIT(final PipelineTestParameter testParam, final JobType jobType) {
-        containerComposer = new PipelineContainerComposer(testParam, jobType);
+    public AbstractMigrationE2EIT() {
         migrationDistSQL = JAXB.unmarshal(Objects.requireNonNull(AbstractMigrationE2EIT.class.getClassLoader().getResource("env/common/migration-command.xml")), MigrationDistSQLCommand.class);
     }
     
-    @After
-    public final void tearDown() {
-        containerComposer.close();
-    }
-    
-    protected void addMigrationSourceResource() throws SQLException {
+    protected void addMigrationSourceResource(final PipelineContainerComposer containerComposer) throws SQLException {
         if (PipelineEnvTypeEnum.NATIVE == PipelineE2EEnvironment.getInstance().getItEnvType()) {
             try {
                 containerComposer.proxyExecuteWithLog("UNREGISTER MIGRATION SOURCE STORAGE UNIT ds_0", 2);
@@ -77,7 +66,7 @@ public abstract class AbstractMigrationE2EIT {
         containerComposer.addResource(addSourceResource);
     }
     
-    protected void addMigrationTargetResource() throws SQLException {
+    protected void addMigrationTargetResource(final PipelineContainerComposer containerComposer) throws SQLException {
         String addTargetResource = migrationDistSQL.getRegisterMigrationTargetStorageUnitTemplate().replace("${user}", containerComposer.getUsername())
                 .replace("${password}", containerComposer.getPassword())
                 .replace("${ds2}", containerComposer.appendExtraParameter(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_2, true)))
@@ -88,7 +77,7 @@ public abstract class AbstractMigrationE2EIT {
         assertThat(resources.size(), is(3));
     }
     
-    protected void createSourceSchema(final String schemaName) throws SQLException {
+    protected void createSourceSchema(final PipelineContainerComposer containerComposer, final String schemaName) throws SQLException {
         if (DatabaseTypeUtil.isPostgreSQL(containerComposer.getDatabaseType())) {
             containerComposer.sourceExecuteWithLog(String.format("CREATE SCHEMA IF NOT EXISTS %s", schemaName));
             return;
@@ -107,53 +96,53 @@ public abstract class AbstractMigrationE2EIT {
         }
     }
     
-    protected void createTargetOrderTableRule() throws SQLException {
+    protected void createTargetOrderTableRule(final PipelineContainerComposer containerComposer) throws SQLException {
         containerComposer.proxyExecuteWithLog(migrationDistSQL.getCreateTargetOrderTableRule(), 2);
     }
     
-    protected void createTargetOrderTableEncryptRule() throws SQLException {
+    protected void createTargetOrderTableEncryptRule(final PipelineContainerComposer containerComposer) throws SQLException {
         containerComposer.proxyExecuteWithLog(migrationDistSQL.getCreateTargetOrderTableEncryptRule(), 2);
     }
     
-    protected void createTargetOrderItemTableRule() throws SQLException {
+    protected void createTargetOrderItemTableRule(final PipelineContainerComposer containerComposer) throws SQLException {
         containerComposer.proxyExecuteWithLog(migrationDistSQL.getCreateTargetOrderItemTableRule(), 2);
     }
     
-    protected void startMigration(final String sourceTableName, final String targetTableName) throws SQLException {
+    protected void startMigration(final PipelineContainerComposer containerComposer, final String sourceTableName, final String targetTableName) throws SQLException {
         containerComposer.proxyExecuteWithLog(migrationDistSQL.getMigrationSingleTable(sourceTableName, targetTableName), 5);
     }
     
-    protected void startMigrationWithSchema(final String sourceTableName, final String targetTableName) throws SQLException {
+    protected void startMigrationWithSchema(final PipelineContainerComposer containerComposer, final String sourceTableName, final String targetTableName) throws SQLException {
         containerComposer.proxyExecuteWithLog(migrationDistSQL.getMigrationSingleTableWithSchema(sourceTableName, targetTableName), 5);
     }
     
-    protected void addMigrationProcessConfig() throws SQLException {
+    protected void addMigrationProcessConfig(final PipelineContainerComposer containerComposer) throws SQLException {
         containerComposer.proxyExecuteWithLog(migrationDistSQL.getAlterMigrationRule(), 0);
     }
     
-    protected void stopMigrationByJobId(final String jobId) throws SQLException {
+    protected void stopMigrationByJobId(final PipelineContainerComposer containerComposer, final String jobId) throws SQLException {
         containerComposer.proxyExecuteWithLog(String.format("STOP MIGRATION '%s'", jobId), 1);
     }
     
-    protected void startMigrationByJobId(final String jobId) throws SQLException {
+    protected void startMigrationByJobId(final PipelineContainerComposer containerComposer, final String jobId) throws SQLException {
         containerComposer.proxyExecuteWithLog(String.format("START MIGRATION '%s'", jobId), 1);
     }
     
-    protected void commitMigrationByJobId(final String jobId) throws SQLException {
+    protected void commitMigrationByJobId(final PipelineContainerComposer containerComposer, final String jobId) throws SQLException {
         containerComposer.proxyExecuteWithLog(String.format("COMMIT MIGRATION '%s'", jobId), 1);
     }
     
-    protected List<String> listJobId() {
+    protected List<String> listJobId(final PipelineContainerComposer containerComposer) {
         List<Map<String, Object>> jobList = containerComposer.queryForListWithLog("SHOW MIGRATION LIST");
         return jobList.stream().map(a -> a.get("id").toString()).collect(Collectors.toList());
     }
     
-    protected String getJobIdByTableName(final String tableName) {
+    protected String getJobIdByTableName(final PipelineContainerComposer containerComposer, final String tableName) {
         List<Map<String, Object>> jobList = containerComposer.queryForListWithLog("SHOW MIGRATION LIST");
         return jobList.stream().filter(a -> a.get("tables").toString().equals(tableName)).findFirst().orElseThrow(() -> new RuntimeException("not find " + tableName + " table")).get("id").toString();
     }
     
-    protected void assertCheckMigrationSuccess(final String jobId, final String algorithmType) throws SQLException {
+    protected void assertCheckMigrationSuccess(final PipelineContainerComposer containerComposer, final String jobId, final String algorithmType) throws SQLException {
         containerComposer.proxyExecuteWithLog(String.format("CHECK MIGRATION '%s' BY TYPE (NAME='%s')", jobId, algorithmType), 0);
         // TODO Need to add after the stop then to start, can continue the consistency check from the previous progress
         List<Map<String, Object>> resultList = Collections.emptyList();
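
The AbstractMigrationE2EIT change above is what the subclasses build on: with no composer field on the base class, every protected helper receives the per-invocation composer as an explicit first argument. Reduced to its shape, as a minimal sketch (the Composer interface below is a hypothetical stand-in for PipelineContainerComposer, not the project's real API surface):

    abstract class AbstractExampleMigrationIT {
        
        interface Composer {
            
            void proxyExecuteWithLog(String sql, int sleepSeconds);
        }
        
        // Before: the helper read a containerComposer field set by the constructor.
        // After: the caller threads the composer through, keeping the base class stateless.
        protected void stopMigrationByJobId(final Composer composer, final String jobId) {
            composer.proxyExecuteWithLog(String.format("STOP MIGRATION '%s'", jobId), 1);
        }
    }

A stateless base class is what lets each test method own its composer's full lifecycle inside its try-with-resources block.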
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
index 7097a152d65..61d9663d5f3 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
@@ -31,10 +31,12 @@ import org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTyp
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
 import org.apache.shardingsphere.test.e2e.data.pipeline.util.DataSourceExecuteUtil;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
+import org.junit.jupiter.api.condition.EnabledIf;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.ArgumentsProvider;
+import org.junit.jupiter.params.provider.ArgumentsSource;
 
 import java.sql.SQLException;
 import java.time.LocalDateTime;
@@ -43,12 +45,12 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-@RunWith(Parameterized.class)
 @Slf4j
 public final class MySQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
     
@@ -56,64 +58,68 @@ public final class MySQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
     
     private static final String TARGET_TABLE_NAME = "t_order";
     
-    public MySQLMigrationGeneralE2EIT(final PipelineTestParameter testParam) {
-        super(testParam, new MigrationJobType());
+    @ParameterizedTest(name = "{0}")
+    @EnabledIf("isEnabled")
+    @ArgumentsSource(TestCaseArgumentsProvider.class)
+    public void assertMigrationSuccess(final PipelineTestParameter testParam) throws SQLException, InterruptedException {
+        try (PipelineContainerComposer containerComposer = new PipelineContainerComposer(testParam, new MigrationJobType())) {
+            addMigrationProcessConfig(containerComposer);
+            containerComposer.createSourceOrderTable(SOURCE_TABLE_NAME);
+            containerComposer.createSourceOrderItemTable();
+            addMigrationSourceResource(containerComposer);
+            addMigrationTargetResource(containerComposer);
+            createTargetOrderTableRule(containerComposer);
+            createTargetOrderTableEncryptRule(containerComposer);
+            createTargetOrderItemTableRule(containerComposer);
+            Pair<List<Object[]>, List<Object[]>> dataPair = PipelineCaseHelper.generateFullInsertData(containerComposer.getDatabaseType(), PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
+            log.info("init data begin: {}", LocalDateTime.now());
+            DataSourceExecuteUtil.execute(containerComposer.getSourceDataSource(), containerComposer.getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_NAME), dataPair.getLeft());
+            DataSourceExecuteUtil.execute(containerComposer.getSourceDataSource(), containerComposer.getExtraSQLCommand().getFullInsertOrderItem(), dataPair.getRight());
+            log.info("init data end: {}", LocalDateTime.now());
+            startMigration(containerComposer, SOURCE_TABLE_NAME, TARGET_TABLE_NAME);
+            startMigration(containerComposer, "t_order_item", "t_order_item");
+            String orderJobId = getJobIdByTableName(containerComposer, "ds_0." + SOURCE_TABLE_NAME);
+            containerComposer.waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", orderJobId));
+            containerComposer.startIncrementTask(
+                    new E2EIncrementalTask(containerComposer.getSourceDataSource(), SOURCE_TABLE_NAME, new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 30));
+            assertMigrationSuccessById(containerComposer, orderJobId, "DATA_MATCH");
+            String orderItemJobId = getJobIdByTableName(containerComposer, "ds_0.t_order_item");
+            assertMigrationSuccessById(containerComposer, orderItemJobId, "DATA_MATCH");
+            ThreadUtil.sleep(2, TimeUnit.SECONDS);
+            assertMigrationSuccessById(containerComposer, orderItemJobId, "CRC32_MATCH");
+            for (String each : listJobId(containerComposer)) {
+                commitMigrationByJobId(containerComposer, each);
+            }
+            List<String> lastJobIds = listJobId(containerComposer);
+            assertTrue(lastJobIds.isEmpty());
+            containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 2);
+            containerComposer.assertGreaterThanOrderTableInitRows(PipelineContainerComposer.TABLE_INIT_ROW_COUNT, "");
+        }
     }
     
-    @Parameters(name = "{0}")
-    public static Collection<PipelineTestParameter> getTestParameters() {
-        Collection<PipelineTestParameter> result = new LinkedList<>();
-        if (PipelineE2EEnvironment.getInstance().getItEnvType() == PipelineEnvTypeEnum.NONE) {
-            return result;
-        }
-        MySQLDatabaseType databaseType = new MySQLDatabaseType();
-        for (String each : PipelineE2EEnvironment.getInstance().listStorageContainerImages(databaseType)) {
-            result.add(new PipelineTestParameter(databaseType, each, "env/scenario/general/mysql.xml"));
+    private void assertMigrationSuccessById(final PipelineContainerComposer containerComposer, final String jobId, final String algorithmType) throws SQLException, InterruptedException {
+        List<Map<String, Object>> jobStatus = containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+        for (Map<String, Object> each : jobStatus) {
+            assertTrue(Integer.parseInt(each.get("processed_records_count").toString()) > 0);
+            assertThat(Integer.parseInt(each.get("inventory_finished_percentage").toString()), is(100));
         }
-        return result;
+        assertCheckMigrationSuccess(containerComposer, jobId, algorithmType);
     }
     
-    @Test
-    public void assertMigrationSuccess() throws SQLException, InterruptedException {
-        addMigrationProcessConfig();
-        getContainerComposer().createSourceOrderTable(SOURCE_TABLE_NAME);
-        getContainerComposer().createSourceOrderItemTable();
-        addMigrationSourceResource();
-        addMigrationTargetResource();
-        createTargetOrderTableRule();
-        createTargetOrderTableEncryptRule();
-        createTargetOrderItemTableRule();
-        Pair<List<Object[]>, List<Object[]>> dataPair = PipelineCaseHelper.generateFullInsertData(getContainerComposer().getDatabaseType(), PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
-        log.info("init data begin: {}", LocalDateTime.now());
-        DataSourceExecuteUtil.execute(getContainerComposer().getSourceDataSource(), getContainerComposer().getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_NAME), dataPair.getLeft());
-        DataSourceExecuteUtil.execute(getContainerComposer().getSourceDataSource(), getContainerComposer().getExtraSQLCommand().getFullInsertOrderItem(), dataPair.getRight());
-        log.info("init data end: {}", LocalDateTime.now());
-        startMigration(SOURCE_TABLE_NAME, TARGET_TABLE_NAME);
-        startMigration("t_order_item", "t_order_item");
-        String orderJobId = getJobIdByTableName("ds_0." + SOURCE_TABLE_NAME);
-        getContainerComposer().waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", orderJobId));
-        getContainerComposer().startIncrementTask(
-                new E2EIncrementalTask(getContainerComposer().getSourceDataSource(), SOURCE_TABLE_NAME, new SnowflakeKeyGenerateAlgorithm(), getContainerComposer().getDatabaseType(), 30));
-        assertMigrationSuccessById(orderJobId, "DATA_MATCH");
-        String orderItemJobId = getJobIdByTableName("ds_0.t_order_item");
-        assertMigrationSuccessById(orderItemJobId, "DATA_MATCH");
-        ThreadUtil.sleep(2, TimeUnit.SECONDS);
-        assertMigrationSuccessById(orderItemJobId, "CRC32_MATCH");
-        for (String each : listJobId()) {
-            commitMigrationByJobId(each);
-        }
-        List<String> lastJobIds = listJobId();
-        assertTrue(lastJobIds.isEmpty());
-        getContainerComposer().proxyExecuteWithLog("REFRESH TABLE METADATA", 2);
-        getContainerComposer().assertGreaterThanOrderTableInitRows(PipelineContainerComposer.TABLE_INIT_ROW_COUNT, "");
+    private static boolean isEnabled() {
+        return PipelineEnvTypeEnum.NONE != PipelineE2EEnvironment.getInstance().getItEnvType() && !PipelineE2EEnvironment.getInstance().listStorageContainerImages(new MySQLDatabaseType()).isEmpty();
     }
     
-    private void assertMigrationSuccessById(final String jobId, final String algorithmType) throws SQLException, InterruptedException {
-        List<Map<String, Object>> jobStatus = getContainerComposer().waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
-        for (Map<String, Object> each : jobStatus) {
-            assertTrue(Integer.parseInt(each.get("processed_records_count").toString()) > 0);
-            assertThat(Integer.parseInt(each.get("inventory_finished_percentage").toString()), is(100));
+    private static class TestCaseArgumentsProvider implements ArgumentsProvider {
+        
+        @Override
+        public Stream<? extends Arguments> provideArguments(final ExtensionContext extensionContext) {
+            Collection<Arguments> result = new LinkedList<>();
+            MySQLDatabaseType databaseType = new MySQLDatabaseType();
+            for (String each : PipelineE2EEnvironment.getInstance().listStorageContainerImages(databaseType)) {
+                result.add(Arguments.of(new PipelineTestParameter(databaseType, each, "env/scenario/general/mysql.xml")));
+            }
+            return result.stream();
         }
-        assertCheckMigrationSuccess(jobId, algorithmType);
     }
 }
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index d966f9c2eaf..e9f4b5a60f8 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -31,10 +31,12 @@ import org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTyp
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
 import org.apache.shardingsphere.test.e2e.data.pipeline.util.DataSourceExecuteUtil;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
+import org.junit.jupiter.api.condition.EnabledIf;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.ArgumentsProvider;
+import org.junit.jupiter.params.provider.ArgumentsSource;
 import org.testcontainers.shaded.org.awaitility.Awaitility;
 
 import java.sql.SQLException;
@@ -43,88 +45,94 @@ import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-@RunWith(Parameterized.class)
 @Slf4j
 public final class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
     
     private static final String SOURCE_TABLE_NAME = "t_order_copy";
     
     private static final String TARGET_TABLE_NAME = "t_order";
-    
-    public PostgreSQLMigrationGeneralE2EIT(final PipelineTestParameter testParam) {
-        super(testParam, new MigrationJobType());
-    }
-    
-    @Parameters(name = "{0}")
-    public static Collection<PipelineTestParameter> getTestParameters() {
-        Collection<PipelineTestParameter> result = new LinkedList<>();
-        if (PipelineE2EEnvironment.getInstance().getItEnvType() == PipelineEnvTypeEnum.NONE) {
-            return result;
-        }
-        for (String each : PipelineE2EEnvironment.getInstance().listStorageContainerImages(new PostgreSQLDatabaseType())) {
-            result.add(new PipelineTestParameter(new PostgreSQLDatabaseType(), each, "env/scenario/general/postgresql.xml"));
+        
+    @ParameterizedTest(name = "{0}")
+    @EnabledIf("isEnabled")
+    @ArgumentsSource(TestCaseArgumentsProvider.class)
+    public void assertMigrationSuccess(final PipelineTestParameter testParam) throws SQLException, InterruptedException {
+        try (PipelineContainerComposer containerComposer = new PipelineContainerComposer(testParam, new MigrationJobType())) {
+            addMigrationProcessConfig(containerComposer);
+            createSourceSchema(containerComposer, PipelineContainerComposer.SCHEMA_NAME);
+            containerComposer.createSourceOrderTable(SOURCE_TABLE_NAME);
+            containerComposer.createSourceOrderItemTable();
+            containerComposer.createSourceTableIndexList(PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME);
+            containerComposer.createSourceCommentOnList(PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME);
+            addMigrationSourceResource(containerComposer);
+            addMigrationTargetResource(containerComposer);
+            createTargetOrderTableRule(containerComposer);
+            createTargetOrderItemTableRule(containerComposer);
+            Pair<List<Object[]>, List<Object[]>> dataPair = PipelineCaseHelper.generateFullInsertData(containerComposer.getDatabaseType(), PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
+            log.info("init data begin: {}", LocalDateTime.now());
+            DataSourceExecuteUtil.execute(containerComposer.getSourceDataSource(), containerComposer.getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_NAME), dataPair.getLeft());
+            DataSourceExecuteUtil.execute(containerComposer.getSourceDataSource(), containerComposer.getExtraSQLCommand().getFullInsertOrderItem(), dataPair.getRight());
+            log.info("init data end: {}", LocalDateTime.now());
+            startMigrationWithSchema(containerComposer, SOURCE_TABLE_NAME, "t_order");
+            Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> listJobId(containerComposer).size() > 0);
+            String jobId = getJobIdByTableName(containerComposer, "ds_0.test." + SOURCE_TABLE_NAME);
+            containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+            containerComposer.startIncrementTask(new E2EIncrementalTask(
+                    containerComposer.getSourceDataSource(), String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME),
+                    new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 20));
+            checkOrderMigration(containerComposer, jobId);
+            checkOrderItemMigration(containerComposer);
+            for (String each : listJobId(containerComposer)) {
+                commitMigrationByJobId(containerComposer, each);
+            }
+            List<String> lastJobIds = listJobId(containerComposer);
+            assertTrue(lastJobIds.isEmpty());
+            containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 2);
+            containerComposer.assertGreaterThanOrderTableInitRows(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1, PipelineContainerComposer.SCHEMA_NAME);
        }
-        for (String each : PipelineE2EEnvironment.getInstance().listStorageContainerImages(new OpenGaussDatabaseType())) {
-            result.add(new PipelineTestParameter(new OpenGaussDatabaseType(), each, "env/scenario/general/postgresql.xml"));
-        }
-        return result;
     }
     
-    @Test
-    public void assertMigrationSuccess() throws SQLException, InterruptedException {
-        addMigrationProcessConfig();
-        createSourceSchema(PipelineContainerComposer.SCHEMA_NAME);
-        getContainerComposer().createSourceOrderTable(SOURCE_TABLE_NAME);
-        getContainerComposer().createSourceOrderItemTable();
-        getContainerComposer().createSourceTableIndexList(PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME);
-        getContainerComposer().createSourceCommentOnList(PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME);
-        addMigrationSourceResource();
-        addMigrationTargetResource();
-        createTargetOrderTableRule();
-        createTargetOrderItemTableRule();
-        Pair<List<Object[]>, List<Object[]>> dataPair = PipelineCaseHelper.generateFullInsertData(getContainerComposer().getDatabaseType(), PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
-        log.info("init data begin: {}", LocalDateTime.now());
-        DataSourceExecuteUtil.execute(getContainerComposer().getSourceDataSource(), getContainerComposer().getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_NAME), dataPair.getLeft());
-        DataSourceExecuteUtil.execute(getContainerComposer().getSourceDataSource(), getContainerComposer().getExtraSQLCommand().getFullInsertOrderItem(), dataPair.getRight());
-        log.info("init data end: {}", LocalDateTime.now());
-        startMigrationWithSchema(SOURCE_TABLE_NAME, "t_order");
-        Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> listJobId().size() > 0);
-        String jobId = getJobIdByTableName("ds_0.test." + SOURCE_TABLE_NAME);
-        getContainerComposer().waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
-        getContainerComposer().startIncrementTask(new E2EIncrementalTask(
-                getContainerComposer().getSourceDataSource(), String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME),
-                new SnowflakeKeyGenerateAlgorithm(), getContainerComposer().getDatabaseType(), 20));
-        checkOrderMigration(jobId);
-        checkOrderItemMigration();
-        for (String each : listJobId()) {
-            commitMigrationByJobId(each);
-        }
-        List<String> lastJobIds = listJobId();
-        assertTrue(lastJobIds.isEmpty());
-        getContainerComposer().proxyExecuteWithLog("REFRESH TABLE METADATA", 2);
-        getContainerComposer().assertGreaterThanOrderTableInitRows(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1, PipelineContainerComposer.SCHEMA_NAME);
-    }
-    
-    private void checkOrderMigration(final String jobId) throws SQLException, InterruptedException {
-        getContainerComposer().waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
-        stopMigrationByJobId(jobId);
+    private void checkOrderMigration(final PipelineContainerComposer containerComposer, final String jobId) throws SQLException, InterruptedException {
+        containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+        stopMigrationByJobId(containerComposer, jobId);
         long recordId = new SnowflakeKeyGenerateAlgorithm().generateKey();
-        getContainerComposer().sourceExecuteWithLog(String.format("INSERT INTO %s (order_id,user_id,status) VALUES (%s, %s, '%s')",
+        containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s (order_id,user_id,status) VALUES (%s, %s, '%s')",
                 String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME), recordId, 1, "afterStop"));
-        startMigrationByJobId(jobId);
+        startMigrationByJobId(containerComposer, jobId);
         // must refresh firstly, otherwise proxy can't get schema and table info
-        getContainerComposer().proxyExecuteWithLog("REFRESH TABLE METADATA;", 2);
-        getContainerComposer().assertProxyOrderRecordExist(String.join(".", PipelineContainerComposer.SCHEMA_NAME, TARGET_TABLE_NAME), recordId);
-        assertCheckMigrationSuccess(jobId, "DATA_MATCH");
+        containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA;", 2);
+        containerComposer.assertProxyOrderRecordExist(String.join(".", PipelineContainerComposer.SCHEMA_NAME, TARGET_TABLE_NAME), recordId);
+        assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
+    }
+    
+    private void checkOrderItemMigration(final PipelineContainerComposer containerComposer) throws SQLException, InterruptedException {
+        startMigrationWithSchema(containerComposer, "t_order_item", "t_order_item");
+        String jobId = getJobIdByTableName(containerComposer, "ds_0.test.t_order_item");
+        containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+        assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
     }
     
-    private void checkOrderItemMigration() throws SQLException, InterruptedException {
-        startMigrationWithSchema("t_order_item", "t_order_item");
-        String jobId = getJobIdByTableName("ds_0.test.t_order_item");
-        getContainerComposer().waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
-        assertCheckMigrationSuccess(jobId, "DATA_MATCH");
+    private static boolean isEnabled() {
+        return PipelineEnvTypeEnum.NONE != PipelineE2EEnvironment.getInstance().getItEnvType()
+                && (!PipelineE2EEnvironment.getInstance().listStorageContainerImages(new PostgreSQLDatabaseType()).isEmpty()
+                || !PipelineE2EEnvironment.getInstance().listStorageContainerImages(new OpenGaussDatabaseType()).isEmpty());
+    }
+    
+    private static class TestCaseArgumentsProvider implements ArgumentsProvider {
+        
+        @Override
+        public Stream<? extends Arguments> provideArguments(final ExtensionContext extensionContext) {
+            Collection<Arguments> result = new LinkedList<>();
+            for (String each : PipelineE2EEnvironment.getInstance().listStorageContainerImages(new PostgreSQLDatabaseType())) {
+                result.add(Arguments.of(new PipelineTestParameter(new PostgreSQLDatabaseType(), each, "env/scenario/general/postgresql.xml")));
+            }
+            for (String each : PipelineE2EEnvironment.getInstance().listStorageContainerImages(new OpenGaussDatabaseType())) {
+                result.add(Arguments.of(new PipelineTestParameter(new OpenGaussDatabaseType(), each, "env/scenario/general/postgresql.xml")));
+            }
+            return result.stream();
        }
    }
 }
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java
index afb97fe244e..02939ab4967 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.general;
 
-import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import org.apache.shardingsphere.sharding.algorithm.keygen.UUIDKeyGenerateAlgorithm;
@@ -27,16 +26,17 @@ import org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironme
 import org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
+import org.junit.jupiter.api.condition.EnabledIf;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.ArgumentsProvider;
+import org.junit.jupiter.params.provider.ArgumentsSource;
 
 import java.sql.Connection;
-import java.util.Collection;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.Callable;
+import java.util.stream.Stream;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
@@ -46,62 +46,63 @@ import static org.hamcrest.Matchers.is;
  * 1) no any rule.
  * 2) only encrypt rule.
  */
-@RunWith(Parameterized.class)
-@Slf4j
 public final class RulesMigrationE2EIT extends AbstractMigrationE2EIT {
     
     private static final String SOURCE_TABLE_NAME = "t_order";
     
     private static final String TARGET_TABLE_NAME = "t_order";
     
-    public RulesMigrationE2EIT(final PipelineTestParameter testParam) {
-        super(testParam, new MigrationJobType());
-    }
-    
-    @Parameters(name = "{0}")
-    public static Collection<PipelineTestParameter> getTestParameters() {
-        Collection<PipelineTestParameter> result = new LinkedList<>();
-        if (PipelineE2EEnvironment.getInstance().getItEnvType() == PipelineEnvTypeEnum.NONE) {
-            return result;
-        }
-        List<String> versions = PipelineE2EEnvironment.getInstance().listStorageContainerImages(new MySQLDatabaseType());
-        if (versions.isEmpty()) {
-            return result;
+    @ParameterizedTest(name = "{0}")
+    @EnabledIf("isEnabled")
+    @ArgumentsSource(TestCaseArgumentsProvider.class)
+    public void assertNoRuleMigrationSuccess(final PipelineTestParameter testParam) throws Exception {
+        try (PipelineContainerComposer containerComposer = new PipelineContainerComposer(testParam, new MigrationJobType())) {
+            assertMigrationSuccess(containerComposer, null);
         }
-        result.add(new PipelineTestParameter(new MySQLDatabaseType(), versions.get(0), "env/scenario/primary_key/text_primary_key/mysql.xml"));
-        return result;
     }
     
-    @Test
-    public void assertNoRuleMigrationSuccess() throws Exception {
-        assertMigrationSuccess(null);
-    }
-    
-    @Test
-    public void assertOnlyEncryptRuleMigrationSuccess() throws Exception {
-        assertMigrationSuccess(() -> {
-            createTargetOrderTableEncryptRule();
-            return null;
-        });
+    @ParameterizedTest(name = "{0}")
+    @EnabledIf("isEnabled")
+    @ArgumentsSource(TestCaseArgumentsProvider.class)
+    public void assertOnlyEncryptRuleMigrationSuccess(final PipelineTestParameter testParam) throws Exception {
+        try (PipelineContainerComposer containerComposer = new PipelineContainerComposer(testParam, new MigrationJobType())) {
+            assertMigrationSuccess(containerComposer, () -> {
+                createTargetOrderTableEncryptRule(containerComposer);
+                return null;
+            });
+        }
     }
     
-    private void assertMigrationSuccess(final Callable<Void> addRuleFn) throws Exception {
-        getContainerComposer().createSourceOrderTable(SOURCE_TABLE_NAME);
-        try (Connection connection = getContainerComposer().getSourceDataSource().getConnection()) {
+    private void assertMigrationSuccess(final PipelineContainerComposer containerComposer, final Callable<Void> addRuleFn) throws Exception {
+        containerComposer.createSourceOrderTable(SOURCE_TABLE_NAME);
+        try (Connection connection = containerComposer.getSourceDataSource().getConnection()) {
             PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, new UUIDKeyGenerateAlgorithm(), SOURCE_TABLE_NAME, PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
         }
-        addMigrationSourceResource();
-        addMigrationTargetResource();
+        addMigrationSourceResource(containerComposer);
+        addMigrationTargetResource(containerComposer);
         if (null != addRuleFn) {
             addRuleFn.call();
         }
-        startMigration(SOURCE_TABLE_NAME, TARGET_TABLE_NAME);
-        String jobId = listJobId().get(0);
-        getContainerComposer().waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId));
-        getContainerComposer().waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
-        assertCheckMigrationSuccess(jobId, "DATA_MATCH");
-        commitMigrationByJobId(jobId);
-        getContainerComposer().proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
-        assertThat(getContainerComposer().getTargetTableRecordsCount(SOURCE_TABLE_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT));
+        startMigration(containerComposer, SOURCE_TABLE_NAME, TARGET_TABLE_NAME);
+        String jobId = listJobId(containerComposer).get(0);
+        containerComposer.waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+        containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+        assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
+        commitMigrationByJobId(containerComposer, jobId);
+        containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
+        assertThat(containerComposer.getTargetTableRecordsCount(SOURCE_TABLE_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT));
+    }
+    
+    private static boolean isEnabled() {
+        return PipelineEnvTypeEnum.NONE != PipelineE2EEnvironment.getInstance().getItEnvType() && !PipelineE2EEnvironment.getInstance().listStorageContainerImages(new MySQLDatabaseType()).isEmpty();
+    }
+    
+    private static class TestCaseArgumentsProvider implements ArgumentsProvider {
+        
+        @Override
+        public Stream<? extends Arguments> provideArguments(final ExtensionContext extensionContext) {
+            List<String> versions = PipelineE2EEnvironment.getInstance().listStorageContainerImages(new MySQLDatabaseType());
+            return Stream.of(Arguments.of(new PipelineTestParameter(new MySQLDatabaseType(), versions.get(0), "env/scenario/primary_key/text_primary_key/mysql.xml")));
        }
     }
 }
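
One subtlety in RulesMigrationE2EIT above: the JUnit 4 @Parameters factory returned an empty collection when no MySQL image was configured, but the new provider calls versions.get(0) unconditionally. That relies on JUnit 5 evaluating @EnabledIf conditions before a disabled test's arguments are resolved, so the provider never runs in an empty environment. A self-contained illustration of that ordering (hypothetical class, same JUnit 5 APIs):

    import java.util.Collections;
    import java.util.List;
    import java.util.stream.Stream;
    
    import org.junit.jupiter.api.condition.EnabledIf;
    import org.junit.jupiter.api.extension.ExtensionContext;
    import org.junit.jupiter.params.ParameterizedTest;
    import org.junit.jupiter.params.provider.Arguments;
    import org.junit.jupiter.params.provider.ArgumentsProvider;
    import org.junit.jupiter.params.provider.ArgumentsSource;
    
    final class GuardedProviderExample {
        
        private static final List<String> VERSIONS = Collections.emptyList(); // imagine: read from the environment
        
        @ParameterizedTest(name = "{0}")
        @EnabledIf("isEnabled") // evaluated first: when false, the provider below is never invoked
        @ArgumentsSource(Provider.class)
        void assertCase(final String version) {
        }
        
        private static boolean isEnabled() {
            return !VERSIONS.isEmpty(); // guards Provider#provideArguments against get(0) on an empty list
        }
        
        private static class Provider implements ArgumentsProvider {
            
            @Override
            public Stream<? extends Arguments> provideArguments(final ExtensionContext extensionContext) {
                return Stream.of(Arguments.of(VERSIONS.get(0))); // safe only because isEnabled() gates execution
            }
        }
    }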
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index 8accc74d08f..7076afdc448 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -17,7 +17,6 @@
 
 package 
org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.primarykey;
 
-import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.codec.binary.Hex;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
@@ -31,10 +30,12 @@ import 
org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironme
 import 
org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
 import 
org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
 import 
org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
+import org.junit.jupiter.api.condition.EnabledIf;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.ArgumentsProvider;
+import org.junit.jupiter.params.provider.ArgumentsSource;
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -43,6 +44,7 @@ import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.Callable;
+import java.util.stream.Stream;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
@@ -55,8 +57,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 * 3) multiple-column primary key, first column type is VARCHAR.
 * 4) multiple-column unique key, first column type is BIGINT.
  */
-@RunWith(Parameterized.class)
-@Slf4j
 public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
     
     private static final String ORDER_TABLE_SHARDING_RULE_FORMAT = "CREATE 
SHARDING TABLE RULE t_order(\n"
@@ -69,52 +69,35 @@ public final class IndexesMigrationE2EIT extends 
AbstractMigrationE2EIT {
     
     private static final String TARGET_TABLE_NAME = "t_order";
     
-    public IndexesMigrationE2EIT(final PipelineTestParameter testParam) {
-        super(testParam, new MigrationJobType());
-    }
-    
-    @Parameters(name = "{0}")
-    public static Collection<PipelineTestParameter> getTestParameters() {
-        Collection<PipelineTestParameter> result = new LinkedList<>();
-        if (PipelineE2EEnvironment.getInstance().getItEnvType() == 
PipelineEnvTypeEnum.NONE) {
-            return result;
-        }
-        List<String> mysqlVersion = 
PipelineE2EEnvironment.getInstance().listStorageContainerImages(new 
MySQLDatabaseType());
-        if (!mysqlVersion.isEmpty()) {
-            result.add(new PipelineTestParameter(new MySQLDatabaseType(), 
mysqlVersion.get(0), "env/common/none.xml"));
-        }
-        List<String> postgresqlVersion = 
PipelineE2EEnvironment.getInstance().listStorageContainerImages(new 
PostgreSQLDatabaseType());
-        if (!postgresqlVersion.isEmpty()) {
-            result.add(new PipelineTestParameter(new PostgreSQLDatabaseType(), 
postgresqlVersion.get(0), "env/common/none.xml"));
-        }
-        return result;
-    }
-    
-    @Test
-    public void assertNoUniqueKeyMigrationSuccess() throws Exception {
-        String sql;
-        String consistencyCheckAlgorithmType;
-        if (getContainerComposer().getDatabaseType() instanceof 
MySQLDatabaseType) {
-            sql = "CREATE TABLE `%s` (`order_id` VARCHAR(64) NOT NULL, 
`user_id` INT NOT NULL, `status` varchar(255)) ENGINE=InnoDB DEFAULT 
CHARSET=utf8mb4";
-            // DATA_MATCH doesn't supported, could not order by records
-            consistencyCheckAlgorithmType = "CRC32_MATCH";
-        } else if (getContainerComposer().getDatabaseType() instanceof 
PostgreSQLDatabaseType) {
-            sql = "CREATE TABLE %s (order_id varchar(255) NOT NULL,user_id int 
NOT NULL,status varchar(255) NULL)";
-            consistencyCheckAlgorithmType = null;
-        } else {
-            return;
+    @ParameterizedTest(name = "{0}")
+    @EnabledIf("isEnabled")
+    @ArgumentsSource(TestCaseArgumentsProvider.class)
+    public void assertNoUniqueKeyMigrationSuccess(final PipelineTestParameter 
testParam) throws Exception {
+        try (PipelineContainerComposer containerComposer = new 
PipelineContainerComposer(testParam, new MigrationJobType())) {
+            String sql;
+            String consistencyCheckAlgorithmType;
+            if (containerComposer.getDatabaseType() instanceof 
MySQLDatabaseType) {
+                sql = "CREATE TABLE `%s` (`order_id` VARCHAR(64) NOT NULL, 
`user_id` INT NOT NULL, `status` varchar(255)) ENGINE=InnoDB DEFAULT 
CHARSET=utf8mb4";
+                // DATA_MATCH isn't supported because records cannot be ordered
+                consistencyCheckAlgorithmType = "CRC32_MATCH";
+            } else if (containerComposer.getDatabaseType() instanceof 
PostgreSQLDatabaseType) {
+                sql = "CREATE TABLE %s (order_id varchar(255) NOT NULL,user_id 
int NOT NULL,status varchar(255) NULL)";
+                consistencyCheckAlgorithmType = null;
+            } else {
+                return;
+            }
+            KeyGenerateAlgorithm keyGenerateAlgorithm = new 
UUIDKeyGenerateAlgorithm();
+            Object uniqueKey = keyGenerateAlgorithm.generateKey();
+            assertMigrationSuccess(containerComposer, sql, "user_id", 
keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
+                insertOneOrder(containerComposer, uniqueKey);
+                containerComposer.assertProxyOrderRecordExist("t_order", 
uniqueKey);
+                return null;
+            });
         }
-        KeyGenerateAlgorithm keyGenerateAlgorithm = new 
UUIDKeyGenerateAlgorithm();
-        Object uniqueKey = keyGenerateAlgorithm.generateKey();
-        assertMigrationSuccess(sql, "user_id", keyGenerateAlgorithm, 
consistencyCheckAlgorithmType, () -> {
-            insertOneOrder(uniqueKey);
-            getContainerComposer().assertProxyOrderRecordExist("t_order", 
uniqueKey);
-            return null;
-        });
     }
     
-    private void insertOneOrder(final Object uniqueKey) throws SQLException {
-        try (PreparedStatement preparedStatement = 
getContainerComposer().getSourceDataSource().getConnection().prepareStatement("INSERT
 INTO t_order (order_id,user_id,status) VALUES (?,?,?)")) {
+    private void insertOneOrder(final PipelineContainerComposer 
containerComposer, final Object uniqueKey) throws SQLException {
+        try (PreparedStatement preparedStatement = 
containerComposer.getSourceDataSource().getConnection().prepareStatement("INSERT
 INTO t_order (order_id,user_id,status) VALUES (?,?,?)")) {
             preparedStatement.setObject(1, uniqueKey);
             preparedStatement.setObject(2, 1);
             preparedStatement.setObject(3, "OK");
@@ -123,88 +106,123 @@ public final class IndexesMigrationE2EIT extends 
AbstractMigrationE2EIT {
         }
     }
     
-    @Test
-    public void assertMultiPrimaryKeyMigrationSuccess() throws Exception {
-        String sql;
-        String consistencyCheckAlgorithmType;
-        if (getContainerComposer().getDatabaseType() instanceof 
MySQLDatabaseType) {
-            sql = "CREATE TABLE `%s` (`order_id` VARCHAR(64) NOT NULL, 
`user_id` INT NOT NULL, `status` varchar(255), PRIMARY KEY 
(`order_id`,`user_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
-            consistencyCheckAlgorithmType = "CRC32_MATCH";
-        } else {
-            return;
+    @ParameterizedTest(name = "{0}")
+    @EnabledIf("isEnabled")
+    @ArgumentsSource(TestCaseArgumentsProvider.class)
+    public void assertMultiPrimaryKeyMigrationSuccess(final 
PipelineTestParameter testParam) throws Exception {
+        try (PipelineContainerComposer containerComposer = new 
PipelineContainerComposer(testParam, new MigrationJobType())) {
+            String sql;
+            String consistencyCheckAlgorithmType;
+            if (containerComposer.getDatabaseType() instanceof 
MySQLDatabaseType) {
+                sql = "CREATE TABLE `%s` (`order_id` VARCHAR(64) NOT NULL, 
`user_id` INT NOT NULL, `status` varchar(255), PRIMARY KEY 
(`order_id`,`user_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
+                consistencyCheckAlgorithmType = "CRC32_MATCH";
+            } else {
+                return;
+            }
+            KeyGenerateAlgorithm keyGenerateAlgorithm = new 
UUIDKeyGenerateAlgorithm();
+            Object uniqueKey = keyGenerateAlgorithm.generateKey();
+            assertMigrationSuccess(containerComposer, sql, "user_id", 
keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
+                insertOneOrder(containerComposer, uniqueKey);
+                containerComposer.assertProxyOrderRecordExist("t_order", 
uniqueKey);
+                return null;
+            });
         }
-        KeyGenerateAlgorithm keyGenerateAlgorithm = new 
UUIDKeyGenerateAlgorithm();
-        Object uniqueKey = keyGenerateAlgorithm.generateKey();
-        assertMigrationSuccess(sql, "user_id", keyGenerateAlgorithm, 
consistencyCheckAlgorithmType, () -> {
-            insertOneOrder(uniqueKey);
-            getContainerComposer().assertProxyOrderRecordExist("t_order", 
uniqueKey);
-            return null;
-        });
     }
     
-    @Test
-    public void assertMultiUniqueKeyMigrationSuccess() throws Exception {
-        String sql;
-        String consistencyCheckAlgorithmType;
-        if (getContainerComposer().getDatabaseType() instanceof 
MySQLDatabaseType) {
-            sql = "CREATE TABLE `%s` (`order_id` BIGINT NOT NULL, `user_id` 
INT NOT NULL, `status` varchar(255), UNIQUE KEY (`order_id`,`user_id`)) 
ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
-            consistencyCheckAlgorithmType = "DATA_MATCH";
-        } else {
-            return;
+    @ParameterizedTest(name = "{0}")
+    @EnabledIf("isEnabled")
+    @ArgumentsSource(TestCaseArgumentsProvider.class)
+    public void assertMultiUniqueKeyMigrationSuccess(final 
PipelineTestParameter testParam) throws Exception {
+        try (PipelineContainerComposer containerComposer = new 
PipelineContainerComposer(testParam, new MigrationJobType())) {
+            String sql;
+            String consistencyCheckAlgorithmType;
+            if (containerComposer.getDatabaseType() instanceof 
MySQLDatabaseType) {
+                sql = "CREATE TABLE `%s` (`order_id` BIGINT NOT NULL, 
`user_id` INT NOT NULL, `status` varchar(255), UNIQUE KEY 
(`order_id`,`user_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
+                consistencyCheckAlgorithmType = "DATA_MATCH";
+            } else {
+                return;
+            }
+            KeyGenerateAlgorithm keyGenerateAlgorithm = new 
SnowflakeKeyGenerateAlgorithm();
+            Object uniqueKey = keyGenerateAlgorithm.generateKey();
+            assertMigrationSuccess(containerComposer, sql, "user_id", 
keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
+                insertOneOrder(containerComposer, uniqueKey);
+                containerComposer.assertProxyOrderRecordExist("t_order", 
uniqueKey);
+                return null;
+            });
         }
-        KeyGenerateAlgorithm keyGenerateAlgorithm = new 
SnowflakeKeyGenerateAlgorithm();
-        Object uniqueKey = keyGenerateAlgorithm.generateKey();
-        assertMigrationSuccess(sql, "user_id", keyGenerateAlgorithm, 
consistencyCheckAlgorithmType, () -> {
-            insertOneOrder(uniqueKey);
-            getContainerComposer().assertProxyOrderRecordExist("t_order", 
uniqueKey);
-            return null;
-        });
     }
     
-    @Test
-    public void assertSpecialTypeSingleColumnUniqueKeyMigrationSuccess() 
throws Exception {
-        String sql;
-        String consistencyCheckAlgorithmType;
-        if (getContainerComposer().getDatabaseType() instanceof 
MySQLDatabaseType) {
-            sql = "CREATE TABLE `%s` (`order_id` VARBINARY(64) NOT NULL, 
`user_id` INT NOT NULL, `status` varchar(255), PRIMARY KEY (`order_id`)) 
ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
-            // DATA_MATCH doesn't supported: Order by value must implements 
Comparable
-            consistencyCheckAlgorithmType = "CRC32_MATCH";
-        } else {
-            return;
+    @ParameterizedTest(name = "{0}")
+    @EnabledIf("isEnabled")
+    @ArgumentsSource(TestCaseArgumentsProvider.class)
+    public void assertSpecialTypeSingleColumnUniqueKeyMigrationSuccess(final 
PipelineTestParameter testParam) throws Exception {
+        try (PipelineContainerComposer containerComposer = new 
PipelineContainerComposer(testParam, new MigrationJobType())) {
+            String sql;
+            String consistencyCheckAlgorithmType;
+            if (containerComposer.getDatabaseType() instanceof 
MySQLDatabaseType) {
+                sql = "CREATE TABLE `%s` (`order_id` VARBINARY(64) NOT NULL, 
`user_id` INT NOT NULL, `status` varchar(255), PRIMARY KEY (`order_id`)) 
ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
+                // DATA_MATCH isn't supported: the order-by value must implement Comparable
+                consistencyCheckAlgorithmType = "CRC32_MATCH";
+            } else {
+                return;
+            }
+            KeyGenerateAlgorithm keyGenerateAlgorithm = new 
UUIDKeyGenerateAlgorithm();
+            // TODO Insert binary string in VARBINARY column. But 
KeyGenerateAlgorithm.generateKey() requires returning Comparable, and byte[] is 
not Comparable
+            byte[] uniqueKey = new byte[]{-1, 0, 1};
+            assertMigrationSuccess(containerComposer, sql, "order_id", 
keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
+                insertOneOrder(containerComposer, uniqueKey);
+                // TODO Selecting by byte[] through the proxy doesn't work, so 
the UNHEX function is used for now
+                
containerComposer.assertProxyOrderRecordExist(String.format("SELECT 1 FROM 
t_order WHERE order_id=UNHEX('%s')", Hex.encodeHexString(uniqueKey)));
+                return null;
+            });
         }
-        KeyGenerateAlgorithm keyGenerateAlgorithm = new 
UUIDKeyGenerateAlgorithm();
-        // TODO Insert binary string in VARBINARY column. But 
KeyGenerateAlgorithm.generateKey() require returning Comparable, and byte[] is 
not Comparable
-        byte[] uniqueKey = new byte[]{-1, 0, 1};
-        assertMigrationSuccess(sql, "order_id", keyGenerateAlgorithm, 
consistencyCheckAlgorithmType, () -> {
-            insertOneOrder(uniqueKey);
-            // TODO Select by byte[] from proxy doesn't work, so unhex 
function is used for now
-            
getContainerComposer().assertProxyOrderRecordExist(String.format("SELECT 1 FROM 
t_order WHERE order_id=UNHEX('%s')", Hex.encodeHexString(uniqueKey)));
-            return null;
-        });
     }
     
-    private void assertMigrationSuccess(final String sqlPattern, final String 
shardingColumn, final KeyGenerateAlgorithm keyGenerateAlgorithm,
+    private void assertMigrationSuccess(final PipelineContainerComposer 
containerComposer, final String sqlPattern, final String shardingColumn, final 
KeyGenerateAlgorithm keyGenerateAlgorithm,
                                         final String 
consistencyCheckAlgorithmType, final Callable<Void> incrementalTaskFn) throws 
Exception {
-        getContainerComposer().sourceExecuteWithLog(String.format(sqlPattern, 
SOURCE_TABLE_NAME));
-        try (Connection connection = 
getContainerComposer().getSourceDataSource().getConnection()) {
+        containerComposer.sourceExecuteWithLog(String.format(sqlPattern, 
SOURCE_TABLE_NAME));
+        try (Connection connection = 
containerComposer.getSourceDataSource().getConnection()) {
             
PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, 
keyGenerateAlgorithm, SOURCE_TABLE_NAME, 
PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
         }
-        addMigrationProcessConfig();
-        addMigrationSourceResource();
-        addMigrationTargetResource();
-        
getContainerComposer().proxyExecuteWithLog(String.format(ORDER_TABLE_SHARDING_RULE_FORMAT,
 shardingColumn), 2);
-        startMigration(SOURCE_TABLE_NAME, TARGET_TABLE_NAME);
-        String jobId = listJobId().get(0);
-        getContainerComposer().waitJobPrepareSuccess(String.format("SHOW 
MIGRATION STATUS '%s'", jobId));
+        addMigrationProcessConfig(containerComposer);
+        addMigrationSourceResource(containerComposer);
+        addMigrationTargetResource(containerComposer);
+        
containerComposer.proxyExecuteWithLog(String.format(ORDER_TABLE_SHARDING_RULE_FORMAT,
 shardingColumn), 2);
+        startMigration(containerComposer, SOURCE_TABLE_NAME, 
TARGET_TABLE_NAME);
+        String jobId = listJobId(containerComposer).get(0);
+        containerComposer.waitJobPrepareSuccess(String.format("SHOW MIGRATION 
STATUS '%s'", jobId));
         incrementalTaskFn.call();
-        getContainerComposer().waitIncrementTaskFinished(String.format("SHOW 
MIGRATION STATUS '%s'", jobId));
+        containerComposer.waitIncrementTaskFinished(String.format("SHOW 
MIGRATION STATUS '%s'", jobId));
         if (null != consistencyCheckAlgorithmType) {
-            assertCheckMigrationSuccess(jobId, consistencyCheckAlgorithmType);
+            assertCheckMigrationSuccess(containerComposer, jobId, 
consistencyCheckAlgorithmType);
         }
-        commitMigrationByJobId(jobId);
-        getContainerComposer().proxyExecuteWithLog("REFRESH TABLE METADATA", 
1);
-        
assertThat(getContainerComposer().getTargetTableRecordsCount(SOURCE_TABLE_NAME),
 is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1));
-        List<String> lastJobIds = listJobId();
+        commitMigrationByJobId(containerComposer, jobId);
+        containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
+        
assertThat(containerComposer.getTargetTableRecordsCount(SOURCE_TABLE_NAME), 
is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1));
+        List<String> lastJobIds = listJobId(containerComposer);
         assertTrue(lastJobIds.isEmpty());
     }
+    
+    private static boolean isEnabled() {
+        return PipelineEnvTypeEnum.NONE != 
PipelineE2EEnvironment.getInstance().getItEnvType()
+                && 
(!PipelineE2EEnvironment.getInstance().listStorageContainerImages(new 
MySQLDatabaseType()).isEmpty()
+                || 
!PipelineE2EEnvironment.getInstance().listStorageContainerImages(new 
PostgreSQLDatabaseType()).isEmpty());
+    }
+    
+    private static class TestCaseArgumentsProvider implements 
ArgumentsProvider {
+        
+        @Override
+        public Stream<? extends Arguments> provideArguments(final 
ExtensionContext extensionContext) {
+            Collection<Arguments> result = new LinkedList<>();
+            List<String> mysqlVersion = 
PipelineE2EEnvironment.getInstance().listStorageContainerImages(new 
MySQLDatabaseType());
+            if (!mysqlVersion.isEmpty()) {
+                result.add(Arguments.of(new PipelineTestParameter(new 
MySQLDatabaseType(), mysqlVersion.get(0), "env/common/none.xml")));
+            }
+            List<String> postgresqlVersion = 
PipelineE2EEnvironment.getInstance().listStorageContainerImages(new 
PostgreSQLDatabaseType());
+            if (!postgresqlVersion.isEmpty()) {
+                result.add(Arguments.of(new PipelineTestParameter(new 
PostgreSQLDatabaseType(), postgresqlVersion.get(0), "env/common/none.xml")));
+            }
+            return result.stream();
+        }
+    }
 }
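
The other half of the recipe is lifecycle management: the constructor plus @AfterEach tearDown() pair is replaced by constructing the PipelineContainerComposer inside try-with-resources, so teardown runs even when an assertion fails mid-test. A stripped-down sketch of the same shape, with a hypothetical stand-in for the composer:

    // Hypothetical stand-in for PipelineContainerComposer, which is AutoCloseable.
    final class ComposerLifecycleExample {
        
        static final class FakeComposer implements AutoCloseable {
            
            void runTestBody() {
                // the migration assertions would drive the composer here
            }
            
            @Override
            public void close() {
                // container teardown, formerly done by an @AfterEach tearDown() method
            }
        }
        
        void assertMigrationSuccess() {
            // try-with-resources guarantees close() even when the body throws,
            // matching the old constructor + @AfterEach lifecycle per invocation
            try (FakeComposer composer = new FakeComposer()) {
                composer.runTestBody();
            }
        }
    }
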
diff --git 
a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
 
b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
index 2ed54d39654..35bf66cd1c6 100644
--- 
a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
+++ 
b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
@@ -28,22 +28,22 @@ import 
org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironme
 import 
org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
 import 
org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
 import 
org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
+import org.junit.jupiter.api.condition.EnabledIf;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.ArgumentsProvider;
+import org.junit.jupiter.params.provider.ArgumentsSource;
 
 import java.sql.Connection;
 import java.sql.SQLException;
-import java.util.Collection;
-import java.util.LinkedList;
 import java.util.List;
+import java.util.stream.Stream;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-@RunWith(Parameterized.class)
 @Slf4j
 public final class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
     
@@ -51,48 +51,47 @@ public final class MariaDBMigrationE2EIT extends 
AbstractMigrationE2EIT {
     
     private static final String TARGET_TABLE_NAME = "t_order";
     
-    public MariaDBMigrationE2EIT(final PipelineTestParameter testParam) {
-        super(testParam, new MigrationJobType());
+    @ParameterizedTest(name = "{0}")
+    @EnabledIf("isEnabled")
+    @ArgumentsSource(TestCaseArgumentsProvider.class)
+    public void assertMigrationSuccess(final PipelineTestParameter testParam) 
throws SQLException, InterruptedException {
+        try (PipelineContainerComposer containerComposer = new 
PipelineContainerComposer(testParam, new MigrationJobType())) {
+            String sqlPattern = "CREATE TABLE `%s` (`order_id` VARCHAR(64) NOT 
NULL, `user_id` INT NOT NULL, `status` varchar(255), PRIMARY KEY (`order_id`)) 
ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
+            containerComposer.sourceExecuteWithLog(String.format(sqlPattern, 
SOURCE_TABLE_NAME));
+            try (Connection connection = 
containerComposer.getSourceDataSource().getConnection()) {
+                KeyGenerateAlgorithm generateAlgorithm = new 
UUIDKeyGenerateAlgorithm();
+                
PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, 
generateAlgorithm, SOURCE_TABLE_NAME, 
PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
+            }
+            addMigrationProcessConfig(containerComposer);
+            addMigrationSourceResource(containerComposer);
+            addMigrationTargetResource(containerComposer);
+            createTargetOrderTableRule(containerComposer);
+            startMigration(containerComposer, SOURCE_TABLE_NAME, 
TARGET_TABLE_NAME);
+            String jobId = listJobId(containerComposer).get(0);
+            containerComposer.waitJobPrepareSuccess(String.format("SHOW 
MIGRATION STATUS '%s'", jobId));
+            containerComposer.sourceExecuteWithLog("INSERT INTO t_order 
(order_id, user_id, status) VALUES ('a1', 1, 'OK')");
+            containerComposer.assertProxyOrderRecordExist("t_order", "a1");
+            containerComposer.waitIncrementTaskFinished(String.format("SHOW 
MIGRATION STATUS '%s'", jobId));
+            assertCheckMigrationSuccess(containerComposer, jobId, 
"CRC32_MATCH");
+            commitMigrationByJobId(containerComposer, jobId);
+            containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
+            
assertThat(containerComposer.getTargetTableRecordsCount(SOURCE_TABLE_NAME), 
is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1));
+            List<String> lastJobIds = listJobId(containerComposer);
+            assertTrue(lastJobIds.isEmpty());
+        }
     }
     
-    @Parameters(name = "{0}")
-    public static Collection<PipelineTestParameter> getTestParameters() {
-        Collection<PipelineTestParameter> result = new LinkedList<>();
-        if (PipelineE2EEnvironment.getInstance().getItEnvType() == 
PipelineEnvTypeEnum.NONE) {
-            return result;
-        }
-        List<String> versions = 
PipelineE2EEnvironment.getInstance().listStorageContainerImages(new 
MySQLDatabaseType());
-        if (versions.isEmpty()) {
-            return result;
-        }
-        // TODO use MariaDBDatabaseType
-        result.add(new PipelineTestParameter(new MySQLDatabaseType(), 
versions.get(0), "env/common/none.xml"));
-        return result;
+    private static boolean isEnabled() {
+        return PipelineEnvTypeEnum.NONE != 
PipelineE2EEnvironment.getInstance().getItEnvType() && 
!PipelineE2EEnvironment.getInstance().listStorageContainerImages(new 
MySQLDatabaseType()).isEmpty();
     }
     
-    @Test
-    public void assertMigrationSuccess() throws SQLException, 
InterruptedException {
-        String sqlPattern = "CREATE TABLE `%s` (`order_id` VARCHAR(64) NOT 
NULL, `user_id` INT NOT NULL, `status` varchar(255), PRIMARY KEY (`order_id`)) 
ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
-        getContainerComposer().sourceExecuteWithLog(String.format(sqlPattern, 
SOURCE_TABLE_NAME));
-        try (Connection connection = 
getContainerComposer().getSourceDataSource().getConnection()) {
-            KeyGenerateAlgorithm generateAlgorithm = new 
UUIDKeyGenerateAlgorithm();
-            
PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, 
generateAlgorithm, SOURCE_TABLE_NAME, 
PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
+    private static class TestCaseArgumentsProvider implements 
ArgumentsProvider {
+        
+        @Override
+        public Stream<? extends Arguments> provideArguments(final 
ExtensionContext extensionContext) {
+            List<String> versions = 
PipelineE2EEnvironment.getInstance().listStorageContainerImages(new 
MySQLDatabaseType());
+            // TODO use MariaDBDatabaseType
+            return Stream.of(Arguments.of(new PipelineTestParameter(new 
MySQLDatabaseType(), versions.get(0), "env/common/none.xml")));
         }
-        addMigrationProcessConfig();
-        addMigrationSourceResource();
-        addMigrationTargetResource();
-        createTargetOrderTableRule();
-        startMigration(SOURCE_TABLE_NAME, TARGET_TABLE_NAME);
-        String jobId = listJobId().get(0);
-        getContainerComposer().waitJobPrepareSuccess(String.format("SHOW 
MIGRATION STATUS '%s'", jobId));
-        getContainerComposer().sourceExecuteWithLog("INSERT INTO t_order 
(order_id, user_id, status) VALUES ('a1', 1, 'OK')");
-        getContainerComposer().assertProxyOrderRecordExist("t_order", "a1");
-        getContainerComposer().waitIncrementTaskFinished(String.format("SHOW 
MIGRATION STATUS '%s'", jobId));
-        assertCheckMigrationSuccess(jobId, "CRC32_MATCH");
-        commitMigrationByJobId(jobId);
-        getContainerComposer().proxyExecuteWithLog("REFRESH TABLE METADATA", 
1);
-        
assertThat(getContainerComposer().getTargetTableRecordsCount(SOURCE_TABLE_NAME),
 is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1));
-        List<String> lastJobIds = listJobId();
-        assertTrue(lastJobIds.isEmpty());
     }
 }
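
Translating the old @Parameters factory is mostly a matter of returning result.stream() instead of the collection itself: an empty stream simply yields zero invocations, and the environment guard that used to short-circuit the factory now lives in isEnabled(). Since JUnit evaluates the @EnabledIf condition before the ArgumentsProvider runs, the unguarded versions.get(0) above is only reached when at least one storage image is configured. A hedged sketch, with placeholder versions standing in for listStorageContainerImages(...):

    import java.util.Arrays;
    import java.util.Collection;
    import java.util.LinkedList;
    import java.util.stream.Stream;
    
    import org.junit.jupiter.api.extension.ExtensionContext;
    import org.junit.jupiter.params.provider.Arguments;
    import org.junit.jupiter.params.provider.ArgumentsProvider;
    
    // Illustrative only: "db:1.0"/"db:2.0" are placeholders for real image versions.
    class ConditionalArgumentsProvider implements ArgumentsProvider {
        
        @Override
        public Stream<? extends Arguments> provideArguments(final ExtensionContext extensionContext) {
            Collection<Arguments> result = new LinkedList<>();
            for (String version : Arrays.asList("db:1.0", "db:2.0")) {
                result.add(Arguments.of(version));
            }
            // an empty stream produces zero invocations; the old JUnit 4
            // "return empty collection" guard now lives in @EnabledIf instead
            return result.stream();
        }
    }
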
diff --git 
a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
 
b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
index 16561dfdf53..6030c172822 100644
--- 
a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
+++ 
b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
@@ -17,7 +17,6 @@
 
 package 
org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.primarykey;
 
-import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import 
org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
@@ -30,69 +29,78 @@ import 
org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTyp
 import 
org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
 import 
org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
 import 
org.apache.shardingsphere.test.e2e.env.container.atomic.util.DatabaseTypeUtil;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
+import org.junit.jupiter.api.condition.EnabledIf;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.ArgumentsProvider;
+import org.junit.jupiter.params.provider.ArgumentsSource;
 
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-@RunWith(Parameterized.class)
-@Slf4j
 public class TextPrimaryKeyMigrationE2EIT extends AbstractMigrationE2EIT {
     
     private static final String TARGET_TABLE_NAME = "t_order";
     
-    public TextPrimaryKeyMigrationE2EIT(final PipelineTestParameter testParam) 
{
-        super(testParam, new MigrationJobType());
+    @ParameterizedTest(name = "{0}")
+    @EnabledIf("isEnabled")
+    @ArgumentsSource(TestCaseArgumentsProvider.class)
+    public void assertTextPrimaryMigrationSuccess(final PipelineTestParameter 
testParam) throws SQLException, InterruptedException {
+        try (PipelineContainerComposer containerComposer = new 
PipelineContainerComposer(testParam, new MigrationJobType())) {
+            
containerComposer.createSourceOrderTable(getSourceTableName(containerComposer));
+            try (Connection connection = 
containerComposer.getSourceDataSource().getConnection()) {
+                UUIDKeyGenerateAlgorithm keyGenerateAlgorithm = new 
UUIDKeyGenerateAlgorithm();
+                
PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, 
keyGenerateAlgorithm, getSourceTableName(containerComposer), 
PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
+            }
+            addMigrationProcessConfig(containerComposer);
+            addMigrationSourceResource(containerComposer);
+            addMigrationTargetResource(containerComposer);
+            createTargetOrderTableRule(containerComposer);
+            startMigration(containerComposer, 
getSourceTableName(containerComposer), TARGET_TABLE_NAME);
+            String jobId = listJobId(containerComposer).get(0);
+            containerComposer.sourceExecuteWithLog(
+                    String.format("INSERT INTO %s (order_id,user_id,status) 
VALUES (%s, %s, '%s')", getSourceTableName(containerComposer), "1000000000", 1, 
"afterStop"));
+            containerComposer.waitIncrementTaskFinished(String.format("SHOW 
MIGRATION STATUS '%s'", jobId));
+            assertCheckMigrationSuccess(containerComposer, jobId, 
"DATA_MATCH");
+            commitMigrationByJobId(containerComposer, jobId);
+            List<String> lastJobIds = listJobId(containerComposer);
+            assertTrue(lastJobIds.isEmpty());
+        }
     }
     
-    @Parameters(name = "{0}")
-    public static Collection<PipelineTestParameter> getTestParameters() {
-        Collection<PipelineTestParameter> result = new LinkedList<>();
-        if (PipelineE2EEnvironment.getInstance().getItEnvType() == 
PipelineEnvTypeEnum.NONE) {
-            return result;
-        }
-        for (String version : 
PipelineE2EEnvironment.getInstance().listStorageContainerImages(new 
MySQLDatabaseType())) {
-            result.add(new PipelineTestParameter(new MySQLDatabaseType(), 
version, "env/scenario/primary_key/text_primary_key/mysql.xml"));
-        }
-        for (String version : 
PipelineE2EEnvironment.getInstance().listStorageContainerImages(new 
PostgreSQLDatabaseType())) {
-            result.add(new PipelineTestParameter(new PostgreSQLDatabaseType(), 
version, "env/scenario/primary_key/text_primary_key/postgresql.xml"));
-        }
-        for (String version : 
PipelineE2EEnvironment.getInstance().listStorageContainerImages(new 
OpenGaussDatabaseType())) {
-            result.add(new PipelineTestParameter(new OpenGaussDatabaseType(), 
version, "env/scenario/primary_key/text_primary_key/postgresql.xml"));
-        }
-        return result;
+    private String getSourceTableName(final PipelineContainerComposer 
containerComposer) {
+        return DatabaseTypeUtil.isMySQL(containerComposer.getDatabaseType()) ? 
"T_ORDER" : "t_order";
     }
     
-    @Test
-    public void assertTextPrimaryMigrationSuccess() throws SQLException, 
InterruptedException {
-        getContainerComposer().createSourceOrderTable(getSourceTableName());
-        try (Connection connection = 
getContainerComposer().getSourceDataSource().getConnection()) {
-            UUIDKeyGenerateAlgorithm keyGenerateAlgorithm = new 
UUIDKeyGenerateAlgorithm();
-            
PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, 
keyGenerateAlgorithm, getSourceTableName(), 
PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
-        }
-        addMigrationProcessConfig();
-        addMigrationSourceResource();
-        addMigrationTargetResource();
-        createTargetOrderTableRule();
-        startMigration(getSourceTableName(), TARGET_TABLE_NAME);
-        String jobId = listJobId().get(0);
-        getContainerComposer().sourceExecuteWithLog(String.format("INSERT INTO 
%s (order_id,user_id,status) VALUES (%s, %s, '%s')", getSourceTableName(), 
"1000000000", 1, "afterStop"));
-        getContainerComposer().waitIncrementTaskFinished(String.format("SHOW 
MIGRATION STATUS '%s'", jobId));
-        assertCheckMigrationSuccess(jobId, "DATA_MATCH");
-        commitMigrationByJobId(jobId);
-        List<String> lastJobIds = listJobId();
-        assertTrue(lastJobIds.isEmpty());
+    private static boolean isEnabled() {
+        return PipelineEnvTypeEnum.NONE != 
PipelineE2EEnvironment.getInstance().getItEnvType()
+                && 
(!PipelineE2EEnvironment.getInstance().listStorageContainerImages(new 
MySQLDatabaseType()).isEmpty()
+                || 
!PipelineE2EEnvironment.getInstance().listStorageContainerImages(new 
PostgreSQLDatabaseType()).isEmpty()
+                || 
!PipelineE2EEnvironment.getInstance().listStorageContainerImages(new 
OpenGaussDatabaseType()).isEmpty());
     }
     
-    private String getSourceTableName() {
-        return 
DatabaseTypeUtil.isMySQL(getContainerComposer().getDatabaseType()) ? "T_ORDER" 
: "t_order";
+    private static class TestCaseArgumentsProvider implements 
ArgumentsProvider {
+        
+        @Override
+        public Stream<? extends Arguments> provideArguments(final 
ExtensionContext extensionContext) {
+            Collection<Arguments> result = new LinkedList<>();
+            for (String version : 
PipelineE2EEnvironment.getInstance().listStorageContainerImages(new 
MySQLDatabaseType())) {
+                result.add(Arguments.of(new PipelineTestParameter(new 
MySQLDatabaseType(), version, 
"env/scenario/primary_key/text_primary_key/mysql.xml")));
+            }
+            for (String version : 
PipelineE2EEnvironment.getInstance().listStorageContainerImages(new 
PostgreSQLDatabaseType())) {
+                result.add(Arguments.of(new PipelineTestParameter(new 
PostgreSQLDatabaseType(), version, 
"env/scenario/primary_key/text_primary_key/postgresql.xml")));
+            }
+            for (String version : 
PipelineE2EEnvironment.getInstance().listStorageContainerImages(new 
OpenGaussDatabaseType())) {
+                result.add(Arguments.of(new PipelineTestParameter(new 
OpenGaussDatabaseType(), version, 
"env/scenario/primary_key/text_primary_key/postgresql.xml")));
+            }
+            return result.stream();
+        }
     }
 }

