This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new f7731b2b7cc Replace proxy data source to ShardingSphereDataSource to query, avoid "REFRESH TABLE METADATA" in process (#26340)
f7731b2b7cc is described below

commit f7731b2b7ccbf7bb59f78b9174eddf5e3c563671
Author: Hongsheng Zhong <zhonghongsh...@apache.org>
AuthorDate: Thu Jun 15 09:50:27 2023 +0800

    Replace proxy data source to ShardingSphereDataSource to query, avoid "REFRESH TABLE METADATA" in process (#26340)
    
    * Replace proxy data source to ShardingSphereDataSource to query, avoid "REFRESH TABLE METADATA" in process
    
    * Wait new record with jdbc data source in PostgreSQLMigrationGeneralE2EIT
---
 .../pipeline/cases/PipelineContainerComposer.java  | 43 +++++++++++++++------
 .../general/MySQLMigrationGeneralE2EIT.java        |  7 ++--
 .../general/PostgreSQLMigrationGeneralE2EIT.java   | 12 +++---
 .../migration/general/RulesMigrationE2EIT.java     |  3 +-
 .../primarykey/IndexesMigrationE2EIT.java          | 45 +++++++++++-----------
 .../primarykey/MariaDBMigrationE2EIT.java          | 12 ++++--
 6 files changed, 73 insertions(+), 49 deletions(-)

diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
index 92f80e1bacf..15151dfbb98 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
@@ -26,6 +26,7 @@ import 
org.apache.shardingsphere.data.pipeline.spi.job.JobType;
 import 
org.apache.shardingsphere.driver.api.yaml.YamlShardingSphereDataSourceFactory;
 import org.apache.shardingsphere.infra.database.metadata.url.JdbcUrlAppender;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import 
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
 import 
org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.BaseIncrementTask;
@@ -378,10 +379,22 @@ public final class PipelineContainerComposer implements 
AutoCloseable {
      * @throws RuntimeException runtime exception
      */
     public List<Map<String, Object>> queryForListWithLog(final String sql) {
+        return queryForListWithLog(proxyDataSource, sql);
+    }
+    
+    /**
+     * Query for list with log.
+     *
+     * @param dataSource data source
+     * @param sql SQL
+     * @return query result
+     * @throws RuntimeException runtime exception
+     */
+    public List<Map<String, Object>> queryForListWithLog(final DataSource 
dataSource, final String sql) {
         log.info("Query SQL: {}", sql);
         int retryNumber = 0;
         while (retryNumber <= 3) {
-            try (Connection connection = proxyDataSource.getConnection()) {
+            try (Connection connection = dataSource.getConnection()) {
                 ResultSet resultSet = 
connection.createStatement().executeQuery(sql);
                 return transformResultSetToList(resultSet);
                 // CHECKSTYLE:OFF
@@ -457,47 +470,50 @@ public final class PipelineContainerComposer implements 
AutoCloseable {
     }
     
     /**
-     * Assert proxy order record exist.
+     * Assert order record exists in proxy.
      *
+     * @param dataSource data source
      * @param tableName table name
      * @param orderId order id
      */
-    public void assertProxyOrderRecordExist(final String tableName, final 
Object orderId) {
+    public void assertOrderRecordExist(final DataSource dataSource, final 
String tableName, final Object orderId) {
         String sql;
         if (orderId instanceof String) {
             sql = String.format("SELECT 1 FROM %s WHERE order_id = '%s'", 
tableName, orderId);
         } else {
             sql = String.format("SELECT 1 FROM %s WHERE order_id = %s", 
tableName, orderId);
         }
-        assertProxyOrderRecordExist(sql);
+        assertOrderRecordExist(dataSource, sql);
     }
     
     /**
      * Assert proxy order record exist.
      *
+     * @param dataSource data source
      * @param sql SQL
      */
-    public void assertProxyOrderRecordExist(final String sql) {
+    public void assertOrderRecordExist(final DataSource dataSource, final 
String sql) {
         boolean recordExist = false;
         for (int i = 0; i < 5; i++) {
-            List<Map<String, Object>> result = queryForListWithLog(sql);
+            List<Map<String, Object>> result = queryForListWithLog(dataSource, 
sql);
             recordExist = !result.isEmpty();
             if (recordExist) {
                 break;
             }
             Awaitility.await().pollDelay(2L, TimeUnit.SECONDS).until(() -> 
true);
         }
-        assertTrue(recordExist, "The insert record must exist after the stop");
+        assertTrue(recordExist, "Order record does not exist");
     }
     
     /**
      * Get target table records count.
      *
+     * @param dataSource data source
      * @param tableName table name
      * @return target table records count
      */
-    public int getTargetTableRecordsCount(final String tableName) {
-        List<Map<String, Object>> targetList = queryForListWithLog("SELECT 
COUNT(1) AS count FROM " + tableName);
+    public int getTargetTableRecordsCount(final DataSource dataSource, final 
String tableName) {
+        List<Map<String, Object>> targetList = queryForListWithLog(dataSource, 
"SELECT COUNT(1) AS count FROM " + tableName);
         assertFalse(targetList.isEmpty());
         return ((Number) targetList.get(0).get("count")).intValue();
     }
@@ -505,12 +521,13 @@ public final class PipelineContainerComposer implements 
AutoCloseable {
     /**
      * Assert greater than order table init rows.
      *
+     * @param dataSource data source
      * @param tableInitRows table init rows
      * @param schema schema
      */
-    public void assertGreaterThanOrderTableInitRows(final int tableInitRows, 
final String schema) {
+    public void assertGreaterThanOrderTableInitRows(final DataSource 
dataSource, final int tableInitRows, final String schema) {
         String tableName = Strings.isNullOrEmpty(schema) ? "t_order" : 
String.format("%s.t_order", schema);
-        int recordsCount = getTargetTableRecordsCount(tableName);
+        int recordsCount = getTargetTableRecordsCount(dataSource, tableName);
         assertTrue(recordsCount > tableInitRows, "actual count " + 
recordsCount);
     }
     
@@ -522,8 +539,10 @@ public final class PipelineContainerComposer implements 
AutoCloseable {
      */
     // TODO proxy support for some fields still needs to be optimized, such as 
binary of MySQL, after these problems are optimized, Proxy dataSource can be 
used.
     public DataSource generateShardingSphereDataSourceFromProxy() throws 
SQLException {
-        Awaitility.await().atMost(5L, TimeUnit.SECONDS).pollInterval(1L, 
TimeUnit.SECONDS).until(() -> !getYamlRootConfig().getRules().isEmpty());
+        Awaitility.await().atMost(5L, TimeUnit.SECONDS).pollInterval(1L, 
TimeUnit.SECONDS).until(() -> null != getYamlRootConfig().getRules());
         YamlRootConfiguration rootConfig = getYamlRootConfig();
+        ShardingSpherePreconditions.checkNotNull(rootConfig.getDataSources(), 
() -> new IllegalStateException("dataSources is null"));
+        ShardingSpherePreconditions.checkNotNull(rootConfig.getRules(), () -> 
new IllegalStateException("rules is null"));
         if (PipelineEnvTypeEnum.DOCKER == 
PipelineE2EEnvironment.getInstance().getItEnvType()) {
             DockerStorageContainer storageContainer = 
((DockerContainerComposer) containerComposer).getStorageContainers().get(0);
             String sourceUrl = String.join(":", 
storageContainer.getNetworkAliases().get(0), 
Integer.toString(storageContainer.getExposedPort()));
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
index b6e0807d441..680a1c16193 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
@@ -37,6 +37,7 @@ import org.junit.jupiter.api.condition.EnabledIf;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ArgumentsSource;
 
+import javax.sql.DataSource;
 import java.sql.SQLException;
 import java.time.LocalDateTime;
 import java.util.List;
@@ -82,8 +83,8 @@ class MySQLMigrationGeneralE2EIT extends 
AbstractMigrationE2EIT {
                     new 
E2EIncrementalTask(containerComposer.getSourceDataSource(), SOURCE_TABLE_NAME, 
new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 30));
             
TimeUnit.SECONDS.timedJoin(containerComposer.getIncreaseTaskThread(), 30);
             containerComposer.sourceExecuteWithLog(String.format("INSERT INTO 
%s (order_id, user_id, status) VALUES (10000, 1, 'OK')", SOURCE_TABLE_NAME));
-            containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
-            containerComposer.assertProxyOrderRecordExist("t_order", 10000);
+            DataSource jdbcDataSource = 
containerComposer.generateShardingSphereDataSourceFromProxy();
+            containerComposer.assertOrderRecordExist(jdbcDataSource, 
"t_order", 10000);
             assertMigrationSuccessById(containerComposer, orderJobId, 
"DATA_MATCH");
             String orderItemJobId = getJobIdByTableName(containerComposer, 
"ds_0.t_order_item");
             assertMigrationSuccessById(containerComposer, orderItemJobId, 
"DATA_MATCH");
@@ -94,7 +95,7 @@ class MySQLMigrationGeneralE2EIT extends 
AbstractMigrationE2EIT {
             }
             List<String> lastJobIds = listJobId(containerComposer);
             assertTrue(lastJobIds.isEmpty());
-            
containerComposer.assertGreaterThanOrderTableInitRows(PipelineContainerComposer.TABLE_INIT_ROW_COUNT,
 "");
+            
containerComposer.assertGreaterThanOrderTableInitRows(jdbcDataSource, 
PipelineContainerComposer.TABLE_INIT_ROW_COUNT, "");
         }
     }
     
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index b80f7d33350..a8a2ea75ab2 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -38,6 +38,7 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ArgumentsSource;
 import org.testcontainers.shaded.org.awaitility.Awaitility;
 
+import javax.sql.DataSource;
 import java.sql.SQLException;
 import java.time.LocalDateTime;
 import java.util.List;
@@ -85,8 +86,8 @@ class PostgreSQLMigrationGeneralE2EIT extends 
AbstractMigrationE2EIT {
                     containerComposer.getDatabaseType(), 20));
             
TimeUnit.SECONDS.timedJoin(containerComposer.getIncreaseTaskThread(), 30);
             containerComposer.sourceExecuteWithLog(String.format("INSERT INTO 
%s (order_id, user_id, status) VALUES (10000, 1, 'OK')", schemaTableName));
-            containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
-            containerComposer.assertProxyOrderRecordExist(schemaTableName, 
10000);
+            DataSource jdbcDataSource = 
containerComposer.generateShardingSphereDataSourceFromProxy();
+            containerComposer.assertOrderRecordExist(jdbcDataSource, 
schemaTableName, 10000);
             checkOrderMigration(containerComposer, jobId);
             checkOrderItemMigration(containerComposer);
             for (String each : listJobId(containerComposer)) {
@@ -94,7 +95,7 @@ class PostgreSQLMigrationGeneralE2EIT extends 
AbstractMigrationE2EIT {
             }
             List<String> lastJobIds = listJobId(containerComposer);
             assertTrue(lastJobIds.isEmpty());
-            
containerComposer.assertGreaterThanOrderTableInitRows(PipelineContainerComposer.TABLE_INIT_ROW_COUNT
 + 1, PipelineContainerComposer.SCHEMA_NAME);
+            
containerComposer.assertGreaterThanOrderTableInitRows(jdbcDataSource, 
PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1, 
PipelineContainerComposer.SCHEMA_NAME);
         }
     }
     
@@ -105,9 +106,8 @@ class PostgreSQLMigrationGeneralE2EIT extends 
AbstractMigrationE2EIT {
         containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s 
(order_id,user_id,status) VALUES (%s, %s, '%s')",
                 String.join(".", PipelineContainerComposer.SCHEMA_NAME, 
SOURCE_TABLE_NAME), recordId, 1, "afterStop"));
         startMigrationByJobId(containerComposer, jobId);
-        Awaitility.await().atMost(4L, TimeUnit.SECONDS).pollInterval(1L, 
TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog(
-                String.format("SELECT * FROM %s WHERE order_id = %s", 
String.join(".", PipelineContainerComposer.SCHEMA_NAME, TARGET_TABLE_NAME), 
recordId)).isEmpty());
-        containerComposer.assertProxyOrderRecordExist(String.join(".", 
PipelineContainerComposer.SCHEMA_NAME, TARGET_TABLE_NAME), recordId);
+        DataSource jdbcDataSource = 
containerComposer.generateShardingSphereDataSourceFromProxy();
+        containerComposer.assertOrderRecordExist(jdbcDataSource, 
String.join(".", PipelineContainerComposer.SCHEMA_NAME, TARGET_TABLE_NAME), 
recordId);
         assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
     }
     
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java
index 00048c42375..7ded08bd0e6 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java
@@ -89,8 +89,7 @@ class RulesMigrationE2EIT extends AbstractMigrationE2EIT {
         containerComposer.waitIncrementTaskFinished(String.format("SHOW 
MIGRATION STATUS '%s'", jobId));
         assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
         commitMigrationByJobId(containerComposer, jobId);
-        containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
-        
assertThat(containerComposer.getTargetTableRecordsCount(SOURCE_TABLE_NAME), 
is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT));
+        
assertThat(containerComposer.getTargetTableRecordsCount(containerComposer.getProxyDataSource(),
 SOURCE_TABLE_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT));
     }
     
     private static boolean isEnabled() {
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index 56ab40e2583..ae62f923a2a 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -22,6 +22,7 @@ import org.apache.commons.codec.binary.Hex;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import 
org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
+import 
org.apache.shardingsphere.infra.util.exception.external.sql.type.wrapper.SQLWrapperException;
 import 
org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
 import 
org.apache.shardingsphere.sharding.algorithm.keygen.UUIDKeyGenerateAlgorithm;
 import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
@@ -37,11 +38,12 @@ import org.junit.jupiter.api.condition.EnabledIf;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ArgumentsSource;
 
+import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.List;
-import java.util.concurrent.Callable;
+import java.util.function.Consumer;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
@@ -88,7 +90,7 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
             }
             KeyGenerateAlgorithm keyGenerateAlgorithm = new 
UUIDKeyGenerateAlgorithm();
             // TODO PostgreSQL update delete events not support if table 
without unique keys at increment task.
-            final Callable<Void> incrementalTaskFn = () -> {
+            final Consumer<DataSource> incrementalTaskFn = dataSource -> {
                 Object orderId = keyGenerateAlgorithm.generateKey();
                 insertOneOrder(containerComposer, orderId);
                 if (containerComposer.getDatabaseType() instanceof 
MySQLDatabaseType) {
@@ -96,7 +98,6 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
                     deleteOneOrder(containerComposer, orderId, "updated");
                     insertOneOrder(containerComposer, 
keyGenerateAlgorithm.generateKey());
                 }
-                return null;
             };
             assertMigrationSuccess(containerComposer, sql, "user_id", 
keyGenerateAlgorithm, consistencyCheckAlgorithmType, incrementalTaskFn);
         }
@@ -110,7 +111,7 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
         deleteOneOrder(containerComposer, orderId, updatedStatus);
     }
     
-    private void insertOneOrder(final PipelineContainerComposer 
containerComposer, final Object uniqueKey) throws SQLException {
+    private void insertOneOrder(final PipelineContainerComposer 
containerComposer, final Object uniqueKey) {
         try (
                 Connection connection = 
containerComposer.getSourceDataSource().getConnection();
                 PreparedStatement preparedStatement = 
connection.prepareStatement("INSERT INTO t_order (order_id,user_id,status) 
VALUES (?,?,?)")) {
@@ -119,10 +120,12 @@ class IndexesMigrationE2EIT extends 
AbstractMigrationE2EIT {
             preparedStatement.setObject(3, "OK");
             int actualCount = preparedStatement.executeUpdate();
             assertThat(actualCount, is(1));
+        } catch (final SQLException ex) {
+            throw new SQLWrapperException(ex);
         }
     }
     
-    private void updateOneOrder(final PipelineContainerComposer 
containerComposer, final Object uniqueKey, final String updatedStatus) throws 
SQLException {
+    private void updateOneOrder(final PipelineContainerComposer 
containerComposer, final Object uniqueKey, final String updatedStatus) {
         try (
                 Connection connection = 
containerComposer.getSourceDataSource().getConnection();
                 PreparedStatement preparedStatement = connection
@@ -133,10 +136,12 @@ class IndexesMigrationE2EIT extends 
AbstractMigrationE2EIT {
             preparedStatement.setObject(4, "OK");
             int actualCount = preparedStatement.executeUpdate();
             assertThat(actualCount, is(1));
+        } catch (final SQLException ex) {
+            throw new SQLWrapperException(ex);
         }
     }
     
-    private void deleteOneOrder(final PipelineContainerComposer 
containerComposer, final Object uniqueKey, final String updatedStatus) throws 
SQLException {
+    private void deleteOneOrder(final PipelineContainerComposer 
containerComposer, final Object uniqueKey, final String updatedStatus) {
         try (
                 Connection connection = 
containerComposer.getSourceDataSource().getConnection();
                 PreparedStatement preparedStatement = connection
@@ -146,6 +151,8 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
             preparedStatement.setObject(3, updatedStatus);
             int actualCount = preparedStatement.executeUpdate();
             assertThat(actualCount, is(1));
+        } catch (final SQLException ex) {
+            throw new SQLWrapperException(ex);
         }
     }
     
@@ -164,12 +171,10 @@ class IndexesMigrationE2EIT extends 
AbstractMigrationE2EIT {
             }
             KeyGenerateAlgorithm keyGenerateAlgorithm = new 
UUIDKeyGenerateAlgorithm();
             Object uniqueKey = keyGenerateAlgorithm.generateKey();
-            assertMigrationSuccess(containerComposer, sql, "user_id", 
keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
+            assertMigrationSuccess(containerComposer, sql, "user_id", 
keyGenerateAlgorithm, consistencyCheckAlgorithmType, dataSource -> {
                 insertOneOrder(containerComposer, uniqueKey);
                 doCreateUpdateDelete(containerComposer, 
keyGenerateAlgorithm.generateKey());
-                containerComposer.proxyExecuteWithLog("REFRESH TABLE 
METADATA", 1);
-                containerComposer.assertProxyOrderRecordExist("t_order", 
uniqueKey);
-                return null;
+                containerComposer.assertOrderRecordExist(dataSource, 
"t_order", uniqueKey);
             });
         }
     }
@@ -189,12 +194,10 @@ class IndexesMigrationE2EIT extends 
AbstractMigrationE2EIT {
             }
             KeyGenerateAlgorithm keyGenerateAlgorithm = new 
SnowflakeKeyGenerateAlgorithm();
             Object uniqueKey = keyGenerateAlgorithm.generateKey();
-            assertMigrationSuccess(containerComposer, sql, "user_id", 
keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
+            assertMigrationSuccess(containerComposer, sql, "user_id", 
keyGenerateAlgorithm, consistencyCheckAlgorithmType, dataSource -> {
                 insertOneOrder(containerComposer, uniqueKey);
                 doCreateUpdateDelete(containerComposer, 
keyGenerateAlgorithm.generateKey());
-                containerComposer.proxyExecuteWithLog("REFRESH TABLE 
METADATA", 1);
-                containerComposer.assertProxyOrderRecordExist("t_order", 
uniqueKey);
-                return null;
+                containerComposer.assertOrderRecordExist(dataSource, 
"t_order", uniqueKey);
             });
         }
     }
@@ -216,18 +219,16 @@ class IndexesMigrationE2EIT extends 
AbstractMigrationE2EIT {
             KeyGenerateAlgorithm keyGenerateAlgorithm = new 
UUIDKeyGenerateAlgorithm();
             // TODO Insert binary string in VARBINARY column. But 
KeyGenerateAlgorithm.generateKey() require returning Comparable, and byte[] is 
not Comparable
             byte[] uniqueKey = new byte[]{-1, 0, 1};
-            assertMigrationSuccess(containerComposer, sql, "order_id", 
keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
+            assertMigrationSuccess(containerComposer, sql, "order_id", 
keyGenerateAlgorithm, consistencyCheckAlgorithmType, dataSource -> {
                 insertOneOrder(containerComposer, uniqueKey);
-                containerComposer.proxyExecuteWithLog("REFRESH TABLE 
METADATA", 1);
                 // TODO Select by byte[] from proxy doesn't work, so unhex 
function is used for now
-                
containerComposer.assertProxyOrderRecordExist(String.format("SELECT 1 FROM 
t_order WHERE order_id=UNHEX('%s')", Hex.encodeHexString(uniqueKey)));
-                return null;
+                containerComposer.assertOrderRecordExist(dataSource, 
String.format("SELECT 1 FROM t_order WHERE order_id=UNHEX('%s')", 
Hex.encodeHexString(uniqueKey)));
             });
         }
     }
     
     private void assertMigrationSuccess(final PipelineContainerComposer 
containerComposer, final String sqlPattern, final String shardingColumn, final 
KeyGenerateAlgorithm keyGenerateAlgorithm,
-                                        final String 
consistencyCheckAlgorithmType, final Callable<Void> incrementalTaskFn) throws 
Exception {
+                                        final String 
consistencyCheckAlgorithmType, final Consumer<DataSource> incrementalTaskFn) 
throws Exception {
         containerComposer.sourceExecuteWithLog(String.format(sqlPattern, 
SOURCE_TABLE_NAME));
         try (Connection connection = 
containerComposer.getSourceDataSource().getConnection()) {
             
PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, 
keyGenerateAlgorithm, SOURCE_TABLE_NAME, 
PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
@@ -240,14 +241,14 @@ class IndexesMigrationE2EIT extends 
AbstractMigrationE2EIT {
         startMigration(containerComposer, SOURCE_TABLE_NAME, 
TARGET_TABLE_NAME);
         String jobId = listJobId(containerComposer).get(0);
         containerComposer.waitJobPrepareSuccess(String.format("SHOW MIGRATION 
STATUS '%s'", jobId));
-        incrementalTaskFn.call();
+        DataSource jdbcDataSource = 
containerComposer.generateShardingSphereDataSourceFromProxy();
+        incrementalTaskFn.accept(jdbcDataSource);
         containerComposer.waitIncrementTaskFinished(String.format("SHOW 
MIGRATION STATUS '%s'", jobId));
         if (null != consistencyCheckAlgorithmType) {
             assertCheckMigrationSuccess(containerComposer, jobId, 
consistencyCheckAlgorithmType);
         }
         commitMigrationByJobId(containerComposer, jobId);
-        containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
-        
assertThat(containerComposer.getTargetTableRecordsCount(SOURCE_TABLE_NAME), 
is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1));
+        
assertThat(containerComposer.getTargetTableRecordsCount(jdbcDataSource, 
SOURCE_TABLE_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1));
         List<String> lastJobIds = listJobId(containerComposer);
         assertTrue(lastJobIds.isEmpty());
     }
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
index 972f60576b7..5936fa974f7 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
@@ -30,10 +30,12 @@ import 
org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.Pipeline
 import 
org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ESettings.PipelineE2EDatabaseSettings;
 import 
org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
 import 
org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.condition.EnabledIf;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ArgumentsSource;
 
+import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.List;
@@ -42,6 +44,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+// TODO Use MariaDB docker image
+@Disabled
 @PipelineE2ESettings(fetchSingle = true, database = 
@PipelineE2EDatabaseSettings(type = "MySQL", scenarioFiles = 
"env/common/none.xml"))
 @Slf4j
 class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
@@ -53,7 +57,7 @@ class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
     @ParameterizedTest(name = "{0}")
     @EnabledIf("isEnabled")
     @ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
-    void assertMigrationSuccess(final PipelineTestParameter testParam) throws 
SQLException, InterruptedException {
+    void assertMigrationSuccess(final PipelineTestParameter testParam) throws 
SQLException {
         try (PipelineContainerComposer containerComposer = new 
PipelineContainerComposer(testParam, new MigrationJobType())) {
             String sqlPattern = "CREATE TABLE `%s` (`order_id` VARCHAR(64) NOT 
NULL, `user_id` INT NOT NULL, `status` varchar(255), PRIMARY KEY (`order_id`)) 
ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
             containerComposer.sourceExecuteWithLog(String.format(sqlPattern, 
SOURCE_TABLE_NAME));
@@ -70,12 +74,12 @@ class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
             String jobId = listJobId(containerComposer).get(0);
             containerComposer.waitJobPrepareSuccess(String.format("SHOW 
MIGRATION STATUS '%s'", jobId));
             containerComposer.sourceExecuteWithLog("INSERT INTO t_order 
(order_id, user_id, status) VALUES ('a1', 1, 'OK')");
-            containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
-            containerComposer.assertProxyOrderRecordExist("t_order", "a1");
+            DataSource jdbcDataSource = 
containerComposer.generateShardingSphereDataSourceFromProxy();
+            containerComposer.assertOrderRecordExist(jdbcDataSource, 
"t_order", "a1");
             containerComposer.waitIncrementTaskFinished(String.format("SHOW 
MIGRATION STATUS '%s'", jobId));
             assertCheckMigrationSuccess(containerComposer, jobId, 
"CRC32_MATCH");
             commitMigrationByJobId(containerComposer, jobId);
-            
assertThat(containerComposer.getTargetTableRecordsCount(SOURCE_TABLE_NAME), 
is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1));
+            
assertThat(containerComposer.getTargetTableRecordsCount(jdbcDataSource, 
SOURCE_TABLE_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1));
             List<String> lastJobIds = listJobId(containerComposer);
             assertTrue(lastJobIds.isEmpty());
         }

Reply via email to