GitHub user uce commented on the issue:

https://github.com/apache/flink/pull/2141

Very impressive that you made it through that part of the system. It's very poorly documented and overly complex.

The change looks good. It ensures that a staged buffer that cannot be decoded because of a closed buffer pool does not leave the channel with auto read set to false. This is currently not a problem because all tasks of the job are failed, but with partial recovery it will lead to problems if the channel is kept alive by other consuming tasks.

We could add the following test for this to `PartitionRequestClientHandlerTest`. It covers both branches that you added. What do you think?

```java
/**
 * Tests that an unsuccessful message decode call for a staged message
 * does not leave the channel with auto read set to false.
 */
@Test
@SuppressWarnings("unchecked")
public void testAutoReadAfterUnsuccessfulStagedMessage() throws Exception {
    PartitionRequestClientHandler handler = new PartitionRequestClientHandler();
    EmbeddedChannel channel = new EmbeddedChannel(handler);

    final AtomicReference<EventListener<Buffer>> listener = new AtomicReference<>();

    BufferProvider bufferProvider = mock(BufferProvider.class);
    when(bufferProvider.addListener(any(EventListener.class))).thenAnswer(new Answer<Boolean>() {
        @Override
        @SuppressWarnings("unchecked")
        public Boolean answer(InvocationOnMock invocation) throws Throwable {
            listener.set((EventListener<Buffer>) invocation.getArguments()[0]);
            return true;
        }
    });

    when(bufferProvider.requestBuffer()).thenReturn(null);

    InputChannelID channelId = new InputChannelID(0, 0);
    RemoteInputChannel inputChannel = mock(RemoteInputChannel.class);
    when(inputChannel.getInputChannelId()).thenReturn(channelId);

    // Stub this before the first write: the 1st and 2nd msgs see the buffer
    // provider, the 3rd staged msg has a null buffer provider.
    when(inputChannel.getBufferProvider()).thenReturn(bufferProvider, bufferProvider, null);

    handler.addInputChannel(inputChannel);

    BufferResponse msg = createBufferResponse(channelId, channel);

    // Write 1st buffer msg. No buffer is available, therefore the buffer
    // should be staged and auto read should be set to false.
    assertTrue(channel.config().isAutoRead());
    channel.writeInbound(msg);

    // No buffer available, auto read false
    assertFalse(channel.config().isAutoRead());

    // Write more buffers... all staged.
    msg = createBufferResponse(channelId, channel);
    channel.writeInbound(msg);

    msg = createBufferResponse(channelId, channel);
    channel.writeInbound(msg);

    // Notify about buffer => handle 1st msg
    Buffer availableBuffer = createBuffer(false);
    listener.get().onEvent(availableBuffer);

    // Start processing of staged buffers (in run pending tasks). Make
    // sure that the buffer provider acts like it's destroyed.
    when(bufferProvider.addListener(any(EventListener.class))).thenReturn(false);
    when(bufferProvider.isDestroyed()).thenReturn(true);

    // Execute all tasks that are scheduled in the event loop. Further
    // eventLoop().execute() calls are directly executed, if they are
    // called in the scope of this call.
    channel.runPendingTasks();

    assertTrue(channel.config().isAutoRead());
}
```
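To make the invariant behind this test easier to see, here is a minimal, dependency-free sketch of the staging logic. It assumes a simplified model. Messages that find no free buffer are staged and auto read is switched off. When the pool later signals a change, staged messages are retried in order. A message that hits a destroyed pool is dropped rather than staged forever, so auto read is restored once nothing is left staged. All names here (`StagingSketch`, `Channel`, `Pool`, `tryDecode`, `processStaged`) are hypothetical and are not Flink's or Netty's API; the real handler uses Netty's channel config and Flink's `BufferProvider`.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical model of the staging/auto-read invariant; not Flink or Netty code.
public class StagingSketch {

    /** Stand-in for a Netty channel: only the auto-read flag matters here. */
    static final class Channel {
        boolean autoRead = true;
    }

    /** Stand-in for a buffer pool that can run dry or be destroyed. */
    static final class Pool {
        int available;
        boolean destroyed;

        Pool(int available) { this.available = available; }

        /** Hands out one buffer if any is left. */
        boolean tryRequest() {
            if (available == 0) {
                return false;
            }
            available--;
            return true;
        }

        /** A consumer returned a buffer to the pool. */
        void recycle() { available++; }
    }

    static final class StagingHandler {
        final Channel channel;
        final Pool pool;
        final Queue<String> staged = new ArrayDeque<>();
        int decoded;
        int discarded;

        StagingHandler(Channel channel, Pool pool) {
            this.channel = channel;
            this.pool = pool;
        }

        /** True if the message was consumed (decoded or dropped), false if it must wait. */
        private boolean tryDecode(String msg) {
            if (pool.destroyed) {
                // Can never be decoded: drop it instead of keeping it staged forever.
                discarded++;
                return true;
            }
            if (pool.tryRequest()) {
                // A real handler would copy msg into the buffer here.
                decoded++;
                return true;
            }
            return false;
        }

        /** Called for every inbound message, in arrival order. */
        void onMessage(String msg) {
            // Preserve ordering: once something is staged, stage everything after it.
            if (staged.isEmpty() && tryDecode(msg)) {
                return;
            }
            staged.add(msg);
            channel.autoRead = false; // back-pressure until a buffer shows up
        }

        /** Called when the pool signals a change (buffer recycled or pool destroyed). */
        void processStaged() {
            while (!staged.isEmpty() && tryDecode(staged.peek())) {
                staged.poll();
            }
            // Nothing left staged means nothing to wait for: re-enable auto read.
            if (staged.isEmpty()) {
                channel.autoRead = true;
            }
        }
    }

    public static void main(String[] args) {
        Channel ch = new Channel();
        Pool pool = new Pool(0);
        StagingHandler h = new StagingHandler(ch, pool);
        h.onMessage("m1");
        h.onMessage("m2");
        h.onMessage("m3");
        System.out.println("after staging, autoRead=" + ch.autoRead); // prints false
        pool.recycle();
        h.processStaged(); // m1 decoded; m2 and m3 still wait
        pool.destroyed = true;
        h.processStaged(); // m2 and m3 dropped
        System.out.println("after destroy, autoRead=" + ch.autoRead); // prints true
    }
}
```

The scenario in `main` loosely mirrors the test: three messages are staged, one buffer arrives for the first, and the pool is then destroyed. The point of the sketch is that the "pool destroyed" path consumes the message instead of returning early with messages still staged, which is what would otherwise leave auto read off.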