Github user fmthoma commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r189432840

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -218,6 +232,8 @@ public void invoke(OUT value, Context context) throws Exception {
                throw new RuntimeException("Kinesis producer has been closed");
            }
    +       checkAndPropagateAsyncError();
    +       checkQueueLimit();
            checkAndPropagateAsyncError();
    --- End diff --

    `snapshotState()` also checks twice explicitly, and I think it makes sense to have the two checks on the same level. But I won't insist on that if you prefer to keep it more implicit.
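
To illustrate why the check appears on both sides of the queue-limit call, here is a minimal, self-contained sketch. It is not the Flink code: `AsyncSinkSketch`, `reportAsyncError`, `recordCompleted` and `demo` are hypothetical names, and the bodies of `checkAndPropagateAsyncError` and `checkQueueLimit` are assumptions (an error slot that callbacks write to, and a polling wait on an in-flight counter). The point is only that a failure reported while `invoke` is waiting would go unnoticed until the next call without the second check.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a sink with asynchronous sends; NOT the Flink class.
class AsyncSinkSketch {
    private final int queueLimit;
    private final AtomicInteger outstanding = new AtomicInteger();
    private final AtomicReference<Throwable> asyncError = new AtomicReference<>();

    AsyncSinkSketch(int queueLimit) {
        this.queueLimit = queueLimit;
    }

    // Assumed: called from a callback thread when a send fails.
    void reportAsyncError(Throwable t) {
        asyncError.compareAndSet(null, t);
    }

    // Assumed: called from a callback thread when a send has finished.
    void recordCompleted() {
        outstanding.decrementAndGet();
    }

    // Rethrows a failure recorded by a callback, if there is one.
    private void checkAndPropagateAsyncError() throws Exception {
        Throwable t = asyncError.getAndSet(null);
        if (t != null) {
            throw new Exception("An async send failed", t);
        }
    }

    // Waits (by polling) while the number of in-flight records is at the limit.
    private void checkQueueLimit() throws InterruptedException {
        while (outstanding.get() >= queueLimit) {
            Thread.sleep(1);
        }
    }

    void invoke(String value) throws Exception {
        checkAndPropagateAsyncError(); // failures reported before this call
        checkQueueLimit();             // may wait for in-flight records to drain
        checkAndPropagateAsyncError(); // failures reported while waiting
        outstanding.incrementAndGet(); // stand-in for handing the record to the producer
    }

    // Returns the message of the propagated failure, or null if nothing was thrown.
    static String demo() {
        AsyncSinkSketch sink = new AsyncSinkSketch(1);
        Thread callback = new Thread(() -> {
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            sink.reportAsyncError(new RuntimeException("boom"));
            sink.recordCompleted();
        });
        try {
            sink.invoke("a");  // fills the queue (limit is 1)
            callback.start();
            sink.invoke("b");  // waits for the callback, then surfaces its failure
            return null;
        } catch (Exception e) {
            return e.getCause() == null ? e.getMessage() : e.getCause().getMessage();
        } finally {
            try {
                callback.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch both checks sit directly in `invoke`, which is the "same level" arrangement. The more implicit alternative would be to have the wait itself report whether it blocked and to re-check only in that case.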
---