dannycranmer commented on code in PR #21102: URL: https://github.com/apache/flink/pull/21102#discussion_r1009223130
########## flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/DynamoDBStreamsProxy.java: ########## @@ -110,6 +111,24 @@ public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSe return result; } + @Override + public String getShardIterator( + StreamShardHandle shard, String shardIteratorType, @Nullable Object startingMarker) + throws InterruptedException { + try { + return super.getShardIterator(shard, shardIteratorType, startingMarker); + } catch (ResourceNotFoundException re) { + if (LOG.isInfoEnabled()) { + LOG.info( + "Received ResourceNotFoundException. " + + "Shard {} of stream {} is no longer valid, marking it as complete.", + shard.getShard().getShardId(), + shard.getStreamName()); + } + return null; + } + } + Review Comment: Thanks for the contribution @elphastori. What is the failure mode difference from before until now if the user deletes a stream/table? Is there a possibility for regression or are DDB shard names typically unique? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org