This is an automated email from the ASF dual-hosted git repository. zhonghongsheng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push: new 9788c1bc0f3 Improve migration IT, remove comment (#21712) 9788c1bc0f3 is described below commit 9788c1bc0f3386a98c964ddd0fba31844e020531 Author: Xinze Guo <101622833+aze...@users.noreply.github.com> AuthorDate: Mon Oct 24 12:56:28 2022 +0800 Improve migration IT, remove comment (#21712) * Improve check migration after stop * Fix codestyle --- .../integration/data/pipeline/cases/base/BaseITCase.java | 16 ++++++++++++++++ .../cases/migration/general/MySQLMigrationGeneralIT.java | 1 - .../migration/general/PostgreSQLMigrationGeneralIT.java | 16 +++++----------- .../migration/primarykey/TextPrimaryKeyMigrationIT.java | 1 - 4 files changed, 21 insertions(+), 13 deletions(-) diff --git a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java index 777984e4c06..dd5c14db202 100644 --- a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java +++ b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java @@ -300,6 +300,22 @@ public abstract class BaseITCase { return Collections.emptyList(); } + protected void assertProxyOrderRecordExist(final Object id) throws SQLException { + // must refresh firstly, otherwise proxy can't get schema and table info + boolean recordExist = false; + proxyExecuteWithLog("REFRESH TABLE METADATA;", 2); + for (int i = 0; i < 5; i++) { + String sql = String.format("select * from %s where order_id = %s", String.join(".", SCHEMA_NAME, getTargetTableOrderName()), id); + List<Map<String, Object>> result = queryForListWithLog(sql); + recordExist = !result.isEmpty(); + if (recordExist) { + break; + } + ThreadUtil.sleep(2, TimeUnit.SECONDS); + } + assertTrue("The insert record must exist after the stop", recordExist); + } + protected void assertGreaterThanOrderTableInitRows(final int tableInitRows, final String schema) throws SQLException { proxyExecuteWithLog("REFRESH TABLE METADATA", 2); String countSQL = Strings.isNullOrEmpty(schema) ? "SELECT COUNT(*) as count FROM t_order" : String.format("SELECT COUNT(*) as count FROM %s.t_order", schema); diff --git a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java index 2929e143128..6cb495c3062 100644 --- a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java +++ b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java @@ -118,6 +118,5 @@ public final class MySQLMigrationGeneralIT extends AbstractMigrationITCase { assertTrue(Integer.parseInt(each.get("processed_records_count").toString()) > 0); } assertCheckMigrationSuccess(jobId, algorithmType); - stopMigrationByJobId(jobId); } } diff --git a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java index 1768a6f002f..ce9f4b10c52 100644 --- a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java +++ b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java @@ -115,17 +115,12 @@ public final class PostgreSQLMigrationGeneralIT extends AbstractMigrationITCase startIncrementTask(new PostgreSQLIncrementTask(jdbcTemplate, SCHEMA_NAME, getSourceTableOrderName(), 20)); String jobId = getJobIdByTableName(getSourceTableOrderName()); waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId)); - /* - * TODO Compatible with restart job, before stopping job, incremental_idle_seconds=16, before checking migration, incremental_idle_seconds=23, it just pass 7 seconds, and it's not enough for - * PostgreSQL incremental task to sync data - */ - // stopMigrationByJobId(jobId); - // sourceExecuteWithLog(String.format("INSERT INTO %s.%s (order_id,user_id,status) VALUES (%s, %s, '%s')", SCHEMA_NAME, getSourceTableOrderName(), KEY_GENERATE_ALGORITHM.generateKey(), - // 1, "afterStop")); - // startMigrationByJobId(jobId); - // waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId)); - assertCheckMigrationSuccess(jobId, "DATA_MATCH"); stopMigrationByJobId(jobId); + Comparable<?> recordId = KEY_GENERATE_ALGORITHM.generateKey(); + sourceExecuteWithLog(String.format("INSERT INTO %s (order_id,user_id,status) VALUES (%s, %s, '%s')", String.join(".", SCHEMA_NAME, getSourceTableOrderName()), recordId, 1, "afterStop")); + startMigrationByJobId(jobId); + assertProxyOrderRecordExist(recordId); + assertCheckMigrationSuccess(jobId, "DATA_MATCH"); } private void checkOrderItemMigration() throws SQLException, InterruptedException { @@ -133,6 +128,5 @@ public final class PostgreSQLMigrationGeneralIT extends AbstractMigrationITCase String jobId = getJobIdByTableName("t_order_item"); waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId)); assertCheckMigrationSuccess(jobId, "DATA_MATCH"); - stopMigrationByJobId(jobId); } } diff --git a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java index bd74d687c65..1d9751ab0b2 100644 --- a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java +++ b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java @@ -97,7 +97,6 @@ public class TextPrimaryKeyMigrationIT extends AbstractMigrationITCase { } else { assertCheckMigrationSuccess(jobId, "DATA_MATCH"); } - stopMigrationByJobId(jobId); if (ENV.getItEnvType() == ITEnvTypeEnum.DOCKER) { commitMigrationByJobId(jobId); List<String> lastJobIds = listJobId();