GitHub user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r143425927 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -434,6 +435,52 @@ public void testWaitForFloatingBuffersOnBuffer() throws Exception { verify(bufferPool, times(1)).requestBuffer(); } + /** + * Tests to verify that there is no race condition with two things running in parallel: + * requesting floating buffers and some other thread recycling them. + */ + @Test + public void testConcurrentRequestBufferAndNotifyBufferAvailable() throws Exception { + // Setup + final ExecutorService executor = Executors.newFixedThreadPool(1); + final Buffer buffer = TestBufferFactory.createBuffer(); + final BufferPool bufferPool = mock(BufferPool.class); + when(bufferPool.requestBuffer()).thenReturn(null); + when(bufferPool.addBufferListener(any(BufferListener.class))).thenReturn(true); + + final SingleInputGate inputGate = mock(SingleInputGate.class); + when(inputGate.getBufferPool()).thenReturn(bufferPool); + when(inputGate.getBufferProvider()).thenReturn(bufferPool); + try { + final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); + // Trigger to request one floating buffer on sender backlog + inputChannel.onBuffer(buffer, 0, 1); + + final CountDownLatch sync = new CountDownLatch(1); + + // Submit task and wait to finish + Future<Void> result = executor.submit(new Callable<Void>() { + @Override + public Void call() throws Exception { + inputChannel.notifyBufferAvailable(buffer); + sync.countDown(); + + return null; + } + }); --- End diff -- This will not run the code in parallel: `onBuffer()` is called on the test thread before the task is even submitted to the executor, so it is always executed before `inputChannel.notifyBufferAvailable(buffer);`. Among the tests I recently stumbled upon, `BlobServerGetTest#testConcurrentGetOperations()` may be a good base to start a new concurrency test with.
---