leekeiabstraction commented on code in PR #198:
URL: 
https://github.com/apache/flink-connector-aws/pull/198#discussion_r2093275543


##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReader.java:
##########
@@ -49,6 +54,8 @@ public class KinesisStreamsSourceReader<T>
 
     private static final Logger LOG = 
LoggerFactory.getLogger(KinesisStreamsSourceReader.class);
     private final Map<String, KinesisShardMetrics> shardMetricGroupMap;
+    private final NavigableMap<Long, Set<KinesisShardSplit>> 
splitFinishedEvents;

Review Comment:
   Instead of `splitFinishedEvents`, can we call this `finishedSplits` as these 
are just KinesisShardSplit object with finished set to true? This is so that 
it's consistent with the class name `KinesisShardSplit`. In SourceReaderBase, 
these are referred to as splits.
   
   SplitFinishedEvent is an entirely different pojo.



##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitSerializerTest.java:
##########
@@ -117,6 +117,54 @@ void testDeserializeWrongVersion() throws Exception {
                 
.withMessageContaining(String.valueOf(wrongVersionSerializer.getVersion()));
     }
 
+    @Test
+    void testSerializeAndDeserializeWithFinishedSplits() throws Exception {
+        final KinesisShardSplit initialSplit =
+                new KinesisShardSplit(
+                        STREAM_ARN,
+                        generateShardId(10),
+                        
StartingPosition.continueFromSequenceNumber("some-sequence-number"),
+                        new HashSet<>(Arrays.asList(generateShardId(2), 
generateShardId(5))),
+                        STARTING_HASH_KEY_TEST_VALUE,
+                        ENDING_HASH_KEY_TEST_VALUE,
+                        true);
+
+        KinesisShardSplitSerializer serializer = new 
KinesisShardSplitSerializer();
+
+        byte[] serialized = serializer.serialize(initialSplit);
+        KinesisShardSplit deserializedSplit =
+                serializer.deserialize(serializer.getVersion(), serialized);
+
+        
assertThat(deserializedSplit).usingRecursiveComparison().isEqualTo(initialSplit);
+        assertThat(deserializedSplit.isFinished()).isTrue();
+    }
+
+    @Test
+    void testDeserializeVersion1() throws Exception {
+        final KinesisShardSplitSerializer serializer = new 
KinesisShardSplitSerializer();
+
+        final KinesisShardSplit initialSplit =
+                new KinesisShardSplit(
+                        STREAM_ARN,
+                        generateShardId(10),
+                        
StartingPosition.continueFromSequenceNumber("some-sequence-number"),
+                        new HashSet<>(Arrays.asList(generateShardId(2), 
generateShardId(5))),
+                        STARTING_HASH_KEY_TEST_VALUE,
+                        ENDING_HASH_KEY_TEST_VALUE,
+                        true);

Review Comment:
   Let's use the constructor without the `finished` argument.



##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReader.java:
##########
@@ -79,8 +138,17 @@ protected KinesisShardSplit toSplitType(String splitId, 
KinesisShardSplitState s
 
     @Override
     public void addSplits(List<KinesisShardSplit> splits) {
-        splits.forEach(this::registerShardMetricGroup);
-        super.addSplits(splits);
+        List<KinesisShardSplit> kinesisShardSplits = new ArrayList<>();

Review Comment:
   nit: We can name this `unfinishedSplits`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to