leekeiabstraction commented on code in PR #198:
URL: https://github.com/apache/flink-connector-aws/pull/198#discussion_r2093275543
##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReader.java:
##########
@@ -49,6 +54,8 @@ public class KinesisStreamsSourceReader<T>
     private static final Logger LOG = LoggerFactory.getLogger(KinesisStreamsSourceReader.class);
     private final Map<String, KinesisShardMetrics> shardMetricGroupMap;
+    private final NavigableMap<Long, Set<KinesisShardSplit>> splitFinishedEvents;

Review Comment:
Instead of `splitFinishedEvents`, can we call this `finishedSplits`, since these are just `KinesisShardSplit` objects with `finished` set to true? That keeps the name consistent with the class name `KinesisShardSplit`. In `SourceReaderBase`, these are referred to as splits; `SplitFinishedEvent` is an entirely different POJO.

##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitSerializerTest.java:
##########
@@ -117,6 +117,54 @@ void testDeserializeWrongVersion() throws Exception {
                 .withMessageContaining(String.valueOf(wrongVersionSerializer.getVersion()));
     }
 
+    @Test
+    void testSerializeAndDeserializeWithFinishedSplits() throws Exception {
+        final KinesisShardSplit initialSplit =
+                new KinesisShardSplit(
+                        STREAM_ARN,
+                        generateShardId(10),
+                        StartingPosition.continueFromSequenceNumber("some-sequence-number"),
+                        new HashSet<>(Arrays.asList(generateShardId(2), generateShardId(5))),
+                        STARTING_HASH_KEY_TEST_VALUE,
+                        ENDING_HASH_KEY_TEST_VALUE,
+                        true);
+
+        KinesisShardSplitSerializer serializer = new KinesisShardSplitSerializer();
+
+        byte[] serialized = serializer.serialize(initialSplit);
+        KinesisShardSplit deserializedSplit =
+                serializer.deserialize(serializer.getVersion(), serialized);
+
+        assertThat(deserializedSplit).usingRecursiveComparison().isEqualTo(initialSplit);
+        assertThat(deserializedSplit.isFinished()).isTrue();
+    }
+
+    @Test
+    void testDeserializeVersion1() throws Exception {
+        final KinesisShardSplitSerializer serializer = new KinesisShardSplitSerializer();
+
+        final KinesisShardSplit initialSplit =
+                new KinesisShardSplit(
+                        STREAM_ARN,
+                        generateShardId(10),
+                        StartingPosition.continueFromSequenceNumber("some-sequence-number"),
+                        new HashSet<>(Arrays.asList(generateShardId(2), generateShardId(5))),
+                        STARTING_HASH_KEY_TEST_VALUE,
+                        ENDING_HASH_KEY_TEST_VALUE,
+                        true);

Review Comment:
Let's use the constructor without the `finished` argument.

##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReader.java:
##########
@@ -79,8 +138,17 @@ protected KinesisShardSplit toSplitType(String splitId, KinesisShardSplitState s
     @Override
     public void addSplits(List<KinesisShardSplit> splits) {
-        splits.forEach(this::registerShardMetricGroup);
-        super.addSplits(splits);
+        List<KinesisShardSplit> kinesisShardSplits = new ArrayList<>();

Review Comment:
nit: We can name this `unfinishedSplits`.
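The `addSplits` change above appears to separate incoming splits by their `finished` flag, which is why `unfinishedSplits` reads as the more descriptive name for the list being built. Below is a minimal sketch of that partitioning. It assumes only the `isFinished()` accessor exercised in the serializer test. `SplitPartitionSketch`, `Split` and `partition` are hypothetical stand-ins, not connector classes. The diff does not show what the real reader does with finished splits (for example, whether they go into `splitFinishedEvents`), so that part is left out. Records require Java 16+.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: separate finished splits from those that still need reading. */
public class SplitPartitionSketch {

    /** Simplified stand-in for KinesisShardSplit (hypothetical; only the fields needed here). */
    record Split(String splitId, boolean finished) {
        boolean isFinished() {
            return finished;
        }
    }

    /**
     * Returns the splits that still have records to read and appends the finished ones
     * to {@code finishedSplits}.
     */
    static List<Split> partition(List<Split> splits, List<Split> finishedSplits) {
        List<Split> unfinishedSplits = new ArrayList<>();
        for (Split split : splits) {
            if (split.isFinished()) {
                finishedSplits.add(split);
            } else {
                unfinishedSplits.add(split);
            }
        }
        return unfinishedSplits;
    }

    public static void main(String[] args) {
        List<Split> finishedSplits = new ArrayList<>();
        List<Split> unfinishedSplits =
                partition(
                        List.of(new Split("shard-0", true), new Split("shard-1", false)),
                        finishedSplits);
        System.out.println(unfinishedSplits); // [Split[splitId=shard-1, finished=false]]
        System.out.println(finishedSplits); // [Split[splitId=shard-0, finished=true]]
    }
}
```

Only the unfinished list would then be handed on for reading (in the reader, presumably via `super.addSplits`). Naming it after that role makes the intent clear at the call site.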
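The `testDeserializeVersion1` comment depends on how a versioned serializer handles a field that older versions never wrote. The sketch below assumes, for illustration only, that version 1 of the split format predates the `finished` flag. Under that assumption, a version-1 payload can only deserialize to an unfinished split, so a `finished = true` split would not round-trip through version 1. The byte layout, `VERSION_1`, `CURRENT_VERSION` and the `Split` record are hypothetical. They are not the actual format used by `KinesisShardSplitSerializer`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/**
 * Hypothetical versioned serializer: version 1 has no "finished" flag, version 2 appends one.
 * This is NOT the connector's real wire format.
 */
public class VersionedSplitSerializerSketch {

    static final int VERSION_1 = 1;
    static final int CURRENT_VERSION = 2;

    /** Hypothetical, simplified split: just a shard id and the finished flag. */
    record Split(String shardId, boolean finished) {}

    static byte[] serialize(Split split, int version) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(split.shardId());
            if (version >= 2) {
                // Only version 2 and later carry the flag.
                out.writeBoolean(split.finished());
            }
        }
        return bytes.toByteArray();
    }

    static Split deserialize(int version, byte[] serialized) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            String shardId = in.readUTF();
            // A version-1 payload cannot encode "finished"; short-circuit so nothing extra is read.
            boolean finished = version >= 2 && in.readBoolean();
            return new Split(shardId, finished);
        }
    }

    public static void main(String[] args) throws IOException {
        Split finishedSplit = new Split("shardId-000000000010", true);
        // A v1 round trip drops the flag, so the result no longer equals the input.
        Split fromV1 = deserialize(VERSION_1, serialize(finishedSplit, VERSION_1));
        System.out.println(fromV1.equals(finishedSplit)); // false
        Split fromV2 = deserialize(CURRENT_VERSION, serialize(finishedSplit, CURRENT_VERSION));
        System.out.println(fromV2.equals(finishedSplit)); // true
    }
}
```

Reading the flag only when `version >= 2` keeps older payloads (for example, from existing checkpoints) readable. It also means any expected value in a version-1 test has to be an unfinished split.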