[ 
https://issues.apache.org/jira/browse/FLINK-28115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17556929#comment-17556929
 ] 

Huameng Li commented on FLINK-28115:
------------------------------------

Hi Jing and Qingsheng, thanks for looking into this parallelism re-balance 
issue.

The exception stack trace was logged by the TaskManager (TM).

This issue exists only in Flink 1.15.0. It occurs right after job 
submission, when the parallelism of the map/process function differs from 
that of the source or sink.

The issue is not related to the Flink Kafka connector. The TaskManager is still 
alive when the issue occurs.

If we use the same parallelism (no rebalance) for *source* (parallelism 
{*}4{*}) -> *map* (parallelism {*}4{*}) -> *sink* (parallelism {*}4{*}), the 
job runs fine.
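
To illustrate why the parallelism mismatch matters, here is a rough sketch in plain Java (not Flink code). It assumes the default DataStream behaviour: equal parallelism gives a forward (pointwise) connection, and differing parallelism gives a rebalance (all-to-all) connection. The class and method names are made up for this example. With 4 -> 8, every map subtask reads from every source subtask, so some reads go through a RemoteInputChannel to another TM, which is where the stack trace fails. With 4 -> 4 the operators are typically chained and no such channel is involved.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of subtask connectivity; an assumption for illustration,
// not how Flink computes its execution graph internally.
public class ChannelSketch {

    /** Upstream subtask indices that one downstream subtask reads from. */
    static List<Integer> upstreamInputs(int downstreamIndex, int upstreamParallelism,
                                        int downstreamParallelism) {
        List<Integer> inputs = new ArrayList<>();
        if (upstreamParallelism == downstreamParallelism) {
            // Forward (pointwise): subtask i reads only from upstream subtask i.
            inputs.add(downstreamIndex);
        } else {
            // Rebalance (all-to-all): every downstream subtask reads from
            // every upstream subtask.
            for (int i = 0; i < upstreamParallelism; i++) {
                inputs.add(i);
            }
        }
        return inputs;
    }

    /** Total number of input channels on one edge of the job graph. */
    static int totalChannels(int upstreamParallelism, int downstreamParallelism) {
        int total = 0;
        for (int d = 0; d < downstreamParallelism; d++) {
            total += upstreamInputs(d, upstreamParallelism, downstreamParallelism).size();
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println("4 -> 4: " + totalChannels(4, 4) + " channels"); // prints "4 -> 4: 4 channels"
        System.out.println("4 -> 8: " + totalChannels(4, 8) + " channels"); // prints "4 -> 8: 32 channels"
    }
}
```

In the failing topology both edges (4 -> 8 and 8 -> 4) are all-to-all in this model, so a single unreachable TM data port is enough to fail a subtask.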

> Flink 1.15.0 Parallelism Rebalance causes flink job failure
> -----------------------------------------------------------
>
>                 Key: FLINK-28115
>                 URL: https://issues.apache.org/jira/browse/FLINK-28115
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.15.0
>         Environment: Flink 1.15.0 session cluster with 8 hosts.
>            Reporter: Huameng Li
>            Priority: Major
>         Attachments: image-2022-06-17-13-01-08-992.png
>
>
> {color:#de350b}*Issue:*{color}
> *Flink 1.15.0 Parallelism Rebalance causes flink job failure.* Same issue was 
> not in flink 1.14.4.
> {color:#de350b}*Exceptions:*{color}
> *1 of the 8 re-balance parallelism task slots failed due to* 
> *{color:#de350b}org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
>  finishConnect(..) failed: Connection refused: /127.0.0.1:43354{color}*
> *{color:#de350b}Caused by: java.net.ConnectException: finishConnect(..) 
> failed: Connection refused{color}*
>  
> *Job topology:*
> *kafkaSource (parallelism 4) -> map/processor (parallelism 8) -> kafkaSink 
> (parallelism 4)*
>  
> *!image-2022-06-17-13-01-08-992.png!*
>  
>  
> *Error stack trace:*
> 2022-06-17 12:54:38.563 WARN  [Framework] [Map (3/8)#5|#5] 
> org.apache.flink.runtime.taskmanager.Task  - Map (3/8)#5 
> (69a82f741d68fd7161d7b13de48c6c4b) switched from RUNNING to FAILED with 
> failure cause: 
> org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException:
>  Connection for partition 
> 2064424258b3b74fdc349607017f1029#1@a94744160d7d5e85101881e2d783dcd2 not 
> reachable.
>     at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:190)
>     at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.internalRequestPartitions(SingleInputGate.java:342)
>     at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:312)
>     at 
> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.requestPartitions(InputGateWithMetrics.java:115)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: 
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
> Connecting to remote task manager '/127.0.0.1:43354' has failed. This might 
> indicate that the remote task manager has been lost.
>     at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connect(PartitionRequestClientFactory.java:169)
>     at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connectWithRetries(PartitionRequestClientFactory.java:135)
>     at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:96)
>     at 
> org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:95)
>     at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:186)
>     ... 15 more
> Caused by: 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
>  finishConnect(..) failed: Connection refused: /127.0.0.1:43354
> Caused by: java.net.ConnectException: finishConnect(..) failed: Connection 
> refused
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors.newConnectException0(Errors.java:155)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors.handleConnectErrno(Errors.java:128)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.unix.Socket.finishConnect(Socket.java:320)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.doFinishConnect(AbstractEpollChannel.java:710)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.finishConnect(AbstractEpollChannel.java:687)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.epollOutReady(AbstractEpollChannel.java:567)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:470)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
>     at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
>     at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>     at java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
