junzhong qin created FLINK-34528:
------------------------------------

Summary: With shuffling data, when a Task Manager is killed, restarting the Flink job takes a considerable amount of time
Key: FLINK-34528
URL: https://issues.apache.org/jira/browse/FLINK-34528
Project: Flink
Issue Type: Sub-task
Reporter: junzhong qin
Attachments: image-2024-02-27-16-35-04-464.png
In the test case, the pipeline looks like this:

!image-2024-02-27-16-35-04-464.png!

"Source: Custom Source" generates strings, and the job applies keyBy to those strings before they reach "Sink: Unnamed". The job is configured as follows:
# parallelism = 100
# taskmanager.numberOfTaskSlots = 2
# checkpointing disabled

The worker was then killed. The resulting failure was logged at 2024-02-27 16:41:49,982:
{code:java}
2024-02-27 16:41:49,982 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: Unnamed (6/100) (2f1c7b22098a273f5471e3e8f794e1d3_0a448493b4782967b150582570326227_5_0) switched from RUNNING to FAILED on container_e2472_1705993319725_62292_01_000046 @ xxx (dataPort=38827).
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'xxx/10.169.18.138:35983 [ container_e2472_1705993319725_62292_01_000010(xxx:5454) ] '. This might indicate that the remote task manager was lost.
    at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:134)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
    at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
    at org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:94)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:403)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]
{code}

The job took about 16 seconds to restart:
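As a rough cross-check of the restart time, the interval between the failure logged at 16:41:49,982 and the last Sink subtask switching to RUNNING at 16:42:05,176 (both timestamps appear in this report's logs) can be computed directly from the log lines. This is a minimal sketch that assumes each line starts with the 23-character `yyyy-MM-dd HH:mm:ss,SSS` timestamp used in these logs; `RestartDurationSketch`, `parse` and `between` are hypothetical helper names, not part of Flink.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

final class RestartDurationSketch {
    // Assumed log prefix format, e.g. "2024-02-27 16:41:49,982".
    static final DateTimeFormatter LOG_TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    /** Parses the timestamp held in the first 23 characters of a log line. */
    static LocalDateTime parse(String logLine) {
        return LocalDateTime.parse(logLine.substring(0, 23), LOG_TS);
    }

    /** Elapsed time between the timestamps of two log lines. */
    static Duration between(String firstLine, String lastLine) {
        return Duration.between(parse(firstLine), parse(lastLine));
    }

    public static void main(String[] args) {
        // Log lines shortened; only the timestamp prefix matters here.
        String failed = "2024-02-27 16:41:49,982 INFO ... switched from RUNNING to FAILED ...";
        String running = "2024-02-27 16:42:05,176 INFO ... switched from INITIALIZING to RUNNING.";
        System.out.println(between(failed, running).toMillis() + " ms"); // prints "15194 ms"
    }
}
```

Measured this way the gap is about 15.2 seconds. It starts at the first logged failure, so it does not include any time between the actual kill and that log line.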
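The "shuffling data" in the summary comes from the keyBy between "Source: Custom Source" and "Sink: Unnamed". To our understanding, Flink routes a keyed record by first hashing its key into one of maxParallelism key groups and then mapping each key group to a downstream subtask. The Java sketch below is a simplified model of that routing, not Flink's code. It assumes a max parallelism of 256 (our assumption of the default for parallelism 100) and replaces Flink's murmur hash with `Math.floorMod(key.hashCode(), ...)`, so the concrete subtask chosen for a key will differ from a real job. `KeyByRoutingSketch`, `keyGroupFor`, `subtaskForKeyGroup` and `subtaskFor` are hypothetical names.

```java
import java.util.HashSet;
import java.util.Set;

final class KeyByRoutingSketch {
    static final int PARALLELISM = 100;     // from the test case
    static final int MAX_PARALLELISM = 256; // assumption: default for parallelism 100

    /** Maps a key to a key group. Simplified: no murmur hash, unlike Flink. */
    static int keyGroupFor(String key) {
        return Math.floorMod(key.hashCode(), MAX_PARALLELISM);
    }

    /** Maps a key group to the index of the Sink subtask that owns it. */
    static int subtaskForKeyGroup(int keyGroup) {
        return keyGroup * PARALLELISM / MAX_PARALLELISM;
    }

    /** Index of the Sink subtask a record with this key is sent to. */
    static int subtaskFor(String key) {
        return subtaskForKeyGroup(keyGroupFor(key));
    }

    /** Number of distinct Sink subtasks that some key group maps to. */
    static int reachableSubtasks() {
        Set<Integer> owners = new HashSet<>();
        for (int kg = 0; kg < MAX_PARALLELISM; kg++) {
            owners.add(subtaskForKeyGroup(kg));
        }
        return owners.size();
    }

    public static void main(String[] args) {
        // Any Source subtask may emit any key, so it may send to every Sink subtask.
        System.out.println("Sink subtasks addressable by keyBy: " + reachableSubtasks()); // prints 100
        System.out.println("Key \"hello\" goes to Sink subtask " + subtaskFor("hello"));   // prints 82
    }
}
```

Because the key groups cover all 100 Sink subtasks, each Source subtask can hold a network connection to every Sink subtask. This is consistent with a Sink subtask on one container failing with the RemoteTransportException above when the Task Manager in another container was lost.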
{code:java}
// The task was scheduled to a task manager that had already been killed
2024-02-27 16:41:53,506 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: Custom Source (16/100) (attempt #3) with attempt id 2f1c7b22098a273f5471e3e8f794e1d3_bc764cd8ddf7a0cff126f51c16239658_15_3 and vertex id bc764cd8ddf7a0cff126f51c16239658_15 to container_e2472_1705993319725_62292_01_000010 @ xxx (dataPort=35983) with allocation id 975dded4548ad15b36d0e5e6aac8f5b6

// The last task switched from INITIALIZING to RUNNING
2024-02-27 16:42:05,176 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: Unnamed (64/100) (2f1c7b22098a273f5471e3e8f794e1d3_0a448493b4782967b150582570326227_63_14) switched from INITIALIZING to RUNNING.
{code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)