pnowojski commented on a change in pull request #15885: URL: https://github.com/apache/flink/pull/15885#discussion_r642258172
########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/recovery/DemultiplexingRecordDeserializerTest.java ########## @@ -190,10 +196,12 @@ public void testWatermarks() throws IOException { deserializer.getVirtualChannelSelectors().iterator(); iterator.hasNext(); ) { SubtaskConnectionDescriptor selector = iterator.next(); - final BufferBuilder bufferBuilder = createBufferBuilder(128); + MemorySegment memorySegment = allocateUnpooledSegment(128); + final BufferBuilder bufferBuilder = createBufferBuilder(memorySegment); final long ts = 42L + selector.getInputSubtaskIndex() + selector.getOutputSubtaskIndex(); Buffer buffer = write(bufferBuilder, new Watermark(ts)); + bufferBuilder.recycle(); Review comment: semi nit: In quite a few places, you could have used ``` try (BufferBuilder bufferBuilder = ...) { } ``` For this, it would be easier to first refactor `BufferBuilder` to implement `AutoCloseable` as the first commit, not the last one. I'm not entirely sure if this is worth the effort. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java ########## @@ -55,6 +63,7 @@ public static BufferBuilder fillBufferBuilder(BufferBuilder bufferBuilder, int d public static Buffer buildSingleBuffer(BufferBuilder bufferBuilder) { try (BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer()) { + bufferBuilder.recycle(); Review comment: This is a bit strange - `bufferBuilder` is an argument obtained from the caller, so it shouldn't be closed/recycled inside this function. 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java ########## @@ -363,7 +363,7 @@ private void finishBroadcastBufferBuilder() { if (broadcastBufferBuilder != null) { numBytesOut.inc(broadcastBufferBuilder.finish() * numSubpartitions); numBuffersOut.inc(numSubpartitions); - broadcastBufferBuilder.recycle(); + broadcastBufferBuilder.close(); Review comment: You are modifying code that you introduced in the same PR (increasing the PR size). It would be better to reverse the order of the commits. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandlerTest.java ########## @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.checkpoint.channel; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor; +import org.apache.flink.runtime.checkpoint.RescaleMappings; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferBuilder; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils; +import org.apache.flink.runtime.io.network.partition.ResultPartition; +import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashSet; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; + +/** Test of different implementation of {@link RecoveredChannelStateHandler}. 
*/ +@RunWith(Parameterized.class) +public class RecoveredChannelStateHandlerTest<Info, Context> { + RecoveredChannelStateHandlerTester<Info, Context> stateHandlerTester; + + @Parameterized.Parameters + public static Object[][] data() { + return new Object[][] { + {new ResultSubpartitionRecoveredChannelStateHandlerTester()}, + {new InputChannelRecoveredStateHandlerTester()}, + }; + } + + public RecoveredChannelStateHandlerTest( + RecoveredChannelStateHandlerTester<Info, Context> stateHandlerTester) { + this.stateHandlerTester = stateHandlerTester; + } + + @Test + public void testRecycleBufferBeforeRecoverWasCalled() throws Exception { + // given: Initialized state handler. + stateHandlerTester.init(); + + Info channelInfo = stateHandlerTester.getInfo(); + + // when: Request the buffer. + RecoveredChannelStateHandler.BufferWithContext<Context> buffer = + stateHandlerTester.getBuffer(channelInfo); + + // and: Recycle buffer outside. + buffer.buffer.recycle(); Review comment: `buffer.buffer.`? `%s/buffer/bufferWithContext/g`? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java ########## @@ -348,6 +348,7 @@ private void finishUnicastBufferBuilder(int targetSubpartition) { if (bufferBuilder != null) { numBytesOut.inc(bufferBuilder.finish()); numBuffersOut.inc(); + bufferBuilder.recycle(); Review comment: do we need both `finish()` and `recycle()`? Wouldn't single `close()` be enough? ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandlerTest.java ########## @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint.channel; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor; +import org.apache.flink.runtime.checkpoint.RescaleMappings; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferBuilder; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils; +import org.apache.flink.runtime.io.network.partition.ResultPartition; +import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashSet; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; + +/** Test of different implementation of 
{@link RecoveredChannelStateHandler}. */ +@RunWith(Parameterized.class) +public class RecoveredChannelStateHandlerTest<Info, Context> { + RecoveredChannelStateHandlerTester<Info, Context> stateHandlerTester; + + @Parameterized.Parameters + public static Object[][] data() { + return new Object[][] { + {new ResultSubpartitionRecoveredChannelStateHandlerTester()}, + {new InputChannelRecoveredStateHandlerTester()}, + }; + } + + public RecoveredChannelStateHandlerTest( + RecoveredChannelStateHandlerTester<Info, Context> stateHandlerTester) { + this.stateHandlerTester = stateHandlerTester; + } + + @Test + public void testRecycleBufferBeforeRecoverWasCalled() throws Exception { + // given: Initialized state handler. + stateHandlerTester.init(); + + Info channelInfo = stateHandlerTester.getInfo(); + + // when: Request the buffer. + RecoveredChannelStateHandler.BufferWithContext<Context> buffer = + stateHandlerTester.getBuffer(channelInfo); + + // and: Recycle buffer outside. + buffer.buffer.recycle(); + + // then: Buffer should be recycled the same times as it was retained. + stateHandlerTester.assertState(); + } + + @Test + public void testRecycleBufferAfterRecoverWasCalled() throws Exception { + // given: Initialized state handler. + stateHandlerTester.init(); Review comment: `@Before`? ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java ########## @@ -36,9 +36,10 @@ import static org.apache.flink.util.Preconditions.checkArgument; +/** */ public class TestPooledBufferProvider implements BufferProvider { - private final BlockingQueue<Buffer> buffers = new LinkedBlockingDeque<>(); + private final BlockingQueue<MemorySegment> buffers = new LinkedBlockingDeque<>(); Review comment: segments? 
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandlerTest.java ########## @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint.channel; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor; +import org.apache.flink.runtime.checkpoint.RescaleMappings; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferBuilder; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils; +import org.apache.flink.runtime.io.network.partition.ResultPartition; +import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashSet; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; + +/** Test of different implementation of {@link RecoveredChannelStateHandler}. */ +@RunWith(Parameterized.class) +public class RecoveredChannelStateHandlerTest<Info, Context> { + RecoveredChannelStateHandlerTester<Info, Context> stateHandlerTester; + + @Parameterized.Parameters + public static Object[][] data() { + return new Object[][] { + {new ResultSubpartitionRecoveredChannelStateHandlerTester()}, + {new InputChannelRecoveredStateHandlerTester()}, + }; + } + + public RecoveredChannelStateHandlerTest( + RecoveredChannelStateHandlerTester<Info, Context> stateHandlerTester) { + this.stateHandlerTester = stateHandlerTester; + } + + @Test + public void testRecycleBufferBeforeRecoverWasCalled() throws Exception { + // given: Initialized state handler. + stateHandlerTester.init(); + + Info channelInfo = stateHandlerTester.getInfo(); + + // when: Request the buffer. + RecoveredChannelStateHandler.BufferWithContext<Context> buffer = + stateHandlerTester.getBuffer(channelInfo); + + // and: Recycle buffer outside. + buffer.buffer.recycle(); + + // then: Buffer should be recycled the same times as it was retained. + stateHandlerTester.assertState(); + } + + @Test + public void testRecycleBufferAfterRecoverWasCalled() throws Exception { + // given: Initialized state handler. + stateHandlerTester.init(); + + Info channelInfo = stateHandlerTester.getInfo(); + + // when: Request the buffer. + RecoveredChannelStateHandler.BufferWithContext<Context> buffer = + stateHandlerTester.getBuffer(channelInfo); + + // and: Pass the buffer to recovery. 
+ stateHandlerTester.recover(channelInfo, 0, buffer.context); + + // and: Recycle buffer outside. + buffer.buffer.recycle(); + + // then: Buffer should be recycled the same times as it was retained. + stateHandlerTester.assertState(); + } + + static class ResultSubpartitionRecoveredChannelStateHandlerTester + extends RecoveredChannelStateHandlerTester<ResultSubpartitionInfo, BufferBuilder> { + AtomicInteger recycledMemorySegments; Review comment: Why `AtomicInteger`? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java ########## @@ -172,8 +172,7 @@ public void setup() throws IOException { synchronized (lock) { try { for (int i = 0; i < numWriteBuffers; ++i) { - MemorySegment segment = - bufferPool.requestBufferBuilderBlocking().getMemorySegment(); + MemorySegment segment = bufferPool.requestMemorySegmentBlocking(); writeBuffers.add(segment); Review comment: writeSegments? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java ########## @@ -85,19 +84,15 @@ @Override public void recover(InputChannelInfo channelInfo, int oldSubtaskIndex, Buffer buffer) throws IOException { - try { - if (buffer.readableBytes() > 0) { - for (final RecoveredInputChannel channel : getMappedChannels(channelInfo)) { - channel.onRecoveredStateBuffer( - EventSerializer.toBuffer( - new SubtaskConnectionDescriptor( - oldSubtaskIndex, channelInfo.getInputChannelIdx()), - false)); - channel.onRecoveredStateBuffer(buffer.retainBuffer()); - } + if (buffer.readableBytes() > 0) { + for (final RecoveredInputChannel channel : getMappedChannels(channelInfo)) { + channel.onRecoveredStateBuffer( + EventSerializer.toBuffer( + new SubtaskConnectionDescriptor( + oldSubtaskIndex, channelInfo.getInputChannelIdx()), + false)); + channel.onRecoveredStateBuffer(buffer.retainBuffer()); } - } finally { - buffer.recycleBuffer(); } } Review comment: 1. 
I don't like that we are mixing different styles of contracts here. `RecoveredChannelStateHandler#recover` receives a `Buffer` that it doesn't own - the caller is responsible for the clean up. Meanwhile, a couple of lines below, it's copying the buffer and passing the ownership of the copy to the `channel` - the callee is responsible for the clean up. Why not stick with something that's consistent, and just have a contract that we are passing the ownership of the buffer to the callee? 2. It's not documented anywhere. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java ########## @@ -132,26 +146,24 @@ public int getNumberOfCreatedBuffers() { private final Object listenerRegistrationLock = new Object(); - private final Queue<Buffer> buffers; + private final Queue<MemorySegment> buffers; Review comment: segments? ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandlerTest.java ########## @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.checkpoint.channel; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor; +import org.apache.flink.runtime.checkpoint.RescaleMappings; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferBuilder; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils; +import org.apache.flink.runtime.io.network.partition.ResultPartition; +import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashSet; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; + +/** Test of different implementation of {@link RecoveredChannelStateHandler}. */ +@RunWith(Parameterized.class) +public class RecoveredChannelStateHandlerTest<Info, Context> { Review comment: I'm having quite hard time to follow this test. I have a feeling that parametrising this test did more harm than good? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateSerializer.java ########## @@ -103,7 +103,7 @@ public boolean isWritable() { @Override public void recycle() { Review comment: `close()`? 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionSortedBuffer.java ########## @@ -270,14 +269,13 @@ private MemorySegment requestBufferFromPool() throws IOException { try { // blocking request buffers if there is still guaranteed memory if (buffers.size() < numGuaranteedBuffers) { Review comment: segments? ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandlerTest.java ########## @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.checkpoint.channel; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor; +import org.apache.flink.runtime.checkpoint.RescaleMappings; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferBuilder; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils; +import org.apache.flink.runtime.io.network.partition.ResultPartition; +import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashSet; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; + +/** Test of different implementation of {@link RecoveredChannelStateHandler}. 
*/ +@RunWith(Parameterized.class) +public class RecoveredChannelStateHandlerTest<Info, Context> { + RecoveredChannelStateHandlerTester<Info, Context> stateHandlerTester; + + @Parameterized.Parameters + public static Object[][] data() { + return new Object[][] { + {new ResultSubpartitionRecoveredChannelStateHandlerTester()}, + {new InputChannelRecoveredStateHandlerTester()}, + }; + } + + public RecoveredChannelStateHandlerTest( + RecoveredChannelStateHandlerTester<Info, Context> stateHandlerTester) { + this.stateHandlerTester = stateHandlerTester; + } + + @Test + public void testRecycleBufferBeforeRecoverWasCalled() throws Exception { + // given: Initialized state handler. + stateHandlerTester.init(); + + Info channelInfo = stateHandlerTester.getInfo(); + + // when: Request the buffer. + RecoveredChannelStateHandler.BufferWithContext<Context> buffer = + stateHandlerTester.getBuffer(channelInfo); + + // and: Recycle buffer outside. + buffer.buffer.recycle(); + + // then: Buffer should be recycled the same times as it was retained. + stateHandlerTester.assertState(); + } + + @Test + public void testRecycleBufferAfterRecoverWasCalled() throws Exception { + // given: Initialized state handler. + stateHandlerTester.init(); + + Info channelInfo = stateHandlerTester.getInfo(); + + // when: Request the buffer. + RecoveredChannelStateHandler.BufferWithContext<Context> buffer = + stateHandlerTester.getBuffer(channelInfo); + + // and: Pass the buffer to recovery. + stateHandlerTester.recover(channelInfo, 0, buffer.context); + + // and: Recycle buffer outside. + buffer.buffer.recycle(); + + // then: Buffer should be recycled the same times as it was retained. 
+ stateHandlerTester.assertState(); + } + + static class ResultSubpartitionRecoveredChannelStateHandlerTester + extends RecoveredChannelStateHandlerTester<ResultSubpartitionInfo, BufferBuilder> { + AtomicInteger recycledMemorySegments; + ResultPartition partition; + + @Override + ResultSubpartitionInfo getInfo() { + return new ResultSubpartitionInfo(0, 0); + } + + @Override + ResultSubpartitionRecoveredStateHandler createInstanceForTest() throws IOException { + // given: Result partition with recycling counter. + recycledMemorySegments = new AtomicInteger(); + partition = + new ResultPartitionBuilder() + .setNetworkBufferPool( + new NetworkBufferPool(3, 32 * 1024) { + @Override + public void recycle(MemorySegment segment) { + recycledMemorySegments.incrementAndGet(); + super.recycle(segment); + } + }) Review comment: Why not use plain simple `NetworkBufferPool` and just assert at the end the number of available segments instead of counting how many times `recycle` was called? Frankly, the test shouldn't care how many times `recycle` was called. Just that the buffers were returned to the pool. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org