[ https://issues.apache.org/jira/browse/FLINK-10941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692996#comment-16692996 ]
zhijiang commented on FLINK-10941:
----------------------------------

Hi [~QiLuo], thank you for remembering the external shuffle issue. :)

I agree with the above comments from [~till.rohrmann]. Currently, the lifecycle of a task and the consumption of its output are not strictly consistent, so we should establish that relationship explicitly.

For internal shuffle, the {{TaskManager}} should not be released while it still holds output data that is being shuffled. One way to do this is to wait until all in-memory output has been transported completely, or to persist the output, before the task enters the {{FINISHED}} state.

For external shuffle, the task can enter the {{FINISHED}} state as it does now, and the {{TaskManager}} can also exit once the persisted outputs have been registered with the external shuffle service.

> Slots prematurely released which still contain unconsumed data
> ---------------------------------------------------------------
>
>                 Key: FLINK-10941
>                 URL: https://issues.apache.org/jira/browse/FLINK-10941
>             Project: Flink
>          Issue Type: Bug
>          Components: ResourceManager
>    Affects Versions: 1.5.5, 1.6.2, 1.7.0
>            Reporter: Qi
>            Priority: Critical
>             Fix For: 1.8.0
>
>
> Our case: Flink 1.5 in batch mode, with parallelism 32 for reading the data source and parallelism 4 for writing the data sink.
>
> The read task worked perfectly with 32 TMs. However, when the job was executing the write task, only 4 TMs were needed, so the other 28 TMs were released. This caused a RemoteTransportException in the write task:
>
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'the_previous_TM_used_by_read_task'. This might indicate that the remote task manager was lost.
>     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelInactive(PartitionRequestClientHandler.java:133)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
>     ...
>
> After skimming the YarnFlinkResourceManager-related code, it seems to me that Flink releases TMs when they're idle, regardless of whether working TMs still need them.
>
> Put another way, Flink seems to prematurely release slots that contain unconsumed data and, thus, eventually releases a TM, which then fails a consuming task.
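The release rule discussed in this thread can be sketched in Java as below. This is a minimal sketch, not Flink code: it assumes the resource manager knows, for each idle TaskManager, which result partitions it produced, whether each one is served by the TaskManager itself (internal shuffle) or has been registered with an external shuffle service, and whether all consumers have finished reading it. The names `ReleaseCheckSketch`, `PartitionLocation`, `ResultPartitionInfo`, `IdleTaskManager` and `canBeReleased` are hypothetical.

```java
import java.util.List;

/**
 * Hypothetical sketch, not Flink code: decides when an idle TaskManager may be
 * released without failing downstream consumers of its output.
 */
final class ReleaseCheckSketch {

    /** Where a finished task's result partition is served from (hypothetical). */
    enum PartitionLocation { TASK_MANAGER, EXTERNAL_SHUFFLE_SERVICE }

    /** One produced result partition and whether all consumers have read it (hypothetical). */
    record ResultPartitionInfo(String id, PartitionLocation location, boolean fullyConsumed) {}

    /** A TaskManager with no running tasks, plus the partitions it produced (hypothetical). */
    record IdleTaskManager(String id, List<ResultPartitionInfo> partitions) {}

    /**
     * Internal shuffle: data served by the TaskManager itself pins it until consumed.
     * External shuffle: partitions registered with the external service do not pin it.
     */
    static boolean canBeReleased(IdleTaskManager tm) {
        for (ResultPartitionInfo p : tm.partitions()) {
            boolean servedLocally = p.location() == PartitionLocation.TASK_MANAGER;
            if (servedLocally && !p.fullyConsumed()) {
                // A consumer may still request this data; releasing now is the
                // situation that produced the RemoteTransportException reported here.
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<IdleTaskManager> idle = List.of(
                // Reader TM whose output the writers have not finished reading.
                new IdleTaskManager("tm-1", List.of(
                        new ResultPartitionInfo("p-1", PartitionLocation.TASK_MANAGER, false))),
                // Reader TM whose output was registered with an external shuffle service.
                new IdleTaskManager("tm-2", List.of(
                        new ResultPartitionInfo("p-2", PartitionLocation.EXTERNAL_SHUFFLE_SERVICE, false))),
                // Reader TM whose output has been fully consumed.
                new IdleTaskManager("tm-3", List.of(
                        new ResultPartitionInfo("p-3", PartitionLocation.TASK_MANAGER, true))));

        for (IdleTaskManager tm : idle) {
            System.out.println(tm.id() + " releasable: " + canBeReleased(tm));
        }
        // prints:
        // tm-1 releasable: false
        // tm-2 releasable: true
        // tm-3 releasable: true
    }
}
```

Checking at release time leaves the meaning of {{FINISHED}} unchanged; the other option mentioned in the comment, draining or persisting output before the task finishes, would move the same condition into the task lifecycle instead.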