Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r189163394

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -218,6 +232,8 @@ public void invoke(OUT value, Context context) throws Exception {
                throw new RuntimeException("Kinesis producer has been closed");
            }

    +       checkAndPropagateAsyncError();
    +
            checkQueueLimit();

            checkAndPropagateAsyncError();
    --- End diff --

    This second check is to check any async errors that occurred during the queue flush, correct?
    If so, we should probably move this second invocation into `checkQueueLimit` to make this more implicit.
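
    To illustrate the suggestion, here is a rough, self-contained sketch of what moving the second check into `checkQueueLimit` could look like. It is not the actual `FlinkKinesisProducer` code: the class `ProducerSketch`, the in-memory queue, the `flush()` behaviour, and the "bad" record prefix used to simulate a failed send are all assumptions made up for the example. Only the method names `invoke`, `checkQueueLimit`, and `checkAndPropagateAsyncError` come from the diff.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical stand-in for the producer; NOT the real FlinkKinesisProducer. */
class ProducerSketch {
    private final int queueLimit;                         // assumed configurable limit
    private final Queue<String> pending = new ArrayDeque<>();
    private volatile Throwable asyncError;                // set when a send fails

    ProducerSketch(int queueLimit) {
        this.queueLimit = queueLimit;
    }

    void invoke(String value) {
        checkAndPropagateAsyncError();   // surface errors from earlier sends
        checkQueueLimit();               // may flush; re-checks for errors itself
        pending.add(value);
    }

    /** Flushes when the queue is full, then propagates any error the flush caused. */
    private void checkQueueLimit() {
        if (pending.size() >= queueLimit) {
            flush();
            checkAndPropagateAsyncError();   // the "second check", now implicit
        }
    }

    private void checkAndPropagateAsyncError() {
        Throwable t = asyncError;
        if (t != null) {
            asyncError = null;               // report each error only once
            throw new RuntimeException("An async error occurred while sending records", t);
        }
    }

    /** Assumed flush: drains the queue; records starting with "bad" simulate a failed send. */
    private void flush() {
        String record;
        while ((record = pending.poll()) != null) {
            if (record.startsWith("bad")) {
                asyncError = new RuntimeException("Failed to send " + record);
            }
        }
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        ProducerSketch producer = new ProducerSketch(2);
        producer.invoke("ok-1");
        producer.invoke("bad-2");
        try {
            producer.invoke("ok-3");         // queue is full: flush, then error check
        } catch (RuntimeException e) {
            System.out.println("Error surfaced from invoke: " + e.getCause().getMessage());
        }
    }
}
```

    With this shape, `invoke` only calls `checkAndPropagateAsyncError()` once explicitly, and a caller cannot forget the post-flush check because it lives next to the flush that can trigger it.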
---