leekeiabstraction commented on code in PR #198:
URL: https://github.com/apache/flink-connector-aws/pull/198#discussion_r2039893322


##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReader.java:
##########
@@ -58,15 +65,67 @@ public KinesisStreamsSourceReader(
             Map<String, KinesisShardMetrics> shardMetricGroupMap) {
         super(splitFetcherManager, recordEmitter, config, context);
         this.shardMetricGroupMap = shardMetricGroupMap;
+        this.splitFinishedEvents = new TreeMap<>();
+        this.currentCheckpointId = Long.MIN_VALUE;
     }
 
     @Override
     protected void onSplitFinished(Map<String, KinesisShardSplitState> finishedSplitIds) {
+        if (finishedSplitIds.isEmpty()) {
+            return;
+        }
+        splitFinishedEvents.computeIfAbsent(currentCheckpointId, k -> new HashSet<>());
+        finishedSplitIds.values().stream()
+                .map(
+                        finishedSplit ->
+                                new KinesisShardSplit(
+                                        finishedSplit.getStreamArn(),
+                                        finishedSplit.getShardId(),
+                                        finishedSplit.getNextStartingPosition(),
+                                        finishedSplit.getKinesisShardSplit().getParentShardIds(),
+                                        finishedSplit.getKinesisShardSplit().getStartingHashKey(),
+                                        finishedSplit.getKinesisShardSplit().getEndingHashKey(),
+                                        true))
+                .forEach(split -> splitFinishedEvents.get(currentCheckpointId).add(split));
+
         context.sendSourceEventToCoordinator(
                 new SplitsFinishedEvent(new HashSet<>(finishedSplitIds.keySet())));
         finishedSplitIds.keySet().forEach(this::unregisterShardMetricGroup);
     }
 
+    /**
+     * At snapshot, we also store the pending finished split ids in the current checkpoint so that
+     * in case we have to restore the reader from state, we also send the finished split ids
+     * otherwise we run a risk of data loss during restarts of the source because of the
+     * SplitsFinishedEvent going missing.
+     *
+     * @param checkpointId the checkpoint id
+     * @return a list of finished splits
+     */
+    @Override
+    public List<KinesisShardSplit> snapshotState(long checkpointId) {
+        this.currentCheckpointId = checkpointId;
+        ArrayList<KinesisShardSplit> splits = new ArrayList<>(super.snapshotState(checkpointId));

Review Comment:
   Let's declare the variable with the `List` interface instead of the concrete
   `ArrayList` type:
   
   ```
   List<KinesisShardSplit> splits = new ArrayList<>(super.snapshotState(checkpointId));
   ```



##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java:
##########
@@ -137,16 +137,26 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
         }
     }
 
-    private void handleFinishedSplits(int subtask, SplitsFinishedEvent splitsFinishedEvent) {
+    private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) {
         splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds());
-        splitAssignment
-                .get(subtask)
-                .removeIf(
-                        split ->
-                                splitsFinishedEvent
-                                        .getFinishedSplitIds()
-                                        .contains(split.splitId()));
+        Set<KinesisShardSplit> splitsAssignment = splitAssignment.get(subtaskId);
+        // during recovery, splitAssignment may return null since there might be no split assigned
+        // to the subtask, but there might be SplitsFinishedEvent from that subtask.
+        // We will not do child shard assignment if that is the case since that might lead to child
+        // shards trying to get assigned before there being any readers.
+        if (splitsAssignment == null) {
+            LOG.info(
+                    "handleFinishedSplits called for subtask: {} which doesnt have any "

Review Comment:
   nit: `doesn't`



##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReader.java:
##########
@@ -79,8 +138,23 @@ protected KinesisShardSplit toSplitType(String splitId, KinesisShardSplitState s
 
     @Override
     public void addSplits(List<KinesisShardSplit> splits) {
-        splits.forEach(this::registerShardMetricGroup);
-        super.addSplits(splits);
+        List<KinesisShardSplit> kinesisShardSplits = new ArrayList<>();
+        for (KinesisShardSplit split : splits) {
+            if (split.isFinished()) {
+                // Replay the finished split event.
+                // We don't need to reload the split finished events in buffer back
+                // since if the next checkpoint completes, these would just be removed from the
+                // buffer. If the next checkpoint doesn't complete,
+                // we would go back to the previous checkpointed
+                // state which will again replay these split finished events.

Review Comment:
   These comments do not seem necessary.



##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReaderTest.java:
##########
@@ -147,4 +151,120 @@ void testAddSplitsRegistersAndUpdatesShardMetricGroup() throws Exception {
         TestUtil.assertMillisBehindLatest(
                 split, TestUtil.MILLIS_BEHIND_LATEST_TEST_VALUE, metricListener);
     }
+
+    @Test
+    void testSnapshotStateWithFinishedSplits() throws Exception {
+        // Create and add a split
+        KinesisShardSplit split = getTestSplit();
+        List<KinesisShardSplit> splits = Collections.singletonList(split);
+        sourceReader.addSplits(splits);
+
+        // Set checkpoint ID by taking initial snapshot
+        List<KinesisShardSplit> initialSnapshot = sourceReader.snapshotState(1L);
+        assertThat(initialSnapshot).hasSize(1).containsExactly(split);
+
+        // Simulate split finishing
+        Map<String, KinesisShardSplitState> finishedSplits = new HashMap<>();
+        finishedSplits.put(split.splitId(), new KinesisShardSplitState(split));
+        sourceReader.onSplitFinished(finishedSplits);
+
+        // Take another snapshot
+        List<KinesisShardSplit> snapshotSplits = sourceReader.snapshotState(2L);
+        List<KinesisShardSplit> snapshotFinishedSplits =
+                snapshotSplits.stream()
+                        .filter(KinesisShardSplit::isFinished)
+                        .collect(Collectors.toList());
+        // Verify we have 2 splits - the original split and the finished split
+        assertThat(snapshotSplits).hasSize(2);
+        assertThat(snapshotFinishedSplits)
+                .hasSize(1)
+                .allSatisfy(
+                        s -> {
+                            assertThat(s.splitId()).isEqualTo(split.splitId());
+                        });
+    }
+
+    @Test
+    void testAddSplitsWithStateRestoration() throws Exception {
+        KinesisShardSplit finishedSplit1 = getFinishedTestSplit("finished-split-1");
+        KinesisShardSplit finishedSplit2 = getFinishedTestSplit("finished-split-2");
+
+        // Create active split
+        KinesisShardSplit activeSplit = getTestSplit();
+
+        List<KinesisShardSplit> allSplits =
+                Arrays.asList(finishedSplit1, finishedSplit2, activeSplit);
+
+        // Clear any previous events
+        testingReaderContext.clearSentEvents();
+
+        // Add splits
+        sourceReader.addSplits(allSplits);
+
+        // Verify finished events were sent
+        List<SourceEvent> events = testingReaderContext.getSentEvents();
+        assertThat(events)
+                .hasSize(2)
+                .allMatch(e -> e instanceof SplitsFinishedEvent)
+                .allSatisfy(
+                        e -> {
+                            SplitsFinishedEvent event = (SplitsFinishedEvent) e;
+                            assertThat(event.getFinishedSplitIds()).hasSize(1);
+                            assertThat(event.getFinishedSplitIds())
+                                    .containsAnyOf("finished-split-1", "finished-split-2");

Review Comment:
   Why do we assert a size of one but also assert that the ids contain any of
   the two split ids here? Can we be more specific and assert that one event
   carries exactly "finished-split-1" and the other "finished-split-2"?
   Otherwise, the test case may not catch a regression where one of the
   finished splits is duplicated and the other is dropped.
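
To illustrate the gap, here is a minimal, self-contained sketch in plain Java (no test framework). `FinishedEvent`, `weakCheck` and `exactCheck` are hypothetical names introduced for illustration only; `FinishedEvent` stands in for `SplitsFinishedEvent` and is not the connector's class. The idea is to flatten the ids from every event, keep duplicates, and compare against the exact expected ids.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

class FinishedSplitAssertionSketch {

    // Hypothetical stand-in for SplitsFinishedEvent; only the accessor the test uses is modelled.
    static final class FinishedEvent {
        private final Set<String> finishedSplitIds;

        FinishedEvent(Set<String> finishedSplitIds) {
            this.finishedSplitIds = finishedSplitIds;
        }

        Set<String> getFinishedSplitIds() {
            return finishedSplitIds;
        }
    }

    // Mirrors the assertion under review: two events, each holding one id from the expected pair.
    static boolean weakCheck(List<FinishedEvent> events) {
        List<String> allowed = Arrays.asList("finished-split-1", "finished-split-2");
        for (FinishedEvent event : events) {
            Set<String> ids = event.getFinishedSplitIds();
            if (ids.size() != 1 || !allowed.containsAll(ids)) {
                return false;
            }
        }
        return events.size() == 2;
    }

    // Stricter variant: flatten ids across events (duplicates kept), sort, compare to expected.
    static boolean exactCheck(List<FinishedEvent> events, String... expectedIds) {
        List<String> actual = new ArrayList<>();
        for (FinishedEvent event : events) {
            actual.addAll(event.getFinishedSplitIds());
        }
        Collections.sort(actual);
        List<String> expected = new ArrayList<>(Arrays.asList(expectedIds));
        Collections.sort(expected);
        return actual.equals(expected);
    }

    public static void main(String[] args) {
        // Regression scenario from the comment: split 1 reported twice, split 2 dropped.
        List<FinishedEvent> duplicated = Arrays.asList(
                new FinishedEvent(Collections.singleton("finished-split-1")),
                new FinishedEvent(Collections.singleton("finished-split-1")));
        System.out.println("weak check passes:  " + weakCheck(duplicated));
        System.out.println("exact check passes: "
                + exactCheck(duplicated, "finished-split-1", "finished-split-2"));
    }
}
```

The weak check accepts the duplicated scenario while the exact check rejects it. In the AssertJ style the test already uses, a comparable assertion would collect the ids from all events into one list and apply `containsExactlyInAnyOrder("finished-split-1", "finished-split-2")`.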



##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplit.java:
##########
@@ -44,6 +44,7 @@ public final class KinesisShardSplit implements SourceSplit {
     private final Set<String> parentShardIds;
     private final String startingHashKey;
     private final String endingHashKey;
+    private final boolean isFinished;

Review Comment:
   nit: `finished` instead of `isFinished`
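
A small sketch of what this naming could look like, assuming the usual Java convention that a boolean field is named after the state and only its accessor carries the `is` prefix. `Split` here is a hypothetical, trimmed-down stand-in, not the real `KinesisShardSplit`.

```java
class SplitFlagNamingSketch {

    // Hypothetical, trimmed-down stand-in for KinesisShardSplit.
    static final class Split {
        private final String splitId;
        // Field named after the state itself, without the "is" prefix.
        private final boolean finished;

        Split(String splitId, boolean finished) {
            this.splitId = splitId;
            this.finished = finished;
        }

        String splitId() {
            return splitId;
        }

        // The accessor keeps the "is" prefix, so call sites like split.isFinished() stay the same.
        boolean isFinished() {
            return finished;
        }
    }
}
```

With this shape, method references such as `KinesisShardSplit::isFinished` in the tests would not need to change; only the field and constructor parameter are renamed.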


