This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 79cabdc52f9 Refactor DataRecord.getKey() (#29515)
79cabdc52f9 is described below

commit 79cabdc52f9a565e01e3c491ee0354d5f319d55f
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 23 18:09:14 2023 +0800

    Refactor DataRecord.getKey() (#29515)
    
    * Refactor DataRecord.getKey()
    
    * Move DataRecordGroupEngine
    
    * Move DataRecordGroupEngine
---
 .../core/importer/sink/PipelineDataSourceSink.java |  4 ++--
 .../pipeline/core/ingest/record/DataRecord.java    | 19 +++++----------
 .../record/group}/DataRecordGroupEngine.java       | 11 +++------
 .../record/{ => group}/GroupedDataRecord.java      |  3 ++-
 .../core/ingest/record/DataRecordTest.java         | 28 ++++++++--------------
 .../record/group}/DataRecordGroupEngineTest.java   |  7 +++---
 6 files changed, 26 insertions(+), 46 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
index c59c7068194..842a28a8dfd 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
@@ -23,12 +23,12 @@ import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineImporterJobWriteException;
-import 
org.apache.shardingsphere.data.pipeline.core.importer.DataRecordGroupEngine;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.record.group.DataRecordGroupEngine;
 import 
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.record.GroupedDataRecord;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.record.group.GroupedDataRecord;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.RecordUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.JobOperationType;
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecord.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecord.java
index 5e02714a5c1..c347c8921e0 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecord.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecord.java
@@ -22,9 +22,11 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.Setter;
 import lombok.ToString;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 
@@ -45,9 +47,9 @@ public final class DataRecord extends Record {
     
     private final List<Column> columns;
     
-    private final List<Object> uniqueKeyValue = new LinkedList<>();
+    private final Collection<Object> uniqueKeyValue = new LinkedList<>();
     
-    private final List<Object> oldUniqueKeyValues = new ArrayList<>();
+    private final Collection<Object> oldUniqueKeyValues = new LinkedList<>();
     
     private String actualTableName;
     
@@ -103,16 +105,7 @@ public final class DataRecord extends Record {
      * @return key
      */
     public Key getKey() {
-        return new Key(tableName, uniqueKeyValue);
-    }
-    
-    /**
-     * Get old key.
-     *
-     * @return key
-     */
-    public Key getOldKey() {
-        return new Key(tableName, oldUniqueKeyValues);
+        return IngestDataChangeType.DELETE.equals(type) ? new Key(tableName, 
oldUniqueKeyValues) : new Key(tableName, uniqueKeyValue);
     }
     
     @RequiredArgsConstructor
@@ -121,6 +114,6 @@ public final class DataRecord extends Record {
         
         private final String tableName;
         
-        private final List<Object> uniqueKeyValues;
+        private final Collection<Object> uniqueKeyValues;
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordGroupEngine.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
similarity index 88%
rename from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordGroupEngine.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
index 0f084944d59..cfc2cefc70b 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordGroupEngine.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
@@ -15,12 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.importer;
+package org.apache.shardingsphere.data.pipeline.core.ingest.record.group;
 
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord.Key;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.record.GroupedDataRecord;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -50,7 +49,7 @@ public final class DataRecordGroupEngine {
         Map<String, Map<String, List<DataRecord>>> batchDataRecords = new 
LinkedHashMap<>();
         for (DataRecord each : records) {
             tableNames.add(each.getTableName());
-            if (duplicateKeys.getOrDefault(getKey(each), false)) {
+            if (duplicateKeys.getOrDefault(each.getKey(), false)) {
                 nonBatchRecords.computeIfAbsent(each.getTableName(), ignored 
-> new LinkedList<>()).add(each);
             } else {
                 batchDataRecords.computeIfAbsent(each.getTableName(), ignored 
-> new HashMap<>()).computeIfAbsent(each.getType(), ignored -> new 
LinkedList<>()).add(each);
@@ -63,16 +62,12 @@ public final class DataRecordGroupEngine {
     private Map<Key, Boolean> getDuplicateKeys(final Collection<DataRecord> 
records) {
         Map<Key, Boolean> result = new HashMap<>();
         for (DataRecord each : records) {
-            Key key = getKey(each);
+            Key key = each.getKey();
             result.put(key, result.containsKey(key));
         }
         return result;
     }
     
-    private Key getKey(final DataRecord record) {
-        return IngestDataChangeType.DELETE.equals(record.getType()) ? 
record.getOldKey() : record.getKey();
-    }
-    
     private GroupedDataRecord getGroupedDataRecord(final String tableName, 
final Map<String, List<DataRecord>> batchRecords, final List<DataRecord> 
nonBatchRecords) {
         return new GroupedDataRecord(tableName, 
batchRecords.getOrDefault(IngestDataChangeType.INSERT, Collections.emptyList()),
                 batchRecords.getOrDefault(IngestDataChangeType.UPDATE, 
Collections.emptyList()), 
batchRecords.getOrDefault(IngestDataChangeType.DELETE, 
Collections.emptyList()), nonBatchRecords);
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/GroupedDataRecord.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/GroupedDataRecord.java
similarity index 93%
rename from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/GroupedDataRecord.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/GroupedDataRecord.java
index b3324db81ae..daa0e47c177 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/GroupedDataRecord.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/GroupedDataRecord.java
@@ -15,10 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.ingest.record;
+package org.apache.shardingsphere.data.pipeline.core.ingest.record.group;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
 
 import java.util.List;
 
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecordTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecordTest.java
index 003423f4430..cdea349b563 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecordTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecordTest.java
@@ -26,29 +26,21 @@ import static org.hamcrest.MatcherAssert.assertThat;
 
 class DataRecordTest {
     
-    private DataRecord beforeDataRecord;
-    
-    private DataRecord afterDataRecord;
-    
     @Test
-    void assertKeyEqual() {
-        beforeDataRecord = new DataRecord(IngestDataChangeType.UPDATE, "t1", 
new IngestPlaceholderPosition(), 2);
+    void assertGetKeyWithUpdate() {
+        DataRecord beforeDataRecord = new 
DataRecord(IngestDataChangeType.UPDATE, "foo_tbl", new 
IngestPlaceholderPosition(), 1);
         beforeDataRecord.addColumn(new Column("id", 1, true, true));
-        beforeDataRecord.addColumn(new Column("name", "1", true, false));
-        afterDataRecord = new DataRecord(IngestDataChangeType.UPDATE, "t1", 
new IngestPlaceholderPosition(), 2);
-        afterDataRecord.addColumn(new Column("id", 1, true, true));
-        afterDataRecord.addColumn(new Column("name", "2", true, false));
+        DataRecord afterDataRecord = new 
DataRecord(IngestDataChangeType.UPDATE, "foo_tbl", new 
IngestPlaceholderPosition(), 1);
+        afterDataRecord.addColumn(new Column("id", 2, 1, true, true));
         assertThat(beforeDataRecord.getKey(), is(afterDataRecord.getKey()));
     }
     
     @Test
-    void assertOldKeyEqual() {
-        beforeDataRecord = new DataRecord(IngestDataChangeType.UPDATE, "t1", 
new IngestPlaceholderPosition(), 2);
-        beforeDataRecord.addColumn(new Column("id", 1, true, true));
-        beforeDataRecord.addColumn(new Column("name", "1", true, false));
-        afterDataRecord = new DataRecord(IngestDataChangeType.UPDATE, "t1", 
new IngestPlaceholderPosition(), 2);
-        afterDataRecord.addColumn(new Column("id", 1, 2, true, true));
-        afterDataRecord.addColumn(new Column("name", "2", true, false));
-        assertThat(beforeDataRecord.getKey(), is(afterDataRecord.getOldKey()));
+    void assertGetKeyWithDelete() {
+        DataRecord beforeDataRecord = new 
DataRecord(IngestDataChangeType.DELETE, "foo_tbl", new 
IngestPlaceholderPosition(), 1);
+        beforeDataRecord.addColumn(new Column("id", 1, 2, true, true));
+        DataRecord afterDataRecord = new 
DataRecord(IngestDataChangeType.DELETE, "foo_tbl", new 
IngestPlaceholderPosition(), 1);
+        afterDataRecord.addColumn(new Column("id", 1, 3, true, true));
+        assertThat(beforeDataRecord.getKey(), is(afterDataRecord.getKey()));
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordGroupEngineTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
similarity index 98%
rename from 
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordGroupEngineTest.java
rename to 
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
index 0be6fff94e1..ea14af84134 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordGroupEngineTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
@@ -15,13 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.importer;
+package org.apache.shardingsphere.data.pipeline.core.ingest.record.group;
 
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.record.GroupedDataRecord;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;

Reply via email to