Hi Till,

This job is not on AthenaX but on a special Uber version of Flink. I tried to
ping the connected host from the connecting host, and it seems very stable. I
did set the connection timeout to 20 minutes, but it still reports the timeout
after 2 minutes. Could you let me know how you tested the timeout setting
locally?
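A hedged observation on the 2-minute figure: the second stack trace in this thread ends in `java.net.ConnectException: Connection timed out` thrown from `SocketChannelImpl.checkConnect`, i.e. the failure is reported by the operating system rather than by Netty's connect timer. Assuming the TaskManagers run on Linux with the default `net.ipv4.tcp_syn_retries = 6` (the default on kernels of that era) and an initial retransmission timeout of 1 second that doubles after every retry, the kernel abandons an unanswered connect after about 127 seconds, which would cap any larger application-level timeout. The arithmetic as a sketch; the class and method names are illustrative only:

```java
/** Sketch: worst-case time before the kernel gives up on an unanswered TCP connect. */
final class SynRetryTimeout {

    /**
     * Assumes an initial retransmission timeout (RTO) that doubles after every
     * SYN retransmission, as Linux does by default. The kernel waits one more
     * (doubled) RTO after the last retransmission before failing the connect.
     */
    static long totalTimeoutSeconds(int synRetries, long initialRtoSeconds) {
        long total = 0;
        long rto = initialRtoSeconds;
        // One wait for the original SYN plus one wait per retransmission.
        for (int attempt = 0; attempt <= synRetries; attempt++) {
            total += rto;
            rto *= 2;
        }
        return total;
    }

    public static void main(String[] args) {
        // Assumed Linux defaults: tcp_syn_retries = 6, initial RTO = 1 s.
        System.out.println(totalTimeoutSeconds(6, 1) + " s");
    }
}
```

Under these assumptions the result is 127 s, close to the observed 2 minutes. If that is what happens here, raising `connectTimeoutSec` alone would not help, and the kernel's SYN retry setting on the connecting host would be worth checking.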

Thanks,
Wenrui

On Tue, Jan 8, 2019 at 7:06 AM Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Wenrui,
>
> the exception now occurs while finishing the connection creation. I'm not
> sure whether this is so different. Could it be that your network is
> overloaded or not very reliable? Have you tried running your Flink job
> outside of AthenaX?
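The two traces in this thread differ in their innermost cause: the first ends in Netty's `ConnectTimeoutException` (raised by the client-side connect timer), the second in `java.net.ConnectException` raised from `finishConnect` (reported by the OS). A minimal sketch, with a hypothetical class name, that walks the cause chain and names the failure mode. It compares the Netty class by simple name so it needs no Netty dependency, and checks that name first because Netty's `ConnectTimeoutException` extends `java.net.ConnectException`:

```java
import java.io.IOException;
import java.net.ConnectException;

/** Sketch: tell the two connect-failure modes in this thread apart by their root cause. */
final class ConnectFailureKind {

    /** Follows getCause() to the innermost throwable. */
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    static String classify(Throwable t) {
        Throwable root = rootCause(t);
        // Name check first: Netty's ConnectTimeoutException is itself a ConnectException.
        if (root.getClass().getSimpleName().equals("ConnectTimeoutException")) {
            return "client connect timer expired (connectTimeoutSec)";
        }
        if (root instanceof ConnectException) {
            return "OS reported the connect failure (finishConnect)";
        }
        return "other: " + root.getClass().getName();
    }

    public static void main(String[] args) {
        // Shape of the second trace: IOException -> transport exception -> ConnectException.
        Throwable chain = new IOException("Connecting the channel failed",
                new RuntimeException("Connecting to remote task manager has failed",
                        new ConnectException("Connection timed out")));
        System.out.println(classify(chain));
    }
}
```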
>
> Cheers,
> Till
>
> On Tue, Jan 8, 2019 at 2:50 PM Wenrui Meng <wenruim...@gmail.com> wrote:
>
>> Hi Till,
>>
>> Thanks for your reply. Our cluster is a YARN cluster. I found that if we
>> decrease the total parallelism, the timeout issue can be avoided. But we do
>> need that many TaskManagers to process the data. In addition, once I
>> increase the Netty server threads to 128, the error changes to the
>> following one. It seems the cause is different. Could you help take a
>> look?
>>
>> 2b0ac47c1eb1bcbbbe4a97) switched from RUNNING to FAILED.
>> java.io.IOException: Connecting the channel failed: Connecting to remote
>> task manager + 'athena464-sjc1/10.70.129.13:39466' has failed. This
>> might indicate that the remote task manager has been lost.
>>         at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
>>         at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:132)
>>         at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:84)
>>         at
>> org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59)
>>         at
>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:156)
>>         at
>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:480)
>>         at
>> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.requestPartitions(UnionInputGate.java:134)
>>         at
>> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:148)
>>         at
>> org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:93)
>>         at
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:214)
>>         at
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>         at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>         at java.lang.Thread.run(Thread.java:748)
>> Caused by:
>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>> Connecting to remote task manager + 'athena464-sjc1/10.70.129.13:39466'
>> has failed. This might indicate that the remote task manager has been lost.
>>         at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
>>         at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:132)
>>         at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
>>         at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
>>         at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
>>         at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
>>         at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:268)
>>         at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:284)
>>         at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
>>         at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>         at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>         at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>         at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>>         ... 1 common frames omitted
>> Caused by: java.net.ConnectException: Connection timed out:
>> athena464-sjc1/10.70.129.13:39466
>>         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>         at
>> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>>         at
>> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
>>         at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:281)
>>         ... 6 common frames omitted
>>
>>
>> Thanks,
>> Wenrui
>>
>> On Mon, Jan 7, 2019 at 2:38 AM Till Rohrmann <trohrm...@apache.org>
>> wrote:
>>
>>> Hi Wenrui,
>>>
>>> the code to set the connect timeout looks ok to me [1]. I also tested it
>>> locally and checked that the timeout is correctly registered in Netty's
>>> AbstractNioChannel [2].
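The mechanism referred to here can be illustrated with plain JDK NIO: a non-blocking connect whose completion is awaited on a selector up to an application-chosen deadline (the role Netty's `CONNECT_TIMEOUT_MILLIS` channel option plays), followed by `finishConnect()`, which surfaces the operating system's verdict. This is an analogy written for illustration, not Netty's or Flink's code, and the class name is hypothetical. It also shows why a long application deadline cannot outlast the kernel: if the OS fails the attempt first, the selector wakes up early and `finishConnect()` throws `java.net.ConnectException`.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.TimeUnit;

/** Sketch: non-blocking connect with an application-level deadline (plain JDK NIO). */
final class DeadlineConnect {

    static SocketChannel connect(InetSocketAddress address, long timeoutMillis) throws IOException {
        SocketChannel channel = SocketChannel.open();
        try (Selector selector = Selector.open()) {
            channel.configureBlocking(false);
            if (channel.connect(address)) {
                return channel; // connected immediately, e.g. on loopback
            }
            channel.register(selector, SelectionKey.OP_CONNECT);
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
            while (true) {
                long remaining = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                if (remaining <= 0) {
                    // Application timer fired first: analogous to Netty's ConnectTimeoutException.
                    throw new SocketTimeoutException("connect timed out after " + timeoutMillis + " ms");
                }
                if (selector.select(remaining) > 0) {
                    break; // OP_CONNECT is ready: the attempt succeeded or failed
                }
            }
            // Throws java.net.ConnectException if the OS gave up (e.g. SYN retries exhausted).
            if (!channel.finishConnect()) {
                throw new IOException("connect not finished although OP_CONNECT was selected");
            }
            return channel;
        } catch (IOException e) {
            channel.close();
            throw e;
        }
    }
}
```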
>>>
>>> Increasing the number of threads to 128 should not be necessary. But it
>>> could indicate that there is some long-lasting or blocking operation being
>>> executed by the threads.
>>>
>>> How does the job submission and cluster configuration work with AthenaX?
>>> Does the platform spawn a new Flink cluster for each job, for which you can
>>> specify the cluster configuration?
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java#L102
>>> [2]
>>> https://github.com/netty/netty/blob/netty-4.0.27.Final/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java#L207
>>>
>>> Cheers,
>>> Till
>>>
>>> On Sat, Jan 5, 2019 at 2:22 AM Wenrui Meng <wenruim...@gmail.com> wrote:
>>>
>>>> Hi Till,
>>>>
>>>> Thanks for your reply and help on this issue.
>>>>
>>>> I increased taskmanager.network.netty.client.connectTimeoutSec to 1200,
>>>> which is 20 minutes, but the connection does not seem to respect this
>>>> timeout. In addition, I increased both taskmanager.network.request-backoff.max
>>>> and taskmanager.registration.max-backoff to 20 minutes.
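The three options mentioned here take their values in different units, so one of them can easily end up far smaller than intended. A hedged sketch that renders a single 20-minute target for each key, assuming the units from the Flink documentation for that era (`connectTimeoutSec` in whole seconds, `request-backoff.max` in milliseconds, `registration.max-backoff` as a duration string); the units are worth double-checking against the exact Flink version in use, and the class name is illustrative:

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch: express one target duration in the unit each option is assumed to expect. */
final class TimeoutConfigSketch {

    static Map<String, String> forTarget(Duration target) {
        Map<String, String> conf = new LinkedHashMap<>();
        // Assumed unit: whole seconds.
        conf.put("taskmanager.network.netty.client.connectTimeoutSec",
                Long.toString(target.getSeconds()));
        // Assumed unit: milliseconds.
        conf.put("taskmanager.network.request-backoff.max",
                Long.toString(target.toMillis()));
        // Assumed format: a duration string with a unit suffix.
        conf.put("taskmanager.registration.max-backoff",
                target.getSeconds() + " s");
        return conf;
    }

    public static void main(String[] args) {
        // Print flink-conf.yaml style "key: value" lines.
        forTarget(Duration.ofMinutes(20)).forEach((k, v) -> System.out.println(k + ": " + v));
    }
}
```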
>>>>
>>>> One thing I found helpful to some extent is increasing
>>>> taskmanager.network.netty.server.numThreads. With 128 threads the job
>>>> sometimes succeeds, but increasing it further does not solve the
>>>> problem. We have 100 parallel intermediate results, so there are a great
>>>> many partition requests; I think that is why the connection times out.
>>>> The solution should be to increase the connection timeout, but I think
>>>> there is an issue where the connection does not respect the timeout config.
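To put "too many partition requests" in numbers: in an all-to-all exchange every consumer subtask requests one subpartition from every producer subtask, so the request count grows with the product of the two parallelisms. The number of TaskManager pairs that may need a connection grows with the square of the TaskManager count. The figures below are a sketch assuming parallelism 100 on both sides and, hypothetically, 100 TaskManagers; how many TCP connections Flink actually opens per pair is not modeled here, and the class name is illustrative:

```java
/** Sketch: how request and TaskManager-pair counts scale in an all-to-all exchange. */
final class ExchangeFanOut {

    /** Each consumer subtask requests one subpartition from each producer subtask. */
    static long subpartitionRequests(int producerParallelism, int consumerParallelism) {
        return (long) producerParallelism * consumerParallelism;
    }

    /** Ordered pairs of distinct TaskManagers (requests within one TaskManager stay local). */
    static long taskManagerPairs(int taskManagers) {
        return (long) taskManagers * (taskManagers - 1);
    }

    public static void main(String[] args) {
        System.out.println(subpartitionRequests(100, 100)); // 10000
        System.out.println(taskManagerPairs(100));          // 9900
        System.out.println(taskManagerPairs(50));           // 2450
    }
}
```

Halving the number of TaskManagers cuts the pair count by roughly a factor of four, which would be consistent with the observation earlier in the thread that lowering the total parallelism avoids the timeout.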
>>>>
>>>> We will definitely try the latest Flink version. But at Uber, there is
>>>> a team responsible for providing a Flink platform, and they will
>>>> upgrade it at the end of this month. Meanwhile, I would like to ask for
>>>> help investigating how to increase the connection timeout and make sure it
>>>> is respected.
>>>>
>>>> Thanks,
>>>> Wenrui
>>>>
>>>> On Fri, Jan 4, 2019 at 5:27 AM Till Rohrmann <trohrm...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Wenrui,
>>>>>
>>>>> from the logs I cannot spot anything suspicious. Which configuration
>>>>> parameters have you changed exactly? Does the JobManager log contain
>>>>> anything suspicious?
>>>>>
>>>>> The current Flink version has changed quite a bit compared to 1.4. Thus,
>>>>> it might be worth trying to run the job with the latest Flink version.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Thu, Jan 3, 2019 at 3:00 PM Wenrui Meng <wenruim...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I consistently get a connection timeout when creating a
>>>>>> PartitionRequestClient in Flink 1.4. I tried to ping from the connecting
>>>>>> host to the connected host, and the ping latency is consistently below
>>>>>> 0.1 ms, so it's probably not due to the cluster status. I also tried
>>>>>> increasing the max backoff, the network timeout, and some other settings,
>>>>>> but it doesn't help.
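`ping` exercises ICMP only, so a sub-millisecond round trip says little about whether the remote TaskManager's data port accepts TCP connections promptly. A hedged sketch of a TCP-level probe using `java.net.Socket.connect(SocketAddress, int)`; host and port would be the ones named in the exception (e.g. `athena485-sjc1` and `34185`), and the class name is illustrative:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Sketch: measure how long a plain TCP connect to a host:port takes. */
final class TcpConnectProbe {

    /** Returns the connect time in ms; throws if refused or if timeoutMillis elapses. */
    static long probeMillis(String host, int port, int timeoutMillis) throws IOException {
        long start = System.nanoTime();
        try (Socket socket = new Socket()) {
            // Blocks until connected, refused, or timed out (SocketTimeoutException).
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: TcpConnectProbe <host> <port>");
            return;
        }
        long ms = probeMillis(args[0], Integer.parseInt(args[1]), 10_000);
        System.out.println("connected in " + ms + " ms");
    }
}
```

Running it from the connecting host while the job is starting up, rather than on an idle cluster, would be closer to the conditions under which the failures occur.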
>>>>>>
>>>>>> I enabled Flink's debug logging but did not find any suspicious or
>>>>>> useful information to help me fix the issue. Here is the link
>>>>>> <https://www.dropbox.com/sh/sul62muz5pk0bqk/AABX8QbMrNmSq3k8I289mGmSa?dl=0>
>>>>>> to the JobManager and TaskManager logs. The connecting host is the host
>>>>>> that throws the exception. The connected host is the host from which the
>>>>>> connecting host tries to request the partition.
>>>>>>
>>>>>> Since our platform is not up to date yet, the Flink version I used here
>>>>>> is 1.4. But I noticed that this code has not changed much on the
>>>>>> master branch. Any help will be appreciated.
>>>>>>
>>>>>> Here is the stack trace of the exception:
>>>>>>
>>>>>> from RUNNING to FAILED.
>>>>>> java.io.IOException: Connecting the channel failed: Connecting to
>>>>>> remote task manager + 'athena485-sjc1/10.70.132.8:34185' has failed.
>>>>>> This might indicate that the remote task manager has been lost.
>>>>>> at
>>>>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
>>>>>> at
>>>>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:132)
>>>>>> at
>>>>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:84)
>>>>>> at
>>>>>> org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59)
>>>>>> at
>>>>>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:156)
>>>>>> at
>>>>>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:480)
>>>>>> at
>>>>>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:502)
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:93)
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:214)
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>> Caused by:
>>>>>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>>>>>> Connecting to remote task manager + 'athena485-sjc1/10.70.132.8:34185'
>>>>>> has failed. This might indicate that the remote task manager has been 
>>>>>> lost.
>>>>>> at
>>>>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
>>>>>> at
>>>>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:132)
>>>>>> at
>>>>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
>>>>>> at
>>>>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
>>>>>> at
>>>>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
>>>>>> at
>>>>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
>>>>>> at
>>>>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:214)
>>>>>> at
>>>>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
>>>>>> at
>>>>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:120)
>>>>>> at
>>>>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
>>>>>> at
>>>>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
>>>>>> at
>>>>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>>>>>> ... 1 common frames omitted
>>>>>> Caused by:
>>>>>> org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException:
>>>>>> connection timed out: athena485-sjc1/10.70.132.8:34185
>>>>>> at
>>>>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:212)
>>>>>> ... 6 common frames omitted
>>>>>>
>>>>>> Thanks,
>>>>>> Wenrui
>>>>>>
>>>>>
