[ https://issues.apache.org/jira/browse/FLINK-4021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420973#comment-15420973 ]

ASF GitHub Bot commented on FLINK-4021:
---------------------------------------

Github user uce commented on the issue:

    https://github.com/apache/flink/pull/2141
  
    Very impressive that you made it through that part of the system. It's very
poorly documented and overly complex. The change looks good: it ensures that a
staged buffer that cannot be decoded because of a closed buffer pool no longer
leaves the channel with auto read set to false. This is not a problem today,
because all tasks of the job are failed together, but with partial recovery it
will cause problems if the channel is kept alive by other consuming tasks.
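A minimal, hypothetical model of the invariant described above may help. It is not Flink code: `StagedAutoReadModel`, `DecodeResult` and the two `processStaged*` methods are invented for illustration, and Netty, buffer pools and input channels are left out. It assumes that the staged-message task switches auto read back on only after draining its queue. The buggy variant stops at the first unsuccessful decode, as the issue describes; the fixed variant stops only when a message is waiting for a buffer again.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;

/**
 * Hypothetical, simplified model of how staged messages interact with auto read.
 * Not Flink code: Netty, buffer pools and input channels are left out.
 */
public class StagedAutoReadModel {

    /** Assumed outcomes of decoding one staged message. */
    enum DecodeResult { DECODED, WAITING_FOR_BUFFER, POOL_DESTROYED, RECEIVER_GONE }

    private final Queue<DecodeResult> staged;
    // Messages are only staged after auto read was switched off.
    private boolean autoRead = false;

    StagedAutoReadModel(DecodeResult... outcomes) {
        this.staged = new ArrayDeque<>(Arrays.asList(outcomes));
    }

    /** Buggy variant: any unsuccessful decode ends the loop before the reset. */
    void processStagedBuggy() {
        while (!staged.isEmpty()) {
            if (staged.poll() != DecodeResult.DECODED) {
                return; // auto read is never switched back on
            }
        }
        autoRead = true;
    }

    /** Fixed variant: only a message that waits for a buffer again keeps auto read off. */
    void processStagedFixed() {
        while (!staged.isEmpty()) {
            if (staged.peek() == DecodeResult.WAITING_FOR_BUFFER) {
                return; // a buffer listener would resume the loop later (not modeled)
            }
            // DECODED, POOL_DESTROYED, RECEIVER_GONE: drop the message and go on.
            staged.poll();
        }
        autoRead = true;
    }

    /** Convenience helper: auto read state after processing the given outcomes. */
    static boolean autoReadAfter(boolean fixed, DecodeResult... outcomes) {
        StagedAutoReadModel model = new StagedAutoReadModel(outcomes);
        if (fixed) {
            model.processStagedFixed();
        } else {
            model.processStagedBuggy();
        }
        return model.autoRead;
    }

    public static void main(String[] args) {
        DecodeResult[] outcomes = { DecodeResult.POOL_DESTROYED, DecodeResult.RECEIVER_GONE };
        System.out.println("buggy: autoRead = " + autoReadAfter(false, outcomes)); // false
        System.out.println("fixed: autoRead = " + autoReadAfter(true, outcomes));  // true
    }
}
```

Running `main` prints `false` for the buggy variant and `true` for the fixed one, mirroring the destroyed-pool and null-provider messages in the proposed test.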
    
    We could add the following as a test for this in 
`PartitionRequestClientHandlerTest`. What do you think? This covers both 
branches that you added.
    
    ```java
    /**
     * Tests that an unsuccessful message decode call for a staged message
     * does not leave the channel with auto read set to false.
     */
    @Test
    @SuppressWarnings("unchecked")
    public void testAutoReadAfterUnsuccessfulStagedMessage() throws Exception {
        PartitionRequestClientHandler handler = new PartitionRequestClientHandler();
        EmbeddedChannel channel = new EmbeddedChannel(handler);

        final AtomicReference<EventListener<Buffer>> listener = new AtomicReference<>();

        // Capture the listener that the handler registers while waiting for a buffer.
        BufferProvider bufferProvider = mock(BufferProvider.class);
        when(bufferProvider.addListener(any(EventListener.class))).thenAnswer(new Answer<Boolean>() {
            @Override
            @SuppressWarnings("unchecked")
            public Boolean answer(InvocationOnMock invocation) throws Throwable {
                listener.set((EventListener<Buffer>) invocation.getArguments()[0]);
                return true;
            }
        });

        // The provider itself never hands out a buffer.
        when(bufferProvider.requestBuffer()).thenReturn(null);

        InputChannelID channelId = new InputChannelID(0, 0);
        RemoteInputChannel inputChannel = mock(RemoteInputChannel.class);
        when(inputChannel.getInputChannelId()).thenReturn(channelId);

        // Stub before the first write: the 1st and 2nd msgs see the buffer
        // provider, the 3rd msg sees a null buffer provider. Without this
        // stub, the mock would already return null for the 1st msg.
        when(inputChannel.getBufferProvider()).thenReturn(bufferProvider, bufferProvider, null);

        handler.addInputChannel(inputChannel);

        BufferResponse msg = createBufferResponse(channelId, channel);

        // Write 1st buffer msg. No buffer is available, therefore the buffer
        // should be staged and auto read should be set to false.
        assertTrue(channel.config().isAutoRead());
        channel.writeInbound(msg);

        // No buffer available, auto read false
        assertFalse(channel.config().isAutoRead());

        // Write more buffers... all staged.
        msg = createBufferResponse(channelId, channel);
        channel.writeInbound(msg);

        msg = createBufferResponse(channelId, channel);
        channel.writeInbound(msg);

        // Notify about buffer => handle 1st msg
        Buffer availableBuffer = createBuffer(false);
        listener.get().onEvent(availableBuffer);

        // Start processing of staged buffers (in run pending tasks). Make
        // sure that the buffer provider acts like it's destroyed.
        when(bufferProvider.addListener(any(EventListener.class))).thenReturn(false);
        when(bufferProvider.isDestroyed()).thenReturn(true);

        // Execute all tasks that are scheduled in the event loop. Further
        // eventLoop().execute() calls are executed directly if they are
        // made within the scope of this call.
        channel.runPendingTasks();

        assertTrue(channel.config().isAutoRead());
    }
    ```


> Problem of setting autoread for netty channel when more tasks sharing the same Tcp connection
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-4021
>                 URL: https://issues.apache.org/jira/browse/FLINK-4021
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination
>    Affects Versions: 1.0.2
>            Reporter: Zhijiang Wang
>            Assignee: Zhijiang Wang
>
> More than one task can share the same TCP connection for shuffling data.
> If a downstream task, say "A", has no available memory segment to read a
> Netty buffer from the network into, it sets autoread to false for the channel.
> When task A fails or has available segments again, the Netty handler is
> notified to process the staged buffers first and then reset autoread to true.
> In some scenarios, however, autoread is never set back to true.
> This happens while the staged buffers are processed: for each buffer, the
> handler first looks up the corresponding input channel, and if the task of
> that input channel has failed, the decodeMsg method in
> PartitionRequestClientHandler returns false, which means the step that sets
> autoread back to true is skipped.
> In summary, suppose task "A" sets autoread to false because it has no
> available segments, which results in some staged buffers, and another task
> "B", to which one of the staged buffers belongs, fails unexpectedly. When
> task A then tries to reset autoread to true, the reset never happens because
> task B has failed.
> I have fixed this problem in our application by adding a boolean parameter
> to the decodeBufferOrEvent method that distinguishes whether it is invoked
> from the Netty IO thread's channel read or from the staged-message handler
> task in PartitionRequestClientHandler.
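The reporter's workaround can be sketched in the same simplified style. This is only one possible shape of a caller-aware decode method, not the actual patch: `DecodeFlagSketch`, `Receiver` and the `fromStagedTask` parameter are hypothetical, and events, buffer copying and request cancellation are omitted. The flag lets the staged-message task treat a message for a failed receiver as handled, so the loop keeps going and autoread can be reset.

```java
/**
 * Hypothetical sketch of a decode method that knows who called it.
 * Not the actual Flink patch; names and behavior are assumptions.
 */
public class DecodeFlagSketch {

    /** Stand-in for an input channel of one consuming task. */
    static final class Receiver {
        final boolean taskFailed;      // a failed task has no buffer provider
        final boolean bufferAvailable; // whether a memory segment is free

        Receiver(boolean taskFailed, boolean bufferAvailable) {
            this.taskFailed = taskFailed;
            this.bufferAvailable = bufferAvailable;
        }
    }

    /**
     * Returns true if the caller may go on with the next message.
     *
     * @param fromStagedTask true when called by the staged-message handler task,
     *                       false when called from the Netty IO thread's channelRead
     */
    static boolean decodeBufferOrEvent(Receiver receiver, boolean fromStagedTask) {
        if (receiver.taskFailed) {
            // The real handler would also cancel the partition request and
            // release the Netty buffer here (not modeled).
            // Staged task: report the message as handled so the loop continues.
            return fromStagedTask;
        }
        // Healthy receiver: decoding succeeds only if a buffer is available;
        // otherwise the message would stay staged with autoread off.
        return receiver.bufferAvailable;
    }

    /** Staged-message task: returns the autoread value it leaves behind. */
    static boolean processStagedMessages(Receiver... staged) {
        for (Receiver receiver : staged) {
            if (!decodeBufferOrEvent(receiver, true)) {
                return false; // autoread stays false until a buffer arrives
            }
        }
        return true; // autoread set back to true
    }

    public static void main(String[] args) {
        Receiver taskA = new Receiver(false, true); // A has segments again
        Receiver taskB = new Receiver(true, false); // B has failed
        System.out.println("autoread after staged task: "
                + processStagedMessages(taskA, taskB, taskA)); // true
        System.out.println("IO thread result for task B: "
                + decodeBufferOrEvent(taskB, false)); // false
    }
}
```

Passing the caller as a flag keeps the IO-thread path unchanged while letting only the staged-message path skip over messages for failed receivers.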



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
