Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r146741125

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java ---
@@ -322,22 +322,25 @@ private void writeAndFlushNextMessageIfPossible(Channel channel) {
 			while (true) {
 				RemoteInputChannel inputChannel = inputChannelsWithCredit.poll();
-				// The input channel may be null because of the write callbacks that are executed
-				// after each write, and it is also no need to notify credit for released channel.
-				if (inputChannel == null || inputChannel.isReleased()) {
+				// The input channel may be null because of the write callbacks
+				// that are executed after each write.
+				if (inputChannel == null) {
 					return;
 				}
-				AddCredit msg = new AddCredit(
-					inputChannel.getPartitionId(),
-					inputChannel.getAndResetCredit(),
-					inputChannel.getInputChannelId());
+				//It is no need to notify credit for the released channel.
+				if (!inputChannel.isReleased()) {
+					AddCredit msg = new AddCredit(
+						inputChannel.getPartitionId(),
+						inputChannel.getAndResetCredit(),
+						inputChannel.getInputChannelId());
-				// Write and flush and wait until this is done before
-				// trying to continue with the next input channel.
-				channel.writeAndFlush(msg).addListener(writeListener);
+					// Write and flush and wait until this is done before
+					// trying to continue with the next input channel.
+					channel.writeAndFlush(msg).addListener(writeListener);
-				return;
+					return;
--- End diff --

Yes, the current mechanism is the same as the one the upstream side uses when writing `BufferResponse`. We loop over `inputChannelsWithCredit` to find the first unreleased `InputChannel`, then call `writeAndFlush` and break out of the loop. The remaining `InputChannel`s in the queue are handled once `writeAndFlush` is done and its listener calls back into `writeAndFlushNextMessageIfPossible`.

I think the main reason is that we have to verify the channel is still writable before moving on to the next `InputChannel`. So we have to wait until the current `writeAndFlush` is done, then check that the channel is writable, before continuing with the next `InputChannel`. The existing comment on `channel.writeAndFlush(msg).addListener(writeListener)` should help explain this.
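
To make the control flow concrete, here is a minimal sketch of the "write one message, continue from the listener" loop described above. It does not use Netty or the real Flink classes: `CreditLoopSketch`, `FakeInputChannel`, and `FakeChannel` are hypothetical stand-ins, the message is just a string, and the writability check at the top of the method is an assumption based on the explanation in this comment rather than a copy of the actual handler code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class CreditLoopSketch {

    /** Hypothetical stand-in for RemoteInputChannel. */
    static final class FakeInputChannel {
        final String id;
        final boolean released;
        int credit;

        FakeInputChannel(String id, int credit, boolean released) {
            this.id = id;
            this.credit = credit;
            this.released = released;
        }

        int getAndResetCredit() {
            int current = credit;
            credit = 0;
            return current;
        }
    }

    /** Hypothetical stand-in for a Netty channel; a write completes only when asked to. */
    static final class FakeChannel {
        boolean writable = true;
        final List<String> written = new ArrayList<>();
        private final Queue<Runnable> pendingListeners = new ArrayDeque<>();

        void writeAndFlush(String msg, Runnable listener) {
            written.add(msg);
            pendingListeners.add(listener);
        }

        /** Simulates the oldest pending write finishing and firing its listener. */
        boolean completeNextWrite() {
            Runnable listener = pendingListeners.poll();
            if (listener == null) {
                return false;
            }
            listener.run();
            return true;
        }
    }

    final Queue<FakeInputChannel> inputChannelsWithCredit = new ArrayDeque<>();
    final FakeChannel channel = new FakeChannel();

    void writeAndFlushNextMessageIfPossible() {
        // Assumed check: do nothing while the channel cannot accept more data.
        if (!channel.writable) {
            return;
        }
        while (true) {
            FakeInputChannel inputChannel = inputChannelsWithCredit.poll();
            // Queue drained: nothing left to announce.
            if (inputChannel == null) {
                return;
            }
            // Released channels are skipped; the loop moves on to the next one.
            if (!inputChannel.released) {
                String msg = inputChannel.id + ":" + inputChannel.getAndResetCredit();
                // Write exactly one message, then stop. The listener re-enters this
                // method after the write is done, which re-checks writability.
                channel.writeAndFlush(msg, this::writeAndFlushNextMessageIfPossible);
                return;
            }
        }
    }

    static List<String> demo() {
        CreditLoopSketch sketch = new CreditLoopSketch();
        sketch.inputChannelsWithCredit.add(new FakeInputChannel("a", 2, true));
        sketch.inputChannelsWithCredit.add(new FakeInputChannel("b", 3, false));
        sketch.inputChannelsWithCredit.add(new FakeInputChannel("c", 5, false));
        sketch.writeAndFlushNextMessageIfPossible();
        // Drive the simulated write completions until no listener is pending.
        while (sketch.channel.completeNextWrite()) {
            // each completion may trigger the next write
        }
        return sketch.channel.written;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [b:3, c:5]
    }
}
```

In this sketch the released channel `a` is skipped inside the loop, while `c` is only written after the write for `b` has completed, which is the point at which the writability check runs again.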
---