GitHub user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4509#discussion_r152859775

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java ---
    @@ -715,4 +686,58 @@ private RemoteInputChannel createRemoteInputChannel(
                 initialAndMaxRequestBackoff._2(),
                 new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
         }
    +
    +    private Callable recycleExclusiveBufferTask(RemoteInputChannel inputChannel, int numExclusiveSegments) {
    +        final List<Buffer> exclusiveBuffers = new ArrayList<>(numExclusiveSegments);
    +        // Exhaust all the exclusive buffers
    +        for (int i = 0; i < numExclusiveSegments; i++) {
    +            Buffer buffer = inputChannel.requestBuffer();
    +            assertNotNull(buffer);
    +            exclusiveBuffers.add(buffer);
    +        }
    +
    +        return new Callable<Void>() {
    +            @Override
    +            public Void call() throws Exception {
    +                for (Buffer buffer : exclusiveBuffers) {
    +                    buffer.recycle();
    +                }
    +
    +                return null;
    +            }
    +        };
    +    }
    +
    +    private Callable recycleFloatingBufferTask(BufferPool bufferPool, int numFloatingBuffers) throws Exception {
    +        final List<Buffer> floatingBuffers = new ArrayList<>(numFloatingBuffers);
    --- End diff --

    Please add a Javadoc.
---