Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5105#discussion_r156923552 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java --- @@ -40,37 +39,31 @@ private final BufferRecycler bufferRecycler; - private AtomicInteger numberOfCreatedBuffers = new AtomicInteger(); + private final int poolSize; - public TestBufferFactory() { - this(BUFFER_SIZE, RECYCLER); - } - - public TestBufferFactory(int bufferSize) { - this(bufferSize, RECYCLER); - } + private int numberOfCreatedBuffers = 0; - public TestBufferFactory(int bufferSize, BufferRecycler bufferRecycler) { + public TestBufferFactory(int poolSize, int bufferSize, BufferRecycler bufferRecycler) { checkArgument(bufferSize > 0); + this.poolSize = poolSize; this.bufferSize = bufferSize; this.bufferRecycler = checkNotNull(bufferRecycler); } - public Buffer create() { - numberOfCreatedBuffers.incrementAndGet(); + public synchronized Buffer create() { + if (numberOfCreatedBuffers >= poolSize) { + return null; + } + numberOfCreatedBuffers++; return new Buffer(MemorySegmentFactory.allocateUnpooledSegment(bufferSize), bufferRecycler); } - public Buffer createFrom(MemorySegment segment) { - return new Buffer(segment, bufferRecycler); - } - - public int getNumberOfCreatedBuffers() { - return numberOfCreatedBuffers.get(); + public synchronized int getNumberOfCreatedBuffers() { + return numberOfCreatedBuffers; } - public int getBufferSize() { + public synchronized int getBufferSize() { --- End diff -- I would leave it just for the sake of having all methods `synchronized` so that you don't have to think which one are and which one should be `synchronized` (when adding features or refactoring this class in the future)
---