Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6238#discussion_r199676237

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java ---
    @@ -147,7 +151,12 @@ public void recycle(MemorySegment segment) {

                this.numTotalRequiredBuffers += numRequiredBuffers;

    -           redistributeBuffers();
    +           try {
    +               redistributeBuffers();
    +           } catch (Throwable t) {
    +               this.numTotalRequiredBuffers -= numRequiredBuffers;
    +               ExceptionUtils.rethrowIOException(t);
    +           }
            }

            final List<MemorySegment> segments = new ArrayList<>(numRequiredBuffers);
    --- End diff --

    The subsequent `availableMemorySegments.poll(2, TimeUnit.SECONDS)` may throw an `InterruptedException`, and the catch block then calls `recycleMemorySegments(segments)`, which only does `numTotalRequiredBuffers -= segments.size();`. If the request fails before all segments have been collected, `segments.size()` is smaller than `numRequiredBuffers`. I think we should call `recycleMemorySegments(numRequiredBuffers, segments)` instead and do `numTotalRequiredBuffers -= numRequiredBuffers;` inside it; otherwise part of the `numTotalRequiredBuffers` reservation is leaked.
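
    To make the accounting concern concrete, here is a minimal sketch. It is not Flink's `NetworkBufferPool`: the class `BufferAccountingSketch`, its methods, and the `releaseFullReservation` flag are hypothetical, the queue is non-blocking, and an `IOException` on an empty queue stands in for the `InterruptedException` that the real `poll` could throw. It only shows how decrementing by `segments.size()` differs from decrementing by the full reservation when a request fails partway.

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical, simplified stand-in for the pool; not Flink's NetworkBufferPool. */
public class BufferAccountingSketch {
    private final Queue<byte[]> available = new ArrayDeque<>();
    private int numTotalRequiredBuffers;

    public BufferAccountingSketch(int numSegments) {
        for (int i = 0; i < numSegments; i++) {
            available.add(new byte[8]);
        }
    }

    public int getNumTotalRequiredBuffers() {
        return numTotalRequiredBuffers;
    }

    public int getNumAvailable() {
        return available.size();
    }

    /**
     * Reserves numRequired buffers, then collects them one by one.
     * releaseFullReservation = false mimics decrementing by segments.size();
     * releaseFullReservation = true mimics the suggested fix.
     */
    public List<byte[]> request(int numRequired, boolean releaseFullReservation) throws IOException {
        numTotalRequiredBuffers += numRequired;
        List<byte[]> segments = new ArrayList<>(numRequired);
        try {
            while (segments.size() < numRequired) {
                byte[] segment = available.poll();
                if (segment == null) {
                    // Assumption: simulates a failure in the middle of the request.
                    throw new IOException("request failed after " + segments.size() + " segments");
                }
                segments.add(segment);
            }
        } catch (IOException e) {
            int reserved = releaseFullReservation ? numRequired : segments.size();
            recycle(reserved, segments);
            throw e;
        }
        return segments;
    }

    /** Returns the collected segments and releases numReserved from the reservation count. */
    private void recycle(int numReserved, List<byte[]> segments) {
        available.addAll(segments);
        numTotalRequiredBuffers -= numReserved;
    }
}
```

    With a pool of 3 segments and a failed request for 5, the first variant leaves 2 buffers permanently counted as required even though all segments were returned, while the second variant brings the count back to 0.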
---