GitHub user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r141945644 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java --- @@ -131,6 +136,63 @@ public void recycle(MemorySegment segment) { availableMemorySegments.add(segment); } + public List<MemorySegment> requestMemorySegments(int numRequiredBuffers) throws IOException { + checkArgument(numRequiredBuffers > 0, "The number of required buffers should be larger than 0."); + + synchronized (factoryLock) { + if (isDestroyed) { + throw new IllegalStateException("Network buffer pool has already been destroyed."); + } + + if (numTotalRequiredBuffers + numRequiredBuffers > totalNumberOfMemorySegments) { + throw new IOException(String.format("Insufficient number of network buffers: " + --- End diff -- How about adding a `FlinkNetworkException` or `FlinkNetworkIOException` for all network/Netty-related exceptions? That would help users identify the root causes of problems more easily.
---