Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152961281 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -572,20 +560,53 @@ void addExclusiveBuffer(Buffer buffer) { exclusiveBuffers.add(buffer); } - Buffer takeExclusiveBuffer() { - return exclusiveBuffers.poll(); - } - void addFloatingBuffer(Buffer buffer) { floatingBuffers.add(buffer); } - Buffer takeFloatingBuffer() { - return floatingBuffers.poll(); + /** + * Add the exclusive buffer into the queue, and recycle one floating buffer if the + * number of available buffers in queue is more than required amount. + * + * @param buffer The exclusive buffer of this channel. + * @return Whether to recycle one floating buffer. + */ + boolean maintainTargetSize(Buffer buffer) { --- End diff -- Sorry about the forth-and-back here, but thinking about the bug that you fixed with the latest commit, it would be dangerous if we ever called `maintainTargetSize()` without adding an exclusive buffer. What do you think about my second suggestion instead, i.e. having a ``` /** * Adds an exclusive buffer (back) into the queue and recycles one floating buffer if the * number of available buffers in queue is more than the required amount. * * @param buffer * the exclusive buffer to add * @param numRequiredBuffers * the number of required buffers * * @return how many buffers were added to the queue */ int addExclusiveBuffer(Buffer buffer, int numRequiredBuffers) { exclusiveBuffers.add(buffer); if (getAvailableBufferSize() > numRequiredBuffers) { Buffer floatingBuffer = floatingBuffers.poll(); floatingBuffer.recycle(); return 0; } else { return 1; } } ``` (please note the changed return type which I think is more obvious than a `boolean`)
---