This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 67c9eb16 [Improve] fix doris source duplicate splitid (#414)
67c9eb16 is described below

commit 67c9eb16b6c3adcf24e95c0ad9f0b229fb40cdc2
Author: wudi <676366...@qq.com>
AuthorDate: Wed Jul 3 14:21:52 2024 +0800

    [Improve] fix doris source duplicate splitid (#414)
    
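    * Fix duplicate split IDs in the Doris source: each split is now identified
      as "<beAddress>_<index>" instead of by its BE address alone
    
    * Apply code style fixes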
---
 .../org/apache/doris/flink/source/DorisSource.java |  7 +++++--
 .../source/assigners/SimpleSplitAssigner.java      |  4 ++++
 .../source/enumerator/DorisSourceEnumerator.java   |  2 +-
 .../doris/flink/source/split/DorisSourceSplit.java | 10 +++++----
 .../source/split/DorisSourceSplitSerializer.java   | 24 ++++++++++++++++------
 .../PendingSplitsCheckpointSerializerTest.java     |  3 ++-
 .../flink/source/reader/DorisSourceReaderTest.java |  2 +-
 .../split/DorisSourceSplitSerializerTest.java      |  3 ++-
 .../flink/source/split/DorisSourceSplitTest.java   |  4 ++--
 9 files changed, 41 insertions(+), 18 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
index 55436b07..3c71c068 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
@@ -97,9 +97,12 @@ public class DorisSource<OUT>
         List<DorisSourceSplit> dorisSourceSplits = new ArrayList<>();
         List<PartitionDefinition> partitions =
                 RestService.findPartitions(options, readOptions, LOG);
-        partitions.forEach(m -> dorisSourceSplits.add(new 
DorisSourceSplit(m)));
+        for (int index = 0; index < partitions.size(); index++) {
+            PartitionDefinition partitionDef = partitions.get(index);
+            String splitId = partitionDef.getBeAddress() + "_" + index;
+            dorisSourceSplits.add(new DorisSourceSplit(splitId, partitionDef));
+        }
         DorisSplitAssigner splitAssigner = new 
SimpleSplitAssigner(dorisSourceSplits);
-
         return new DorisSourceEnumerator(context, splitAssigner);
     }
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java
index d0dcf9d7..ee96f687 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java
@@ -19,6 +19,8 @@ package org.apache.doris.flink.source.assigners;
 
 import org.apache.doris.flink.source.enumerator.PendingSplitsCheckpoint;
 import org.apache.doris.flink.source.split.DorisSourceSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
@@ -29,6 +31,7 @@ import java.util.Optional;
 /** The {@code SimpleSplitAssigner} hands out splits in a random order. */
 public class SimpleSplitAssigner implements DorisSplitAssigner {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(SimpleSplitAssigner.class);
     private final ArrayList<DorisSourceSplit> splits;
 
     public SimpleSplitAssigner(Collection<DorisSourceSplit> splits) {
@@ -43,6 +46,8 @@ public class SimpleSplitAssigner implements DorisSplitAssigner {
 
     @Override
     public void addSplits(Collection<DorisSourceSplit> splits) {
-        splits.addAll(splits);
+        LOG.info("Adding splits: {}", splits);
+        // The parameter shadows the field; qualify it so returned splits are re-queued.
+        this.splits.addAll(splits);
     }
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumerator.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumerator.java
index a034b3b0..65fcc6fa 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumerator.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumerator.java
@@ -79,7 +79,7 @@ public class DorisSourceEnumerator
 
     @Override
     public void addSplitsBack(List<DorisSourceSplit> splits, int subtaskId) {
-        LOG.debug("Doris Source Enumerator adds splits back: {}", splits);
+        LOG.info("Doris Source Enumerator adds splits back: {}", splits);
         splitAssigner.addSplits(splits);
     }
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplit.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplit.java
index b4d60a23..f80d4165 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplit.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplit.java
@@ -27,7 +27,7 @@ import java.util.Objects;
 
 /** A {@link SourceSplit} that represents a {@link PartitionDefinition}. */
 public class DorisSourceSplit implements SourceSplit {
-
+    private final String id; // value returned by splitId()
     private final PartitionDefinition partitionDefinition;
 
     /**
@@ -36,13 +36,14 @@ public class DorisSourceSplit implements SourceSplit {
      */
     @Nullable transient byte[] serializedFormCache;
 
-    public DorisSourceSplit(PartitionDefinition partitionDefinition) {
+    public DorisSourceSplit(String id, PartitionDefinition 
partitionDefinition) {
+        this.id = id;
         this.partitionDefinition = partitionDefinition;
     }
 
     @Override
     public String splitId() {
-        return partitionDefinition.getBeAddress();
+        return id;
     }
 
     public PartitionDefinition getPartitionDefinition() {
@@ -52,9 +53,10 @@ public class DorisSourceSplit implements SourceSplit {
     @Override
     public String toString() {
         return String.format(
-                "DorisSourceSplit: %s.%s,be=%s,tablets=%s",
+                "DorisSourceSplit: %s.%s,id=%s,be=%s,tablets=%s",
                 partitionDefinition.getDatabase(),
                 partitionDefinition.getTable(),
+                id,
                 partitionDefinition.getBeAddress(),
                 partitionDefinition.getTabletIds());
     }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializer.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializer.java
index 7e9468ec..cf7d27f6 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializer.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializer.java
@@ -39,7 +39,7 @@ public class DorisSourceSplitSerializer implements 
SimpleVersionedSerializer<Dor
     private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
             ThreadLocal.withInitial(() -> new DataOutputSerializer(64));
 
-    private static final int VERSION = 1;
+    private static final int VERSION = 2;
 
     private static void writeLongArray(DataOutputView out, Long[] values) 
throws IOException {
         out.writeInt(values.length);
@@ -71,6 +71,7 @@ public class DorisSourceSplitSerializer implements 
SimpleVersionedSerializer<Dor
         }
 
         final DataOutputSerializer out = SERIALIZER_CACHE.get();
+
         PartitionDefinition partDef = split.getPartitionDefinition();
         out.writeUTF(partDef.getDatabase());
         out.writeUTF(partDef.getTable());
@@ -81,6 +82,8 @@ public class DorisSourceSplitSerializer implements 
SimpleVersionedSerializer<Dor
         out.writeInt(queryPlanBytes.length);
         out.write(queryPlanBytes);
 
+        out.writeUTF(split.splitId());
+
         final byte[] result = out.getCopyOfBuffer();
         out.clear();
 
@@ -93,13 +96,16 @@ public class DorisSourceSplitSerializer implements 
SimpleVersionedSerializer<Dor
 
     @Override
     public DorisSourceSplit deserialize(int version, byte[] serialized) throws 
IOException {
-        if (version == 1) {
-            return deserialize(serialized);
+        switch (version) {
+            case 1:
+            case 2:
+                return deserializeSplit(version, serialized);
+            default:
+                throw new IOException("Unknown version: " + version);
         }
-        throw new IOException("Unknown version: " + version);
     }
 
-    private DorisSourceSplit deserialize(byte[] serialized) throws IOException 
{
+    private DorisSourceSplit deserializeSplit(int version, byte[] serialized) 
throws IOException {
         final DataInputDeserializer in = new DataInputDeserializer(serialized);
         final String database = in.readUTF();
         final String table = in.readUTF();
@@ -112,8 +118,12 @@ public class DorisSourceSplitSerializer implements SimpleVersionedSerializer<Dor
         final byte[] bytes = new byte[len];
         in.read(bytes);
         final String queryPlan = new String(bytes, StandardCharsets.UTF_8);
+
+        // Version 1 did not serialize a split id; fall back to the BE address, which is
+        // what splitId() returned before ids were stored, rather than one shared constant.
+        final String splitId = version >= 2 ? in.readUTF() : beAddress;
         PartitionDefinition partDef =
                 new PartitionDefinition(database, table, beAddress, tabletIds, queryPlan);
-        return new DorisSourceSplit(partDef);
+        return new DorisSourceSplit(splitId, partDef);
     }
 }
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/enumerator/PendingSplitsCheckpointSerializerTest.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/enumerator/PendingSplitsCheckpointSerializerTest.java
index fa092796..4b7d315a 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/enumerator/PendingSplitsCheckpointSerializerTest.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/enumerator/PendingSplitsCheckpointSerializerTest.java
@@ -35,7 +35,8 @@ public class PendingSplitsCheckpointSerializerTest {
 
     @Test
     public void serializeSplit() throws Exception {
-        final DorisSourceSplit split = new 
DorisSourceSplit(OptionUtils.buildPartitionDef());
+        final DorisSourceSplit split =
+                new DorisSourceSplit("splitId", 
OptionUtils.buildPartitionDef());
         PendingSplitsCheckpoint checkpoint = new 
PendingSplitsCheckpoint(Arrays.asList(split));
 
         final PendingSplitsCheckpointSerializer splitSerializer =
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java
index 1fc3165c..f044b25e 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java
@@ -40,7 +40,7 @@ public class DorisSourceReaderTest {
     }
 
     private static DorisSourceSplit createTestDorisSplit() throws IOException {
-        return new DorisSourceSplit(OptionUtils.buildPartitionDef());
+        return new DorisSourceSplit("splitId", 
OptionUtils.buildPartitionDef());
     }
 
     @Test
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializerTest.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializerTest.java
index 0103ccb5..6fc721ce 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializerTest.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializerTest.java
@@ -27,7 +27,8 @@ public class DorisSourceSplitSerializerTest {
 
     @Test
     public void serializeSplit() throws Exception {
-        final DorisSourceSplit split = new 
DorisSourceSplit(OptionUtils.buildPartitionDef());
+        final DorisSourceSplit split =
+                new DorisSourceSplit("splitId", 
OptionUtils.buildPartitionDef());
 
         DorisSourceSplit deSerialized = serializeAndDeserializeSplit(split);
         assertEquals(split, deSerialized);
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSourceSplitTest.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSourceSplitTest.java
index 40db95a6..b633affc 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSourceSplitTest.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSourceSplitTest.java
@@ -31,8 +31,8 @@ public class DorisSourceSplitTest {
                 new PartitionDefinition("db", "tbl", "be", new HashSet<>(), 
"queryplan1");
         PartitionDefinition pd2 =
                 new PartitionDefinition("db", "tbl", "be", new HashSet<>(), 
"queryplan1");
-        DorisSourceSplit split1 = new DorisSourceSplit(pd1);
-        DorisSourceSplit split2 = new DorisSourceSplit(pd2);
+        DorisSourceSplit split1 = new DorisSourceSplit("be_1", pd1);
+        DorisSourceSplit split2 = new DorisSourceSplit("be_2", pd2);
         Assert.assertEquals(split1, split2);
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to