This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new bf25028149e Review and improve pipeline code (#30341)
bf25028149e is described below

commit bf25028149e666dff1dbbb9007fe8ff4362ca325
Author: Hongsheng Zhong <zhonghongsh...@apache.org>
AuthorDate: Thu Feb 29 09:45:59 2024 +0800

    Review and improve pipeline code (#30341)
    
    * Merge DataRecord base fields in DataRecordGroupEngine
    
    * Persist job item progress after it completes
    
    * Improve inventory check break for migration
---
 .../ingest/record/group/DataRecordGroupEngine.java   |  8 ++++++++
 .../core/job/AbstractSeparablePipelineJob.java       |  2 ++
 .../record/group/DataRecordGroupEngineTest.java      | 20 ++++++++++++++++++++
 .../consistency/MigrationDataConsistencyChecker.java | 12 ++++++++----
 4 files changed, 38 insertions(+), 4 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
index 82624900af8..4e2e5b578de 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
@@ -120,6 +120,7 @@ public final class DataRecordGroupEngine {
                 () -> new 
PipelineUnexpectedDataRecordOrderException(beforeDataRecord, dataRecord));
         if (null != beforeDataRecord && PipelineSQLOperationType.UPDATE == 
beforeDataRecord.getType() && isUniqueKeyUpdated(beforeDataRecord)) {
             DataRecord mergedDataRecord = new 
DataRecord(PipelineSQLOperationType.DELETE, dataRecord.getTableName(), 
dataRecord.getPosition(), dataRecord.getColumnCount());
+            mergeBaseFields(dataRecord, mergedDataRecord);
             for (int i = 0; i < dataRecord.getColumnCount(); i++) {
                 mergedDataRecord.addColumn(new 
Column(dataRecord.getColumn(i).getName(),
                         dataRecord.getColumn(i).isUniqueKey() ? 
beforeDataRecord.getColumn(i).getOldValue() : 
beforeDataRecord.getColumn(i).getValue(),
@@ -132,6 +133,12 @@ public final class DataRecordGroupEngine {
         }
     }
     
+    private void mergeBaseFields(final DataRecord sourceRecord, final 
DataRecord targetRecord) {
+        targetRecord.setActualTableName(sourceRecord.getActualTableName());
+        targetRecord.setCsn(sourceRecord.getCsn());
+        targetRecord.setCommitTime(sourceRecord.getCommitTime());
+    }
+    
     private boolean isUniqueKeyUpdated(final DataRecord dataRecord) {
         // TODO Compatible with multiple unique indexes
         for (Column each : dataRecord.getColumns()) {
@@ -144,6 +151,7 @@ public final class DataRecordGroupEngine {
     
     private DataRecord mergeUpdateColumn(final PipelineSQLOperationType type, 
final String tableName, final DataRecord preDataRecord, final DataRecord 
curDataRecord) {
         DataRecord result = new DataRecord(type, tableName, 
curDataRecord.getPosition(), curDataRecord.getColumnCount());
+        mergeBaseFields(curDataRecord, result);
         for (int i = 0; i < curDataRecord.getColumnCount(); i++) {
             result.addColumn(new Column(
                     curDataRecord.getColumn(i).getName(),
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
index 855bd8ed717..b8ae593d3e4 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
@@ -29,6 +29,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
@@ -97,6 +98,7 @@ public abstract class AbstractSeparablePipelineJob<T extends 
PipelineJobConfigur
             }
         } finally {
             if (started) {
+                PipelineJobProgressPersistService.persistNow(jobId, 
shardingItem);
                 
jobRunnerManager.getTasksRunner(shardingItem).ifPresent(PipelineTasksRunner::stop);
             }
         }
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
index 504143998c0..9bc7940b582 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
@@ -71,6 +71,8 @@ class DataRecordGroupEngineTest {
         DataRecord dataRecord = actual.iterator().next();
         assertThat(dataRecord.getType(), is(PipelineSQLOperationType.INSERT));
         assertThat(dataRecord.getTableName(), is("order"));
+        assertThat(dataRecord.getActualTableName(), is("order_0"));
+        assertThat(dataRecord.getCommitTime(), is(456L));
         assertColumnsMatched(dataRecord.getColumn(0), new Column("id", null, 
1, true, true));
         assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id", 
null, 10, true, false));
         assertColumnsMatched(dataRecord.getColumn(2), new 
Column("total_price", null, 200, true, false));
@@ -93,6 +95,8 @@ class DataRecordGroupEngineTest {
         DataRecord dataRecord = actual.iterator().next();
         assertThat(dataRecord.getType(), is(PipelineSQLOperationType.INSERT));
         assertThat(dataRecord.getTableName(), is("order"));
+        assertThat(dataRecord.getActualTableName(), is("order_0"));
+        assertThat(dataRecord.getCommitTime(), is(456L));
         assertColumnsMatched(dataRecord.getColumn(0), new Column("id", null, 
2, true, true));
         assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id", 
null, 10, true, false));
         assertColumnsMatched(dataRecord.getColumn(2), new 
Column("total_price", null, 50, true, false));
@@ -107,6 +111,8 @@ class DataRecordGroupEngineTest {
         DataRecord dataRecord = actual.iterator().next();
         assertThat(dataRecord.getType(), is(PipelineSQLOperationType.UPDATE));
         assertThat(dataRecord.getTableName(), is("order"));
+        assertThat(dataRecord.getActualTableName(), is("order_0"));
+        assertThat(dataRecord.getCommitTime(), is(456L));
         assertColumnsMatched(dataRecord.getColumn(0), new Column("id", 1, 1, 
false, true));
         assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id", 
10, 10, false, false));
         assertColumnsMatched(dataRecord.getColumn(2), new 
Column("total_price", 50, 200, true, false));
@@ -121,6 +127,8 @@ class DataRecordGroupEngineTest {
         DataRecord dataRecord = actual.iterator().next();
         assertThat(dataRecord.getType(), is(PipelineSQLOperationType.UPDATE));
         assertThat(dataRecord.getTableName(), is("order"));
+        assertThat(dataRecord.getActualTableName(), is("order_0"));
+        assertThat(dataRecord.getCommitTime(), is(456L));
         assertColumnsMatched(dataRecord.getColumn(0), new Column("id", 1, 2, 
true, true));
         assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id", 
10, 10, false, false));
         assertColumnsMatched(dataRecord.getColumn(2), new 
Column("total_price", 50, 200, true, false));
@@ -135,6 +143,8 @@ class DataRecordGroupEngineTest {
         DataRecord dataRecord = actual.iterator().next();
         assertThat(dataRecord.getType(), is(PipelineSQLOperationType.UPDATE));
         assertThat(dataRecord.getTableName(), is("order"));
+        assertThat(dataRecord.getActualTableName(), is("order_0"));
+        assertThat(dataRecord.getCommitTime(), is(456L));
         assertColumnsMatched(dataRecord.getColumn(0), new Column("id", 1, 2, 
true, true));
         assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id", 
10, 10, false, false));
         assertColumnsMatched(dataRecord.getColumn(2), new 
Column("total_price", 50, 200, true, false));
@@ -149,6 +159,8 @@ class DataRecordGroupEngineTest {
         DataRecord dataRecord = actual.iterator().next();
         assertThat(dataRecord.getType(), is(PipelineSQLOperationType.UPDATE));
         assertThat(dataRecord.getTableName(), is("order"));
+        assertThat(dataRecord.getActualTableName(), is("order_0"));
+        assertThat(dataRecord.getCommitTime(), is(456L));
         assertColumnsMatched(dataRecord.getColumn(0), new Column("id", 1, 3, 
true, true));
         assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id", 
10, 10, false, false));
         assertColumnsMatched(dataRecord.getColumn(2), new 
Column("total_price", 50, 50, false, false));
@@ -188,6 +200,8 @@ class DataRecordGroupEngineTest {
         DataRecord dataRecord = actual.iterator().next();
         assertThat(dataRecord.getType(), is(PipelineSQLOperationType.DELETE));
         assertThat(dataRecord.getTableName(), is("order"));
+        assertThat(dataRecord.getActualTableName(), is("order_0"));
+        assertThat(dataRecord.getCommitTime(), is(789L));
         assertColumnsMatched(dataRecord.getColumn(0), new Column("id", 1, 
null, true, true));
         assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id", 
10, null, true, false));
         assertColumnsMatched(dataRecord.getColumn(2), new 
Column("total_price", 50, null, true, false));
@@ -230,6 +244,8 @@ class DataRecordGroupEngineTest {
     
     private DataRecord mockInsertDataRecord(final String tableName, final int 
id, final int userId, final int totalPrice) {
         DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT, 
tableName, new IngestPlaceholderPosition(), 3);
+        result.setActualTableName("order_0");
+        result.setCommitTime(123L);
         result.addColumn(new Column("id", id, true, true));
         result.addColumn(new Column("user_id", userId, true, false));
         result.addColumn(new Column("total_price", totalPrice, true, false));
@@ -250,6 +266,8 @@ class DataRecordGroupEngineTest {
     
     private DataRecord mockUpdateDataRecord(final String tableName, final 
Integer oldId, final int id, final int userId, final int totalPrice) {
         DataRecord result = new DataRecord(PipelineSQLOperationType.UPDATE, 
tableName, new IngestPlaceholderPosition(), 3);
+        result.setActualTableName("order_0");
+        result.setCommitTime(456L);
         result.addColumn(new Column("id", oldId, id, 
!Objects.deepEquals(oldId, id), true));
         result.addColumn(new Column("user_id", userId, userId, false, false));
         result.addColumn(new Column("total_price", 50, totalPrice, 50 != 
totalPrice, false));
@@ -262,6 +280,8 @@ class DataRecordGroupEngineTest {
     
     private DataRecord mockDeleteDataRecord(final String tableName, final int 
id, final int userId, final int totalPrice) {
         DataRecord result = new DataRecord(PipelineSQLOperationType.DELETE, 
tableName, new IngestPlaceholderPosition(), 3);
+        result.setActualTableName("order_0");
+        result.setCommitTime(789L);
         result.addColumn(new Column("id", id, null, true, true));
         result.addColumn(new Column("user_id", userId, null, true, false));
         result.addColumn(new Column("total_price", totalPrice, null, true, 
false));
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index 2d9a57595fd..767d856c728 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -95,7 +95,9 @@ public final class MigrationDataConsistencyChecker implements 
PipelineDataConsis
                 PipelineDataSourceManager dataSourceManager = new 
PipelineDataSourceManager();
                 TableDataConsistencyChecker tableChecker = 
TableDataConsistencyCheckerFactory.newInstance(algorithmType, algorithmProps)) {
             for (JobDataNodeLine each : jobConfig.getJobShardingDataNodes()) {
-                checkTableInventoryData(each, tableChecker, result, 
dataSourceManager);
+                if (checkTableInventoryDataUnmatchedAndBreak(each, 
tableChecker, result, dataSourceManager)) {
+                    return 
result.entrySet().stream().collect(Collectors.toMap(entry -> 
entry.getKey().toString(), Entry::getValue));
+                }
             }
         }
         return result.entrySet().stream().collect(Collectors.toMap(entry -> 
entry.getKey().toString(), Entry::getValue));
@@ -106,18 +108,20 @@ public final class MigrationDataConsistencyChecker 
implements PipelineDataConsis
         return 
jobProgress.values().stream().filter(Objects::nonNull).mapToLong(TransmissionJobItemProgress::getProcessedRecordsCount).sum();
     }
     
-    private void checkTableInventoryData(final JobDataNodeLine 
jobDataNodeLine, final TableDataConsistencyChecker tableChecker,
-                                         final 
Map<CaseInsensitiveQualifiedTable, TableDataConsistencyCheckResult> 
checkResultMap, final PipelineDataSourceManager dataSourceManager) {
+    private boolean checkTableInventoryDataUnmatchedAndBreak(final 
JobDataNodeLine jobDataNodeLine, final TableDataConsistencyChecker tableChecker,
+                                                             final 
Map<CaseInsensitiveQualifiedTable, TableDataConsistencyCheckResult> 
checkResultMap,
+                                                             final 
PipelineDataSourceManager dataSourceManager) {
         for (JobDataNodeEntry entry : jobDataNodeLine.getEntries()) {
             for (DataNode each : entry.getDataNodes()) {
                 TableDataConsistencyCheckResult checkResult = 
checkSingleTableInventoryData(entry.getLogicTableName(), each, tableChecker, 
dataSourceManager);
                 checkResultMap.put(new 
CaseInsensitiveQualifiedTable(each.getSchemaName(), each.getTableName()), 
checkResult);
                 if (!checkResult.isMatched() && 
tableChecker.isBreakOnInventoryCheckNotMatched()) {
                     log.info("Unmatched on table '{}', ignore left tables", 
DataNodeUtils.formatWithSchema(each));
-                    return;
+                    return true;
                 }
             }
         }
+        return false;
     }
     
     private TableDataConsistencyCheckResult 
checkSingleTableInventoryData(final String targetTableName, final DataNode 
dataNode,

Reply via email to