dannycranmer commented on code in PR #21102:
URL: https://github.com/apache/flink/pull/21102#discussion_r1009223130


##########
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/DynamoDBStreamsProxy.java:
##########
@@ -110,6 +111,24 @@ public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSe
         return result;
     }
 
+    @Override
+    public String getShardIterator(
+            StreamShardHandle shard, String shardIteratorType, @Nullable Object startingMarker)
+            throws InterruptedException {
+        try {
+            return super.getShardIterator(shard, shardIteratorType, startingMarker);
+        } catch (ResourceNotFoundException re) {
+            if (LOG.isInfoEnabled()) {
+                LOG.info(
+                        "Received ResourceNotFoundException. "
+                                + "Shard {} of stream {} is no longer valid, marking it as complete.",
+                        shard.getShard().getShardId(),
+                        shard.getStreamName());
+            }
+            return null;
+        }
+    }
+

Review Comment:
   Thanks for the contribution, @elphastori. How does the failure mode differ 
from the previous behavior if the user deletes a stream/table? Is there a 
possibility of regression, or are DDB shard names typically unique?



