I've created an issue [1] and opened a PR [2] to fix the issue.

[1] https://issues.apache.org/jira/browse/FLINK-3570
[2] https://github.com/apache/flink/pull/1758

Cheers,
Till

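For readers following the quoted discussion below: the connect-first step (try to reach the JobManager from each local address for a bounded time, and only then fall back) might be sketched in Java roughly as follows. This is an illustration under assumptions, not Flink's actual ConnectionUtils code. The class ConnectProbeSketch, the methods canConnect and selectAddress, and all parameter names are hypothetical.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch; names and structure are assumptions, not Flink code.
public class ConnectProbeSketch {

    /** Returns true if a TCP connection from the given local address to the target succeeds. */
    static boolean canConnect(InetAddress local, InetSocketAddress target, int attemptTimeoutMillis) {
        try (Socket socket = new Socket()) {
            // Bind to the local address under test; port 0 lets the OS pick a free port.
            socket.bind(new InetSocketAddress(local, 0));
            socket.connect(target, attemptTimeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or the address cannot be bound.
            return false;
        }
    }

    /**
     * Tries every local address repeatedly until one connects or the lookup
     * timeout expires, then returns the supplied fallback address.
     */
    static InetAddress selectAddress(List<InetAddress> locals, InetSocketAddress target,
                                     long lookupTimeoutMillis, int attemptTimeoutMillis,
                                     InetAddress fallback) throws InterruptedException {
        long deadline = System.nanoTime() + lookupTimeoutMillis * 1_000_000L;
        do {
            for (InetAddress local : locals) {
                if (canConnect(local, target, attemptTimeoutMillis)) {
                    return local;
                }
            }
            Thread.sleep(50); // short pause before the next round
        } while (System.nanoTime() - deadline < 0);
        return fallback;
    }

    public static void main(String[] args) throws Exception {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Port 9 is only a placeholder target for the demonstration.
        InetSocketAddress target = new InetSocketAddress(loopback, 9);
        InetAddress chosen = selectAddress(Collections.singletonList(loopback), target,
                500, 200, InetAddress.getLocalHost());
        System.out.println("chosen address: " + chosen);
    }
}
```

In this sketch, a longer lookup timeout simply gives the target more time to become reachable before the fallback is used, which is the effect attributed to raising akka.lookup.timeout in the thread.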

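The heuristic condition described in the quoted messages below (the address is an Inet4Address and is neither link-local nor loopback), together with the proposal to prefer InetAddress.getLocalHost(), could look roughly like this in Java. It is a minimal sketch under assumptions, not the code from the PR. The class AddressHeuristicSketch and its methods isCandidate, firstCandidate and preferLocalHost are hypothetical names.

```java
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.Enumeration;

// Hypothetical sketch; names and structure are assumptions, not Flink code.
public class AddressHeuristicSketch {

    /** Condition from the thread: IPv4, not link-local, not loopback. */
    static boolean isCandidate(InetAddress address) {
        return address instanceof Inet4Address
                && !address.isLinkLocalAddress()
                && !address.isLoopbackAddress();
    }

    /** Returns the first address, in enumeration order, that satisfies the condition, or null. */
    static InetAddress firstCandidate() throws SocketException {
        Enumeration<NetworkInterface> nics = NetworkInterface.getNetworkInterfaces();
        if (nics == null) {
            return null; // no interfaces reported
        }
        for (NetworkInterface nic : Collections.list(nics)) {
            for (InetAddress address : Collections.list(nic.getInetAddresses())) {
                if (isCandidate(address)) {
                    return address;
                }
            }
        }
        return null;
    }

    /** Variant discussed in the thread: prefer the address bound to the local hostname. */
    static InetAddress preferLocalHost() throws SocketException {
        try {
            return InetAddress.getLocalHost();
        } catch (UnknownHostException e) {
            // Only if the hostname cannot be resolved, use the enumeration-based guess.
            return firstCandidate();
        }
    }

    public static void main(String[] args) throws Exception {
        // Addresses taken from the quoted taskmanager.log.
        for (String ip : new String[] {"10.127.68.136", "10.120.193.110", "127.0.0.1"}) {
            System.out.println(ip + " candidate: " + isCandidate(InetAddress.getByName(ip)));
        }
        System.out.println("local host: " + preferLocalHost());
    }
}
```

Both 10.127.68.136 (eth0) and 10.120.193.110 (eth1) satisfy the condition, so in this sketch the result of firstCandidate depends only on the order in which the interfaces are enumerated.
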
On Thu, Mar 3, 2016 at 12:33 PM, Maximilian Bode <
maximilian.b...@tngtech.com> wrote:

> Hi Ufuk, Till and Stephan,
>
> Yes, that is what we observed. The primary hostname, i.e. the one returned
> by the unix hostname command, is in fact bound to the eth0 interface,
> whereas Flink uses the eth1 interface (pertaining to another hostname).
>
> Changing akka.lookup.timeout to 100 s seems to fix the problem, as the
> new job manager is now available in sufficient time. I would still agree
> with Stephan that using the local hostname should be the preferred strategy.
>
> Cheers,
>  Max
> —
> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>
> On 03.03.2016 at 12:29, Till Rohrmann <till.rohrm...@gmail.com> wrote:
>
> No, I don't think this behaviour was introduced by HA. It is the
> default behaviour we have used for a long time. If you think we should
> still change it, I can open an issue for it.
>
> On Thu, Mar 3, 2016 at 12:20 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> Okay, that is a change from the original behavior, introduced in HA.
>> Originally, if the connection attempts failed, it always returned the
>> InetAddress.getLocalHost() interface.
>> I think we should change it back to that, because that interface is by
>> far the best possible heuristic.
>>
>> On Thu, Mar 3, 2016 at 11:39 AM, Till Rohrmann <trohrm...@apache.org>
>> wrote:
>>
>>> If I’m not mistaken, then it’s not necessarily true that the heuristic
>>> returns InetAddress.getLocalHost() in all cases. The heuristic will
>>> select the first network interface satisfying the aforementioned conditions, but
>>> before returning it, it will try a last time to connect to the JM via the
>>> interface bound to InetAddress.getLocalHost(). However, if this fails,
>>> then the heuristically selected network interface will be returned.
>>>
>>> On Thu, Mar 3, 2016 at 10:49 AM, Stephan Ewen <se...@apache.org> wrote:
>>>
>>>> If the TaskManager cannot connect to the JobManager, it will use the
>>>> interface that is bound to the machine's host name
>>>> ("InetAddress.getLocalHost()").
>>>>
>>>> So, the best way to fix this would be to make sure that all machines
>>>> have a proper network configuration. Then Flink would either use an address
>>>> that can connect (via trying various interfaces), or it would default back
>>>> to the hostname/interface that is configured on the machine.
>>>>
>>>>
>>>> On Thu, Mar 3, 2016 at 10:43 AM, Till Rohrmann <trohrm...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Max,
>>>>>
>>>>> the problem is that before starting the TM, we have to find the
>>>>> network interface which is reachable by the other machines. So what we do
>>>>> is to connect to the current JobManager. If it should happen, as in your
>>>>> case, that the JobManager just died and the new JM address has not been
>>>>> written to ZooKeeper, then the TMs don’t have much choice other than using
>>>>> the heuristic.
>>>>>
>>>>> I can’t really tell why eth1 is chosen over eth0. The condition is
>>>>> that the interface address is an Inet4Address and is neither a
>>>>> link-local nor a loopback address.
>>>>>
>>>>> Thus, Ufuk’s suggestion to increase akka.lookup.timeout seems to be
>>>>> the easiest way to solve your problem. I’ve checked: the default value
>>>>> is 10 s, which might be a bit too low for starting a new JM
>>>>> and publishing its address via ZooKeeper.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Thu, Mar 3, 2016 at 10:28 AM, Ufuk Celebi <u...@apache.org> wrote:
>>>>>
>>>>>> I had an offline chat with Till about this. He pointed out that the
>>>>>> address is chosen once at start up time (while not being able to
>>>>>> connect to the old job manager) and then it stays fixed at eth1.
>>>>>>
>>>>>> You can increase the lookup timeout by setting akka.lookup.timeout to
>>>>>> a higher value (like 100 s). This is the only workaround I'm aware of
>>>>>> at this point. Maybe Till can chime in here whether this has other
>>>>>> implications as well?
>>>>>>
>>>>>> – Ufuk
>>>>>>
>>>>>> On Thu, Mar 3, 2016 at 9:59 AM, Ufuk Celebi <u...@apache.org> wrote:
>>>>>> > Hey Max!
>>>>>> >
>>>>>> > for the first WARN in
>>>>>> > org.apache.flink.runtime.webmonitor.JobManagerRetriever: this is
>>>>>> > expected if the new leader has not updated ZooKeeper yet. The
>>>>>> > important thing is that the new leading job manager is eventually
>>>>>> > retrieved. This did happen, right?
>>>>>> >
>>>>>> > Regarding eth1 vs. eth0: After the new job manager becomes leader, the
>>>>>> > task manager should re-try connecting to it with the same strategy as
>>>>>> > in the initial connection establishment (e.g. try SLOW first and only
>>>>>> > fall back to HEURISTIC). Can you see in the logs whether this happens?
>>>>>> >
>>>>>> > The best thing would be to share the complete logs. Is this possible?
>>>>>> > If not publicly, feel free to send it to me privately (uce at apache
>>>>>> > org).
>>>>>> >
>>>>>> > – Ufuk
>>>>>> >
>>>>>> >
>>>>>> > On Thu, Mar 3, 2016 at 9:21 AM, Maximilian Bode
>>>>>> > <maximilian.b...@tngtech.com> wrote:
>>>>>> >> Hi everyone,
>>>>>> >>
>>>>>> >> we are trying to get JobManager HA to work in the context of a per-job YARN
>>>>>> >> session using the 1.0.0-rc3 from a few days ago and are having a problem
>>>>>> >> concerning task managers with several network interfaces.
>>>>>> >>
>>>>>> >> After manually killing the job manager process, the jobmanager.log on the
>>>>>> >> newly allocated second job manager reads:
>>>>>> >> ---
>>>>>> >> 2016-03-02 18:01:09,635 WARN  Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@10.127.68.136:34811]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: /10.127.68.136:34811
>>>>>> >> 2016-03-02 18:01:09,644 WARN  org.apache.flink.runtime.webmonitor.JobManagerRetriever - Failed to retrieve leader gateway and port.
>>>>>> >> akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@10.127.68.136:34811/), Path(/user/jobmanager)]
>>>>>> >> at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
>>>>>> >> at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
>>>>>> >> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>>>>>> >> at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
>>>>>> >> at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
>>>>>> >> at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
>>>>>> >> at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
>>>>>> >> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>>>>>> >> at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
>>>>>> >> at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
>>>>>> >> at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
>>>>>> >> at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
>>>>>> >> at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>>>>>> >> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
>>>>>> >> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
>>>>>> >> at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
>>>>>> >> at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
>>>>>> >> at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
>>>>>> >> at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
>>>>>> >> at akka.remote.EndpointWriter.postStop(Endpoint.scala:561)
>>>>>> >> at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
>>>>>> >> at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:415)
>>>>>> >> at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
>>>>>> >> at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
>>>>>> >> at akka.actor.ActorCell.terminate(ActorCell.scala:369)
>>>>>> >> at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
>>>>>> >> at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
>>>>>> >> at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
>>>>>> >> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>>>>> >> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>>>>> >> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>> >> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>> >> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>> >> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>> >> ---
>>>>>> >> where the unreachable IP belongs to the old job manager. Is this the
>>>>>> >> expected behavior so far?
>>>>>> >>
>>>>>> >> The problem then arises on a new task manager, which also tries to connect
>>>>>> >> to the old job manager unsuccessfully. The ZooKeeperLeaderRetrievalService
>>>>>> >> starts cycling through the available network interfaces, as can be seen in
>>>>>> >> the relevant taskmanager.log:
>>>>>> >> ---
>>>>>> >> 2016-03-02 18:01:13,636 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Starting ZooKeeperLeaderRetrievalService.
>>>>>> >> 2016-03-02 18:01:13,646 INFO  org.apache.flink.runtime.util.LeaderRetrievalUtils - Trying to select the network interface and address to use by connecting to the leading JobManager.
>>>>>> >> 2016-03-02 18:01:13,646 INFO  org.apache.flink.runtime.util.LeaderRetrievalUtils - TaskManager will try to connect for 10000 milliseconds before falling back to heuristics
>>>>>> >> 2016-03-02 18:01:13,712 INFO  org.apache.flink.runtime.net.ConnectionUtils - Retrieved new target address /10.127.68.136:34811.
>>>>>> >> 2016-03-02 18:01:14,079 INFO  org.apache.flink.runtime.net.ConnectionUtils - Trying to connect to address /10.127.68.136:34811
>>>>>> >> 2016-03-02 18:01:14,082 INFO  org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address 'task.manager.eth0.hostname.com/10.127.68.136': Connection refused
>>>>>> >> 2016-03-02 18:01:14,082 INFO  org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/10.127.68.136': Connection refused
>>>>>> >> 2016-03-02 18:01:14,082 INFO  org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/10.120.193.110': Connection refused
>>>>>> >> 2016-03-02 18:01:14,082 INFO  org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/10.127.68.136': Connection refused
>>>>>> >> 2016-03-02 18:01:14,083 INFO  org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/127.0.0.1': Connection refused
>>>>>> >> 2016-03-02 18:01:14,083 INFO  org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/10.120.193.110': Connection refused
>>>>>> >> 2016-03-02 18:01:14,083 INFO  org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/10.127.68.136': Connection refused
>>>>>> >> 2016-03-02 18:01:14,083 INFO  org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/127.0.0.1': Connection refused
>>>>>> >> ---
>>>>>> >> After five repetitions, the task manager stops trying to retrieve the leader
>>>>>> >> and, using the HEURISTIC strategy, ends up using eth1 (10.120.193.110) from
>>>>>> >> now on:
>>>>>> >> ---
>>>>>> >> 2016-03-02 18:01:23,650 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Stopping ZooKeeperLeaderRetrievalService.
>>>>>> >> 2016-03-02 18:01:23,655 INFO  org.apache.zookeeper.ClientCnxn - EventThread shut down
>>>>>> >> 2016-03-02 18:01:23,655 INFO  org.apache.zookeeper.ZooKeeper - Session: 0x25229757cff035b closed
>>>>>> >> 2016-03-02 18:01:23,664 INFO  org.apache.flink.runtime.taskmanager.TaskManager - TaskManager will use hostname/address 'task.manager.eth1.hostname.com' (10.120.193.110) for communication.
>>>>>> >> ---
>>>>>> >> Subsequently, the new jobmanager is discovered and the taskmanager is able to
>>>>>> >> register at the jobmanager using eth1. The problem is that connections TO
>>>>>> >> eth1 are not possible, so Flink should always use eth0. The exception we
>>>>>> >> later see is:
>>>>>> >> ---
>>>>>> >> java.io.IOException: Connecting the channel failed: Connecting to remote task manager + 'other.task.manager.eth1.hostname/10.120.193.111:46620' has failed. This might indicate that the remote task manager has been lost.
>>>>>> >> at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
>>>>>> >> at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:131)
>>>>>> >> at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:83)
>>>>>> >> at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:60)
>>>>>> >> at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:115)
>>>>>> >> at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:388)
>>>>>> >> at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:411)
>>>>>> >> at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:108)
>>>>>> >> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:175)
>>>>>> >> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
>>>>>> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
>>>>>> >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>> >> at java.lang.Thread.run(Thread.java:744)
>>>>>> >> ---
>>>>>> >> The root cause seems to be that network interface selection is still using
>>>>>> >> the old jobmanager location and hence is not able to choose the right
>>>>>> >> interface. In particular, it seems that iteration order over the network
>>>>>> >> interfaces differs between the HEURISTIC and SLOW strategies, which then leads
>>>>>> >> to the wrong interface being selected.
>>>>>> >>
>>>>>> >> Cheers,
>>>>>> >>  Max
>>>>>> >> —
>>>>>> >> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
>>>>>> >> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>>> >> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>>> >> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>>>>> >>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
>
