leekeiabstraction commented on code in PR #198: URL: https://github.com/apache/flink-connector-aws/pull/198#discussion_r2039893322
########## flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReader.java: ########## @@ -58,15 +65,67 @@ public KinesisStreamsSourceReader( Map<String, KinesisShardMetrics> shardMetricGroupMap) { super(splitFetcherManager, recordEmitter, config, context); this.shardMetricGroupMap = shardMetricGroupMap; + this.splitFinishedEvents = new TreeMap<>(); + this.currentCheckpointId = Long.MIN_VALUE; } @Override protected void onSplitFinished(Map<String, KinesisShardSplitState> finishedSplitIds) { + if (finishedSplitIds.isEmpty()) { + return; + } + splitFinishedEvents.computeIfAbsent(currentCheckpointId, k -> new HashSet<>()); + finishedSplitIds.values().stream() + .map( + finishedSplit -> + new KinesisShardSplit( + finishedSplit.getStreamArn(), + finishedSplit.getShardId(), + finishedSplit.getNextStartingPosition(), + finishedSplit.getKinesisShardSplit().getParentShardIds(), + finishedSplit.getKinesisShardSplit().getStartingHashKey(), + finishedSplit.getKinesisShardSplit().getEndingHashKey(), + true)) + .forEach(split -> splitFinishedEvents.get(currentCheckpointId).add(split)); + context.sendSourceEventToCoordinator( new SplitsFinishedEvent(new HashSet<>(finishedSplitIds.keySet()))); finishedSplitIds.keySet().forEach(this::unregisterShardMetricGroup); } + /** + * At snapshot, we also store the pending finished split ids in the current checkpoint so that + * in case we have to restore the reader from state, we also send the finished split ids + * otherwise we run a risk of data loss during restarts of the source because of the + * SplitsFinishedEvent going missing. 
+ * + * @param checkpointId the checkpoint id + * @return a list of finished splits + */ + @Override + public List<KinesisShardSplit> snapshotState(long checkpointId) { + this.currentCheckpointId = checkpointId; + ArrayList<KinesisShardSplit> splits = new ArrayList<>(super.snapshotState(checkpointId)); Review Comment: Let's declare the variable with the List interface type instead of the concrete ArrayList type: ``` List<KinesisShardSplit> splits = new ArrayList<>(super.snapshotState(checkpointId)); ``` ########## flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java: ########## @@ -137,16 +137,26 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { } } - private void handleFinishedSplits(int subtask, SplitsFinishedEvent splitsFinishedEvent) { + private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) { splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds()); - splitAssignment - .get(subtask) - .removeIf( - split -> - splitsFinishedEvent - .getFinishedSplitIds() - .contains(split.splitId())); + Set<KinesisShardSplit> splitsAssignment = splitAssignment.get(subtaskId); + // during recovery, splitAssignment may return null since there might be no split assigned + // to the subtask, but there might be SplitsFinishedEvent from that subtask. + // We will not do child shard assignment if that is the case since that might lead to child + // shards trying to get assigned before there being any readers.
+ if (splitsAssignment == null) { + LOG.info( + "handleFinishedSplits called for subtask: {} which doesnt have any " Review Comment: nit: typo — `doesnt` should be `doesn't` ########## flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReader.java: ########## @@ -79,8 +138,23 @@ protected KinesisShardSplit toSplitType(String splitId, KinesisShardSplitState s @Override public void addSplits(List<KinesisShardSplit> splits) { - splits.forEach(this::registerShardMetricGroup); - super.addSplits(splits); + List<KinesisShardSplit> kinesisShardSplits = new ArrayList<>(); + for (KinesisShardSplit split : splits) { + if (split.isFinished()) { + // Replay the finished split event. + // We don't need to reload the split finished events in buffer back + // since if the next checkpoint completes, these would just be removed from the + // buffer. If the next checkpoint doesn't complete, + // we would go back to the previous checkpointed + // state which will again replay these split finished events. Review Comment: These comments do not seem necessary.
########## flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReaderTest.java: ########## @@ -147,4 +151,120 @@ void testAddSplitsRegistersAndUpdatesShardMetricGroup() throws Exception { TestUtil.assertMillisBehindLatest( split, TestUtil.MILLIS_BEHIND_LATEST_TEST_VALUE, metricListener); } + + @Test + void testSnapshotStateWithFinishedSplits() throws Exception { + // Create and add a split + KinesisShardSplit split = getTestSplit(); + List<KinesisShardSplit> splits = Collections.singletonList(split); + sourceReader.addSplits(splits); + + // Set checkpoint ID by taking initial snapshot + List<KinesisShardSplit> initialSnapshot = sourceReader.snapshotState(1L); + assertThat(initialSnapshot).hasSize(1).containsExactly(split); + + // Simulate split finishing + Map<String, KinesisShardSplitState> finishedSplits = new HashMap<>(); + finishedSplits.put(split.splitId(), new KinesisShardSplitState(split)); + sourceReader.onSplitFinished(finishedSplits); + + // Take another snapshot + List<KinesisShardSplit> snapshotSplits = sourceReader.snapshotState(2L); + List<KinesisShardSplit> snapshotFinishedSplits = + snapshotSplits.stream() + .filter(KinesisShardSplit::isFinished) + .collect(Collectors.toList()); + // Verify we have 2 splits - the original split and the finished split + assertThat(snapshotSplits).hasSize(2); + assertThat(snapshotFinishedSplits) + .hasSize(1) + .allSatisfy( + s -> { + assertThat(s.splitId()).isEqualTo(split.splitId()); + }); + } + + @Test + void testAddSplitsWithStateRestoration() throws Exception { + KinesisShardSplit finishedSplit1 = getFinishedTestSplit("finished-split-1"); + KinesisShardSplit finishedSplit2 = getFinishedTestSplit("finished-split-2"); + + // Create active split + KinesisShardSplit activeSplit = getTestSplit(); + + List<KinesisShardSplit> allSplits = + Arrays.asList(finishedSplit1, finishedSplit2, activeSplit); + + // Clear any previous events + 
+ testingReaderContext.clearSentEvents(); + + // Add splits + sourceReader.addSplits(allSplits); + + // Verify finished events were sent + List<SourceEvent> events = testingReaderContext.getSentEvents(); + assertThat(events) + .hasSize(2) + .allMatch(e -> e instanceof SplitsFinishedEvent) + .allSatisfy( + e -> { + SplitsFinishedEvent event = (SplitsFinishedEvent) e; + assertThat(event.getFinishedSplitIds()).hasSize(1); + assertThat(event.getFinishedSplitIds()) + .containsAnyOf("finished-split-1", "finished-split-2"); Review Comment: Why do we assert a size of one but also assert that the set contains any of the two split ids here? Can we be more specific and assert that one event contains exactly "finished-split-1" and the other contains exactly "finished-split-2"? Otherwise, the test may not catch a regression where one of the finished splits is duplicated and the other is dropped. ########## flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplit.java: ########## @@ -44,6 +44,7 @@ public final class KinesisShardSplit implements SourceSplit { private final Set<String> parentShardIds; private final String startingHashKey; private final String endingHashKey; + private final boolean isFinished; Review Comment: nit: `finished` instead of `isFinished` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org