[ 
https://issues.apache.org/jira/browse/FLINK-10941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16706967#comment-16706967
 ] 

ASF GitHub Bot commented on FLINK-10941:
----------------------------------------

QiLuo-BD commented on a change in pull request #7186: [FLINK-10941] Keep slots 
which contain unconsumed result partitions
URL: https://github.com/apache/flink/pull/7186#discussion_r238217342
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ##########
 @@ -134,10 +138,17 @@ public void cancel(InputChannelID receiverId) {
                ctx.pipeline().fireUserEventTriggered(receiverId);
        }
 
-       public void close() {
+       public void close() throws IOException {
                if (ctx != null) {
                        ctx.channel().close();
                }
+
+               LOG.info("Close all {} readers pending for close.", readersToClose.size());
 
 Review comment:
   If the partition is released as soon as it has been transported, the TM 
holding that partition will be released as well. Because the Netty connection 
is shared between TMs, this causes a RemoteTransportException ("Connection 
unexpectedly closed by remote task manager ...") in 
CreditBasedPartitionRequestClientHandler's channelInactive() method.
   
   So I changed this so that the writer closes the readers when it closes the 
Netty connection. This is the smallest change I could find. Do you have any 
suggestions on how we can handle this case better?
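
   To make the idea concrete, here is a minimal, self-contained sketch of 
"defer closing the readers until the connection itself is closed". It is not 
the code in this pull request: PendingReader, ReaderClosingQueue, deferClose() 
and pendingCount() are hypothetical names used only for illustration, and the 
Netty channel is left out. Only readersToClose mirrors a name from the diff 
above. Unlike the diff, close() here declares no checked exception, to keep 
the example short.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical stand-in for a reader that keeps a result partition alive until released. */
final class PendingReader {
    private boolean released;

    /** Releases whatever the reader holds; idempotent in this sketch. */
    void release() {
        released = true;
    }

    boolean isReleased() {
        return released;
    }
}

/** Hypothetical stand-in for the queue touched by the diff; not the actual Flink class. */
final class ReaderClosingQueue {
    // Readers whose release is postponed until the connection is closed.
    private final Queue<PendingReader> readersToClose = new ArrayDeque<>();

    /** Registers a reader instead of releasing it right after its data was sent. */
    void deferClose(PendingReader reader) {
        readersToClose.add(reader);
    }

    int pendingCount() {
        return readersToClose.size();
    }

    /** Called when the connection goes away: only now are the deferred readers released. */
    void close() {
        // In the real class the Netty channel would be closed here first (assumption).
        PendingReader reader;
        while ((reader = readersToClose.poll()) != null) {
            reader.release();
        }
    }
}
```

   In this sketch a reader passed to deferClose() stays unreleased until 
close() runs, which is the ordering the change relies on: the partition 
outlives the transfer and goes away together with the connection.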



> Slots prematurely released which still contain unconsumed data 
> ---------------------------------------------------------------
>
>                 Key: FLINK-10941
>                 URL: https://issues.apache.org/jira/browse/FLINK-10941
>             Project: Flink
>          Issue Type: Bug
>          Components: ResourceManager
>    Affects Versions: 1.5.5, 1.6.2, 1.7.0
>            Reporter: Qi
>            Assignee: Qi
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.8.0
>
>
> Our case is: Flink 1.5 in batch mode, with parallelism 32 for reading the 
> data source and parallelism 4 for writing the data sink.
>  
> The read task worked perfectly with 32 TMs. However, when the job was 
> executing the write task, only 4 TMs were needed, so the other 28 TMs were 
> released. This caused a RemoteTransportException in the write task:
>  
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
> Connection unexpectedly closed by remote task manager 
> 'the_previous_TM_used_by_read_task'. This might indicate that the remote task 
> manager was lost.
>       at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelInactive(PartitionRequestClientHandler.java:133)
>       at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
>       ...
>  
> After skimming the YarnFlinkResourceManager-related code, it seems to me 
> that Flink releases TMs when they’re idle, regardless of whether working TMs 
> still need them.
>  
> Put another way, Flink seems to prematurely release slots which contain 
> unconsumed data and thus eventually releases a TM, which then fails a 
> consuming task.


