If I’m not mistaken, then it’s not necessarily true that the heuristic
returns InetAddress.getLocalHost() in all cases. The heuristic will select
the first network interface with the afore-mentioned conditions but before
returning it, it will try a last time to connect to the JM via the
interface bound to InetAddress.getLocalHost(). However, if this fails, then
the heuristically selected network interface will be returned.
​

On Thu, Mar 3, 2016 at 10:49 AM, Stephan Ewen <se...@apache.org> wrote:

> If the ThasManager cannot connect to the JobManager, it will use the
> interface that is bound to the machine's host name
> ("InetAddress.getLocalHost()").
>
> So, the best way to fix this would be to make sure that all machines have
> a proper network configuration. Then Flink would either use an address that
> can connect (via trying various interfaces), or it would default back to
> the hostname/interface that is configured on the machine.
>
>
> On Thu, Mar 3, 2016 at 10:43 AM, Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> Hi Max,
>>
>> the problem is that before starting the TM, we have to find the network
>> interface which is reachable by the other machines. So what we do is to
>> connect to the current JobManager. If it should happen, as in your case,
>> that the JobManager just died and the new JM address has not been written
>> to ZooKeeper, then the TMs don’t have much choice other than using the
>> heuristic.
>>
>> I can’t really tell why eth1 is chosen over eth0. The condition is that
>> the interface address is an Inet4Address, no link local address as well
>> as not a loopback address.
>>
>> Thus, Ufuk’s solution, to increase akka.lookup.timeout seems to be the
>> easiest solution to solve your problem. I’ve checked the default value is
>> set to 10 s which might be a bit too low for restarting a new JM and
>> publishing its address via ZooKeeper.
>>
>> Cheers,
>> Till
>> ​
>>
>> On Thu, Mar 3, 2016 at 10:28 AM, Ufuk Celebi <u...@apache.org> wrote:
>>
>>> I had an offline chat with Till about this. He pointed out that the
>>> address is chosen once at start up time (while not being able to
>>> connect to the old job manager) and then it stays fixed at eth1.
>>>
>>> You can increase the lookup timeout by setting akka.lookup.timeout to
>>> a higher value (like 100 s). This is the only workaroud I'm aware of
>>> at this point. Maybe Till can chime in here whether this has other
>>> implications as well?
>>>
>>> – Ufuk
>>>
>>> On Thu, Mar 3, 2016 at 9:59 AM, Ufuk Celebi <u...@apache.org> wrote:
>>> > Hey Max!
>>> >
>>> > for the first WARN in
>>> > org.apache.flink.runtime.webmonitor.JobManagerRetriever: this is
>>> > expected if the new leader has not updated ZooKeeper yet. The
>>> > important thing is that the new leading job manager is eventually
>>> > retrieved. This did happen, right?
>>> >
>>> > Regarding eth1 vs. eth0: After the new job manager becomes leader, the
>>> > task manager should re-try connecting to it with the same strategy as
>>> > in the initial connection establishment (e.g. try SLOW first and only
>>> > fall back to HEURISTIC). Can you see in the logs whether this happens?
>>> >
>>> > The best thing would be to share the complete logs. Is this possible?
>>> > If not publicly, feel free to send it to me privately (uce at apache
>>> > org).
>>> >
>>> > – Ufuk
>>> >
>>> >
>>> > On Thu, Mar 3, 2016 at 9:21 AM, Maximilian Bode
>>> > <maximilian.b...@tngtech.com> wrote:
>>> >> Hi everyone,
>>> >>
>>> >> we are trying to get to work JobManager HA in the context of a
>>> per-job YARN
>>> >> session using the 1.0.0-rc3 from a few days ago and are having a
>>> problem
>>> >> concerning task managers with several network interfaces.
>>> >>
>>> >> After manually killing the job manager process, the jobmanager.log on
>>> the
>>> >> newly allocated second job manager reads:
>>> >> ---
>>> >> 2016-03-02 18:01:09,635 WARN  Remoting
>>> >> - Tried to associate with unreachable remote address
>>> >> [akka.tcp://flink@10.127.68.136:34811]. Address is now gated for
>>> 5000 ms,
>>> >> all messages to this address will be delivered to dead letters.
>>> Reason:
>>> >> Connection refused: /10.127.68.136:34811
>>> >> 2016-03-02 18:01:09,644 WARN
>>> >> org.apache.flink.runtime.webmonitor.JobManagerRetriever       -
>>> Failed to
>>> >> retrieve leader gateway and port.
>>> >> akka.actor.ActorNotFound: Actor not found for:
>>> >> ActorSelection[Anchor(akka.tcp://flink@10.127.68.136:34811/),
>>> >> Path(/user/jobmanager)]
>>> >> at
>>> >>
>>> akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
>>> >> at
>>> >>
>>> akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
>>> >> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>>> >> at
>>> >>
>>> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
>>> >> at
>>> >>
>>> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
>>> >> at
>>> >>
>>> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
>>> >> at
>>> >>
>>> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
>>> >> at
>>> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>>> >> at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
>>> >> at
>>> >>
>>> akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
>>> >> at
>>> akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
>>> >> at
>>> >>
>>> akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
>>> >> at
>>> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>>> >> at
>>> >>
>>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
>>> >> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
>>> >> at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
>>> >> at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
>>> >> at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
>>> >> at
>>> >>
>>> akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
>>> >> at akka.remote.EndpointWriter.postStop(Endpoint.scala:561)
>>> >> at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
>>> >> at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:415)
>>> >> at
>>> >>
>>> akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
>>> >> at
>>> akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
>>> >> at akka.actor.ActorCell.terminate(ActorCell.scala:369)
>>> >> at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
>>> >> at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
>>> >> at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
>>> >> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>> >> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>> >> at
>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> >> at
>>> >>
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> >> at
>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> >> at
>>> >>
>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> >> ---
>>> >> where the IP not found is from the old job manager. So far, is this
>>> the
>>> >> expected behavior?
>>> >>
>>> >> The problem then arises on a new task manager, which also tries to
>>> connect
>>> >> to the old job manager unsuccessfully. The
>>> ZooKeeperLeaderRetrievalService
>>> >> starts cycling through the available network interfaces, as can be
>>> seen in
>>> >> the relevant taskmanager.log:
>>> >> ---
>>> >> 2016-03-02 18:01:13,636 INFO
>>> >>
>>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>>> >> Starting ZooKeeperLeaderRetrievalService.
>>> >> 2016-03-02 18:01:13,646 INFO
>>> >> org.apache.flink.runtime.util.LeaderRetrievalUtils            -
>>> Trying to
>>> >> select the network interface and address to use by connecting to the
>>> leading
>>> >> JobManager.
>>> >> 2016-03-02 18:01:13,646 INFO
>>> >> org.apache.flink.runtime.util.LeaderRetrievalUtils            -
>>> TaskManager
>>> >> will try to connect for 10000 milliseconds before falling back to
>>> heuristics
>>> >> 2016-03-02 18:01:13,712 INFO
>>> org.apache.flink.runtime.net.ConnectionUtils
>>> >> - Retrieved new target address /10.127.68.136:34811.
>>> >> 2016-03-02 18:01:14,079 INFO
>>> org.apache.flink.runtime.net.ConnectionUtils
>>> >> - Trying to connect to address /10.127.68.136:34811
>>> >> 2016-03-02 18:01:14,082 INFO
>>> org.apache.flink.runtime.net.ConnectionUtils
>>> >> - Failed to connect from address
>>> >> 'task.manager.eth0.hostname.com/10.127.68.136': Connection refused
>>> >> 2016-03-02 18:01:14,082 INFO
>>> org.apache.flink.runtime.net.ConnectionUtils
>>> >> - Failed to connect from address '/10.127.68.136': Connection refused
>>> >> 2016-03-02 18:01:14,082 INFO
>>> org.apache.flink.runtime.net.ConnectionUtils
>>> >> - Failed to connect from address '/10.120.193.110': Connection
>>> refused
>>> >> 2016-03-02 18:01:14,082 INFO
>>> org.apache.flink.runtime.net.ConnectionUtils
>>> >> - Failed to connect from address '/10.127.68.136': Connection refused
>>> >> 2016-03-02 18:01:14,083 INFO
>>> org.apache.flink.runtime.net.ConnectionUtils
>>> >> - Failed to connect from address '/127.0.0.1': Connection refused
>>> >> 2016-03-02 18:01:14,083 INFO
>>> org.apache.flink.runtime.net.ConnectionUtils
>>> >> - Failed to connect from address '/10.120.193.110': Connection
>>> refused
>>> >> 2016-03-02 18:01:14,083 INFO
>>> org.apache.flink.runtime.net.ConnectionUtils
>>> >> - Failed to connect from address '/10.127.68.136': Connection refused
>>> >> 2016-03-02 18:01:14,083 INFO
>>> org.apache.flink.runtime.net.ConnectionUtils
>>> >> - Failed to connect from address '/127.0.0.1': Connection refused
>>> >> ---
>>> >> After five repetitions, the task manager stops trying to retrieve the
>>> leader
>>> >> and using the HEURISTIC strategy ends up using  eth1 (10.120.193.110)
>>> from
>>> >> now on:
>>> >> ---
>>> >> 2016-03-02 18:01:23,650 INFO
>>> >>
>>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>>> >> Stopping ZooKeeperLeaderRetrievalService.
>>> >> 2016-03-02 18:01:23,655 INFO  org.apache.zookeeper.ClientCnxn
>>> >> - EventThread shut down
>>> >> 2016-03-02 18:01:23,655 INFO  org.apache.zookeeper.ZooKeeper
>>> >> - Session: 0x25229757cff035b closed
>>> >> 2016-03-02 18:01:23,664 INFO
>>> >> org.apache.flink.runtime.taskmanager.TaskManager              -
>>> TaskManager
>>> >> will use hostname/address 'task.manager.eth1.hostname.com'
>>> (10.120.193.110)
>>> >> for communication.
>>> >> ---
>>> >> Following the new jobmanager is discovered and the taskmanager is
>>> able to
>>> >> register at the jobmanager using eth1. The problem is that
>>> connections TO
>>> >> eth1 are not possible. So flink should always use eth0. The exception
>>> we
>>> >> later see is:
>>> >> ---
>>> >> java.io.IOException: Connecting the channel failed: Connecting to
>>> remote
>>> >> task manager + 'other.task.manager.eth1.hostname/10.120.193.111:46620'
>>> has
>>> >> failed. This might indicate that the remote task manager has been
>>> lost.
>>> >> at
>>> >>
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
>>> >> at
>>> >>
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:131)
>>> >> at
>>> >>
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:83)
>>> >> at
>>> >>
>>> org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:60)
>>> >> at
>>> >>
>>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:115)
>>> >> at
>>> >>
>>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:388)
>>> >> at
>>> >>
>>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:411)
>>> >> at
>>> >>
>>> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:108)
>>> >> at
>>> >>
>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:175)
>>> >> at
>>> >>
>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
>>> >> at
>>> >>
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
>>> >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>> >> at java.lang.Thread.run(Thread.java:744)
>>> >> ---
>>> >> The root cause seems to be that network interface selection is still
>>> using
>>> >> the old jobmanager location and hence is not able to choose the right
>>> >> interface. In particular, it seems that iteration order over the
>>> network
>>> >> interfaces differs between the HEURISTIC and SLOW strategy, which
>>> then leads
>>> >> to the wrong interface being selected.
>>> >>
>>> >> Cheers,
>>> >>  Max
>>> >> —
>>> >> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
>>> >> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>> >> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>> >> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>> >>
>>>
>>
>>
>

Reply via email to