This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 51718e1072d Review and improve pipeline code (#28372)
51718e1072d is described below

commit 51718e1072db7442e33851283cc1ec9af73009f4
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Sep 6 16:56:06 2023 +0800

    Review and improve pipeline code (#28372)
---
 .../data/pipeline/api/metadata/SchemaTableName.java            | 10 ++++++++++
 .../check/consistency/MigrationDataConsistencyChecker.java     |  8 ++++----
 .../e2e/data/pipeline/cases/PipelineContainerComposer.java     |  2 +-
 .../test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java  |  6 ++++++
 .../scenario/migration/api/impl/MigrationJobAPITest.java       |  2 +-
 .../check/consistency/MigrationDataConsistencyCheckerTest.java |  2 +-
 6 files changed, 23 insertions(+), 7 deletions(-)

diff --git 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/SchemaTableName.java
 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/SchemaTableName.java
index ab4496aa6c5..985ab7607a2 100644
--- 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/SchemaTableName.java
+++ 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/SchemaTableName.java
@@ -42,4 +42,14 @@ public class SchemaTableName {
         this.schemaName = new SchemaName(schemaName);
         this.tableName = new TableName(tableName);
     }
+    
+    /**
+     * Marshal to text.
+     *
+     * @return text
+     */
+    public String marshal() {
+        String schemaName = this.schemaName.getOriginal();
+        return null == schemaName ? tableName.getOriginal() : schemaName + "." 
+ tableName.getOriginal();
+    }
 }
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index f4fef5957a4..13dde2fb842 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -86,14 +86,14 @@ public final class MigrationDataConsistencyChecker 
implements PipelineDataConsis
         progressContext.setRecordsCount(getRecordsCount());
         progressContext.getTableNames().addAll(sourceTableNames);
         progressContext.onProgressUpdated(new 
PipelineJobProgressUpdatedParameter(0));
-        Map<DataNode, TableDataConsistencyCheckResult> result = new 
LinkedHashMap<>();
+        Map<SchemaTableName, TableDataConsistencyCheckResult> result = new 
LinkedHashMap<>();
         TableDataConsistencyChecker tableChecker = 
TableDataConsistencyCheckerFactory.newInstance(algorithmType, algorithmProps);
         try (PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager()) {
             for (JobDataNodeLine each : jobConfig.getJobShardingDataNodes()) {
                 checkTableInventoryData(each, tableChecker, result, 
dataSourceManager);
             }
         }
-        return result.entrySet().stream().collect(Collectors.toMap(entry -> 
DataNodeUtils.formatWithSchema(entry.getKey()), Entry::getValue));
+        return result.entrySet().stream().collect(Collectors.toMap(entry -> 
entry.getKey().marshal(), Entry::getValue));
     }
     
     private long getRecordsCount() {
@@ -102,11 +102,11 @@ public final class MigrationDataConsistencyChecker 
implements PipelineDataConsis
     }
     
     private void checkTableInventoryData(final JobDataNodeLine 
jobDataNodeLine, final TableDataConsistencyChecker tableChecker,
-                                         final Map<DataNode, 
TableDataConsistencyCheckResult> checkResultMap, final 
PipelineDataSourceManager dataSourceManager) {
+                                         final Map<SchemaTableName, 
TableDataConsistencyCheckResult> checkResultMap, final 
PipelineDataSourceManager dataSourceManager) {
         for (JobDataNodeEntry entry : jobDataNodeLine.getEntries()) {
             for (DataNode each : entry.getDataNodes()) {
                 TableDataConsistencyCheckResult checkResult = 
checkSingleTableInventoryData(entry.getLogicTableName(), each, tableChecker, 
dataSourceManager);
-                checkResultMap.put(each, checkResult);
+                checkResultMap.put(new SchemaTableName(each.getSchemaName(), 
each.getTableName()), checkResult);
                 if (!checkResult.isMatched() && 
tableChecker.isBreakOnInventoryCheckNotMatched()) {
                     log.info("Unmatched on table '{}', ignore left tables", 
DataNodeUtils.formatWithSchema(each));
                     return;
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
index ac21dc787b2..420b42823e2 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
@@ -372,7 +372,7 @@ public final class PipelineContainerComposer implements 
AutoCloseable {
      * @throws SQLException SQL exception
      */
     public void proxyExecuteWithLog(final String sql, final int sleepSeconds) 
throws SQLException {
-        log.info("proxy execute :{}", sql);
+        log.info("proxy execute: {}", sql);
         List<String> sqlList = 
Splitter.on(";").trimResults().omitEmptyStrings().splitToList(sql);
         try (Connection connection = proxyDataSource.getConnection()) {
             for (String each : sqlList) {
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
index d03bb3b7a86..a79edb85aef 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.test.e2e.data.pipeline.cases.task;
 
 import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
@@ -44,6 +45,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 
 @RequiredArgsConstructor
 @Slf4j
@@ -70,6 +72,7 @@ public final class E2EIncrementalTask extends 
BaseIncrementTask {
     
     private final int loopCount;
     
+    @SneakyThrows(InterruptedException.class)
     @Override
     public void run() {
         List<Object[]> orderInsertData = 
PipelineCaseHelper.generateOrderInsertData(databaseType, 
primaryKeyGenerateAlgorithm, loopCount);
@@ -77,16 +80,19 @@ public final class E2EIncrementalTask extends 
BaseIncrementTask {
         for (Object[] each : orderInsertData) {
             primaryKeys.add(each[0]);
             insertOrder(each);
+            TimeUnit.MILLISECONDS.sleep(100L);
         }
         ThreadLocalRandom random = ThreadLocalRandom.current();
         for (int i = 0; i < Math.max(1, loopCount / 3); i++) {
             // TODO 0000-00-00 00:00:00 now will cause consistency check 
failed of MySQL.
             // DataSourceUtil.execute(dataSource, String.format("UPDATE %s SET 
t_datetime='0000-00-00 00:00:00' WHERE order_id = ?", orderTableName)
             updateOrderById(primaryKeys.get(random.nextInt(0, 
primaryKeys.size())));
+            TimeUnit.MILLISECONDS.sleep(500L);
         }
         for (int i = 0; i < Math.max(1, loopCount / 3); i++) {
             setNullToAllFields(primaryKeys.get(random.nextInt(0, 
primaryKeys.size())));
             deleteOrderById(primaryKeys.remove(random.nextInt(0, 
primaryKeys.size())));
+            TimeUnit.MILLISECONDS.sleep(500L);
         }
         log.info("increment task runnable execute successfully.");
     }
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 239ff019d3e..d052d9609ae 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -179,7 +179,7 @@ class MigrationJobAPITest {
         Map<String, TableDataConsistencyCheckResult> checkResultMap = 
jobAPI.buildPipelineDataConsistencyChecker(
                 jobConfig, jobAPI.buildPipelineProcessContext(jobConfig), new 
ConsistencyCheckJobItemProgressContext(jobId.get(), 0)).check("FIXTURE", null);
         assertThat(checkResultMap.size(), is(1));
-        String checkKey = "ds_0.t_order";
+        String checkKey = "t_order";
         
assertTrue(checkResultMap.get(checkKey).getCountCheckResult().isMatched());
         
assertThat(checkResultMap.get(checkKey).getCountCheckResult().getTargetRecordsCount(),
 is(2L));
         
assertTrue(checkResultMap.get(checkKey).getContentCheckResult().isMatched());
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
index 5821727cf9c..f4e849174e1 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
@@ -65,7 +65,7 @@ class MigrationDataConsistencyCheckerTest {
         governanceRepositoryAPI.persistJobItemProgress(jobConfig.getJobId(), 
0, "");
         Map<String, TableDataConsistencyCheckResult> actual = new 
MigrationDataConsistencyChecker(jobConfig, new 
MigrationProcessContext(jobConfig.getJobId(), null),
                 
createConsistencyCheckJobItemProgressContext()).check("FIXTURE", null);
-        String checkKey = "ds_0.t_order";
+        String checkKey = "t_order";
         assertTrue(actual.get(checkKey).getCountCheckResult().isMatched());
         
assertThat(actual.get(checkKey).getCountCheckResult().getSourceRecordsCount(), 
is(actual.get(checkKey).getCountCheckResult().getTargetRecordsCount()));
         assertTrue(actual.get(checkKey).getContentCheckResult().isMatched());

Reply via email to