Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r153401374

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
@@ -283,10 +283,13 @@ public String toString() {
 	// ------------------------------------------------------------------------
 	/**
-	 * Enqueue this input channel in the pipeline for sending unannounced credits to producer.
+	 * Enqueue this input channel in the pipeline for notifying the producer of unannounced credit.
 	 */
 	void notifyCreditAvailable() {
-		//TODO in next PR
+		// We should skip the notification if this channel is already released.
+		if (!isReleased.get() && partitionRequestClient != null) {
--- End diff --

In my early implementation, I checked the client state as above. I changed it to the current approach for two reasons:

1. The credit is always announced after receiving the sender's backlog, so the partition request client should already be initialized by then.
2. The state check here is only meant to avoid an unnecessary announcement after the client has been closed because of a task failure or network exception. In that case a closed client is expected, and we do not want `checkState` to throw and produce disturbing error messages.
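
To make the trade-off concrete, here is a minimal, self-contained Java sketch. It is not Flink's code: `SketchChannel`, the queue standing in for the partition request client, and `notifyCreditAvailableStrict` are hypothetical and exist only for illustration. Only the names `isReleased`, `partitionRequestClient`, and `notifyCreditAvailable` are taken from the diff. It contrasts a guard that silently skips the notification with a precondition-style check that throws.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for the input channel; NOT Flink's RemoteInputChannel. */
class SketchChannel {
    // Field names mirror the diff; the types are assumptions made for this sketch.
    final AtomicBoolean isReleased = new AtomicBoolean(false);

    // Assumption: a plain queue stands in for the real partition request client.
    Queue<SketchChannel> partitionRequestClient;

    /** Guarded variant, as in the diff: skip quietly if released or if there is no client. */
    void notifyCreditAvailable() {
        if (!isReleased.get() && partitionRequestClient != null) {
            partitionRequestClient.add(this);
        }
    }

    /** Hypothetical strict variant: fail loudly when the client is missing. */
    void notifyCreditAvailableStrict() {
        if (partitionRequestClient == null) {
            throw new IllegalStateException("Partition request client is not initialized.");
        }
        if (!isReleased.get()) {
            partitionRequestClient.add(this);
        }
    }

    public static void main(String[] args) {
        SketchChannel channel = new SketchChannel();

        // No client yet: the guarded variant does nothing and does not throw.
        channel.notifyCreditAvailable();

        // With a client and an unreleased channel, the notification is enqueued.
        channel.partitionRequestClient = new ArrayDeque<>();
        channel.notifyCreditAvailable();
        System.out.println("enqueued: " + channel.partitionRequestClient.size());

        // After release, further notifications are skipped.
        channel.isReleased.set(true);
        channel.notifyCreditAvailable();
        System.out.println("after release: " + channel.partitionRequestClient.size());

        // The strict variant throws when the client is missing.
        try {
            new SketchChannel().notifyCreditAvailableStrict();
        } catch (IllegalStateException e) {
            System.out.println("strict variant threw: " + e.getMessage());
        }
    }
}
```

The guarded variant keeps an expected shutdown quiet, at the cost of also hiding a client that was never initialized. The strict variant surfaces that bug but throws in any situation where the client is missing.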
---