This is an automated email from the ASF dual-hosted git repository. kirs pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 67c9eb16 [Improve] fix doris source duplicate splitid (#414) 67c9eb16 is described below commit 67c9eb16b6c3adcf24e95c0ad9f0b229fb40cdc2 Author: wudi <676366...@qq.com> AuthorDate: Wed Jul 3 14:21:52 2024 +0800 [Improve] fix doris source duplicate splitid (#414) * fix doris source duplicate splitid * code style --- .../org/apache/doris/flink/source/DorisSource.java | 7 +++++-- .../source/assigners/SimpleSplitAssigner.java | 4 ++++ .../source/enumerator/DorisSourceEnumerator.java | 2 +- .../doris/flink/source/split/DorisSourceSplit.java | 10 +++++---- .../source/split/DorisSourceSplitSerializer.java | 24 ++++++++++++++++------ .../PendingSplitsCheckpointSerializerTest.java | 3 ++- .../flink/source/reader/DorisSourceReaderTest.java | 2 +- .../split/DorisSourceSplitSerializerTest.java | 3 ++- .../flink/source/split/DorisSourceSplitTest.java | 4 ++-- 9 files changed, 41 insertions(+), 18 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java index 55436b07..3c71c068 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java @@ -97,9 +97,12 @@ public class DorisSource<OUT> List<DorisSourceSplit> dorisSourceSplits = new ArrayList<>(); List<PartitionDefinition> partitions = RestService.findPartitions(options, readOptions, LOG); - partitions.forEach(m -> dorisSourceSplits.add(new DorisSourceSplit(m))); + for (int index = 0; index < partitions.size(); index++) { + PartitionDefinition partitionDef = partitions.get(index); + String splitId = partitionDef.getBeAddress() + "_" + index; + dorisSourceSplits.add(new DorisSourceSplit(splitId, partitionDef)); + } DorisSplitAssigner splitAssigner = new SimpleSplitAssigner(dorisSourceSplits); - return new DorisSourceEnumerator(context, splitAssigner); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java index d0dcf9d7..ee96f687 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java @@ -19,6 +19,8 @@ package org.apache.doris.flink.source.assigners; import org.apache.doris.flink.source.enumerator.PendingSplitsCheckpoint; import org.apache.doris.flink.source.split.DorisSourceSplit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nullable; @@ -29,6 +31,7 @@ import java.util.Optional; /** The {@code SimpleSplitAssigner} hands out splits in a random order. */ public class SimpleSplitAssigner implements DorisSplitAssigner { + private static final Logger LOG = LoggerFactory.getLogger(SimpleSplitAssigner.class); private final ArrayList<DorisSourceSplit> splits; public SimpleSplitAssigner(Collection<DorisSourceSplit> splits) { @@ -43,6 +46,7 @@ public class SimpleSplitAssigner implements DorisSplitAssigner { @Override public void addSplits(Collection<DorisSourceSplit> splits) { + LOG.info("Adding splits: {}", splits); splits.addAll(splits); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumerator.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumerator.java index a034b3b0..65fcc6fa 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumerator.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumerator.java @@ -79,7 +79,7 @@ public class DorisSourceEnumerator @Override public void addSplitsBack(List<DorisSourceSplit> splits, int subtaskId) { - LOG.debug("Doris Source Enumerator adds splits back: {}", splits); + LOG.info("Doris Source Enumerator adds splits back: {}", splits); splitAssigner.addSplits(splits); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplit.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplit.java index b4d60a23..f80d4165 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplit.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplit.java @@ -27,7 +27,7 @@ import java.util.Objects; /** A {@link SourceSplit} that represents a {@link PartitionDefinition}. */ public class DorisSourceSplit implements SourceSplit { - + private String id; private final PartitionDefinition partitionDefinition; /** @@ -36,13 +36,14 @@ public class DorisSourceSplit implements SourceSplit { */ @Nullable transient byte[] serializedFormCache; - public DorisSourceSplit(PartitionDefinition partitionDefinition) { + public DorisSourceSplit(String id, PartitionDefinition partitionDefinition) { + this.id = id; this.partitionDefinition = partitionDefinition; } @Override public String splitId() { - return partitionDefinition.getBeAddress(); + return id; } public PartitionDefinition getPartitionDefinition() { @@ -52,9 +53,10 @@ public class DorisSourceSplit implements SourceSplit { @Override public String toString() { return String.format( - "DorisSourceSplit: %s.%s,be=%s,tablets=%s", + "DorisSourceSplit: %s.%s,id=%s,be=%s,tablets=%s", partitionDefinition.getDatabase(), partitionDefinition.getTable(), + id, partitionDefinition.getBeAddress(), partitionDefinition.getTabletIds()); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializer.java index 7e9468ec..cf7d27f6 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializer.java @@ -39,7 +39,7 @@ public class DorisSourceSplitSerializer implements SimpleVersionedSerializer<Dor private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE = ThreadLocal.withInitial(() -> new DataOutputSerializer(64)); - private static final int VERSION = 1; + private static final int VERSION = 2; private static void writeLongArray(DataOutputView out, Long[] values) throws IOException { out.writeInt(values.length); @@ -71,6 +71,7 @@ public class DorisSourceSplitSerializer implements SimpleVersionedSerializer<Dor } final DataOutputSerializer out = SERIALIZER_CACHE.get(); + PartitionDefinition partDef = split.getPartitionDefinition(); out.writeUTF(partDef.getDatabase()); out.writeUTF(partDef.getTable()); @@ -81,6 +82,8 @@ public class DorisSourceSplitSerializer implements SimpleVersionedSerializer<Dor out.writeInt(queryPlanBytes.length); out.write(queryPlanBytes); + out.writeUTF(split.splitId()); + final byte[] result = out.getCopyOfBuffer(); out.clear(); @@ -93,13 +96,16 @@ public class DorisSourceSplitSerializer implements SimpleVersionedSerializer<Dor @Override public DorisSourceSplit deserialize(int version, byte[] serialized) throws IOException { - if (version == 1) { - return deserialize(serialized); + switch (version) { + case 1: + case 2: + return deserializeSplit(version, serialized); + default: + throw new IOException("Unknown version: " + version); } - throw new IOException("Unknown version: " + version); } - private DorisSourceSplit deserialize(byte[] serialized) throws IOException { + private DorisSourceSplit deserializeSplit(int version, byte[] serialized) throws IOException { final DataInputDeserializer in = new DataInputDeserializer(serialized); final String database = in.readUTF(); final String table = in.readUTF(); @@ -112,8 +118,14 @@ public class DorisSourceSplitSerializer implements SimpleVersionedSerializer<Dor final byte[] bytes = new byte[len]; in.read(bytes); final String queryPlan = new String(bytes, StandardCharsets.UTF_8); + + // read split id + String splitId = "splitId"; + if (version >= 2) { + splitId = in.readUTF(); + } PartitionDefinition partDef = new PartitionDefinition(database, table, beAddress, tabletIds, queryPlan); - return new DorisSourceSplit(partDef); + return new DorisSourceSplit(splitId, partDef); } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/enumerator/PendingSplitsCheckpointSerializerTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/enumerator/PendingSplitsCheckpointSerializerTest.java index fa092796..4b7d315a 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/enumerator/PendingSplitsCheckpointSerializerTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/enumerator/PendingSplitsCheckpointSerializerTest.java @@ -35,7 +35,8 @@ public class PendingSplitsCheckpointSerializerTest { @Test public void serializeSplit() throws Exception { - final DorisSourceSplit split = new DorisSourceSplit(OptionUtils.buildPartitionDef()); + final DorisSourceSplit split = + new DorisSourceSplit("splitId", OptionUtils.buildPartitionDef()); PendingSplitsCheckpoint checkpoint = new PendingSplitsCheckpoint(Arrays.asList(split)); final PendingSplitsCheckpointSerializer splitSerializer = diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java index 1fc3165c..f044b25e 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java @@ -40,7 +40,7 @@ public class DorisSourceReaderTest { } private static DorisSourceSplit createTestDorisSplit() throws IOException { - return new DorisSourceSplit(OptionUtils.buildPartitionDef()); + return new DorisSourceSplit("splitId", OptionUtils.buildPartitionDef()); } @Test diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializerTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializerTest.java index 0103ccb5..6fc721ce 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializerTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializerTest.java @@ -27,7 +27,8 @@ public class DorisSourceSplitSerializerTest { @Test public void serializeSplit() throws Exception { - final DorisSourceSplit split = new DorisSourceSplit(OptionUtils.buildPartitionDef()); + final DorisSourceSplit split = + new DorisSourceSplit("splitId", OptionUtils.buildPartitionDef()); DorisSourceSplit deSerialized = serializeAndDeserializeSplit(split); assertEquals(split, deSerialized); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSourceSplitTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSourceSplitTest.java index 40db95a6..b633affc 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSourceSplitTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSourceSplitTest.java @@ -31,8 +31,8 @@ public class DorisSourceSplitTest { new PartitionDefinition("db", "tbl", "be", new HashSet<>(), "queryplan1"); PartitionDefinition pd2 = new PartitionDefinition("db", "tbl", "be", new HashSet<>(), "queryplan1"); - DorisSourceSplit split1 = new DorisSourceSplit(pd1); - DorisSourceSplit split2 = new DorisSourceSplit(pd2); + DorisSourceSplit split1 = new DorisSourceSplit("be_1", pd1); + DorisSourceSplit split2 = new DorisSourceSplit("be_2", pd2); Assert.assertEquals(split1, split2); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org