Okay, that is a change from the original behavior, introduced in HA.
Originally, if the connection attempts failed, it always returned the
InetAddress.getLocalHost()
interface.
I think we should change it back to that, because that interface is by far
the best possible heuristic.

On Thu, Mar 3, 2016 at 11:39 AM, Till Rohrmann <trohrm...@apache.org> wrote:

> If I’m not mistaken, then it’s not necessarily true that the heuristic
> returns InetAddress.getLocalHost() in all cases. The heuristic will
> select the first network interface with the afore-mentioned conditions but
> before returning it, it will try a last time to connect to the JM via the
> interface bound to InetAddress.getLocalHost(). However, if this fails,
> then the heuristically selected network interface will be returned.
> ​
>
> On Thu, Mar 3, 2016 at 10:49 AM, Stephan Ewen <se...@apache.org> wrote:
>
>> If the ThasManager cannot connect to the JobManager, it will use the
>> interface that is bound to the machine's host name
>> ("InetAddress.getLocalHost()").
>>
>> So, the best way to fix this would be to make sure that all machines have
>> a proper network configuration. Then Flink would either use an address that
>> can connect (via trying various interfaces), or it would default back to
>> the hostname/interface that is configured on the machine.
>>
>>
>> On Thu, Mar 3, 2016 at 10:43 AM, Till Rohrmann <trohrm...@apache.org>
>> wrote:
>>
>>> Hi Max,
>>>
>>> the problem is that before starting the TM, we have to find the network
>>> interface which is reachable by the other machines. So what we do is to
>>> connect to the current JobManager. If it should happen, as in your case,
>>> that the JobManager just died and the new JM address has not been written
>>> to ZooKeeper, then the TMs don’t have much choice other than using the
>>> heuristic.
>>>
>>> I can’t really tell why eth1 is chosen over eth0. The condition is that
>>> the interface address is an Inet4Address, no link local address as well
>>> as not a loopback address.
>>>
>>> Thus, Ufuk’s solution, to increase akka.lookup.timeout seems to be the
>>> easiest solution to solve your problem. I’ve checked the default value is
>>> set to 10 s which might be a bit too low for restarting a new JM and
>>> publishing its address via ZooKeeper.
>>>
>>> Cheers,
>>> Till
>>> ​
>>>
>>> On Thu, Mar 3, 2016 at 10:28 AM, Ufuk Celebi <u...@apache.org> wrote:
>>>
>>>> I had an offline chat with Till about this. He pointed out that the
>>>> address is chosen once at start up time (while not being able to
>>>> connect to the old job manager) and then it stays fixed at eth1.
>>>>
>>>> You can increase the lookup timeout by setting akka.lookup.timeout to
>>>> a higher value (like 100 s). This is the only workaroud I'm aware of
>>>> at this point. Maybe Till can chime in here whether this has other
>>>> implications as well?
>>>>
>>>> – Ufuk
>>>>
>>>> On Thu, Mar 3, 2016 at 9:59 AM, Ufuk Celebi <u...@apache.org> wrote:
>>>> > Hey Max!
>>>> >
>>>> > for the first WARN in
>>>> > org.apache.flink.runtime.webmonitor.JobManagerRetriever: this is
>>>> > expected if the new leader has not updated ZooKeeper yet. The
>>>> > important thing is that the new leading job manager is eventually
>>>> > retrieved. This did happen, right?
>>>> >
>>>> > Regarding eth1 vs. eth0: After the new job manager becomes leader, the
>>>> > task manager should re-try connecting to it with the same strategy as
>>>> > in the initial connection establishment (e.g. try SLOW first and only
>>>> > fall back to HEURISTIC). Can you see in the logs whether this happens?
>>>> >
>>>> > The best thing would be to share the complete logs. Is this possible?
>>>> > If not publicly, feel free to send it to me privately (uce at apache
>>>> > org).
>>>> >
>>>> > – Ufuk
>>>> >
>>>> >
>>>> > On Thu, Mar 3, 2016 at 9:21 AM, Maximilian Bode
>>>> > <maximilian.b...@tngtech.com> wrote:
>>>> >> Hi everyone,
>>>> >>
>>>> >> we are trying to get to work JobManager HA in the context of a
>>>> per-job YARN
>>>> >> session using the 1.0.0-rc3 from a few days ago and are having a
>>>> problem
>>>> >> concerning task managers with several network interfaces.
>>>> >>
>>>> >> After manually killing the job manager process, the jobmanager.log
>>>> on the
>>>> >> newly allocated second job manager reads:
>>>> >> ---
>>>> >> 2016-03-02 18:01:09,635 WARN  Remoting
>>>> >> - Tried to associate with unreachable remote address
>>>> >> [akka.tcp://flink@10.127.68.136:34811]. Address is now gated for
>>>> 5000 ms,
>>>> >> all messages to this address will be delivered to dead letters.
>>>> Reason:
>>>> >> Connection refused: /10.127.68.136:34811
>>>> >> 2016-03-02 18:01:09,644 WARN
>>>> >> org.apache.flink.runtime.webmonitor.JobManagerRetriever       -
>>>> Failed to
>>>> >> retrieve leader gateway and port.
>>>> >> akka.actor.ActorNotFound: Actor not found for:
>>>> >> ActorSelection[Anchor(akka.tcp://flink@10.127.68.136:34811/),
>>>> >> Path(/user/jobmanager)]
>>>> >> at
>>>> >>
>>>> akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
>>>> >> at
>>>> >>
>>>> akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
>>>> >> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>>>> >> at
>>>> >>
>>>> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
>>>> >> at
>>>> >>
>>>> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
>>>> >> at
>>>> >>
>>>> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
>>>> >> at
>>>> >>
>>>> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
>>>> >> at
>>>> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>>>> >> at
>>>> akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
>>>> >> at
>>>> >>
>>>> akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
>>>> >> at
>>>> akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
>>>> >> at
>>>> >>
>>>> akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
>>>> >> at
>>>> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>>>> >> at
>>>> >>
>>>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
>>>> >> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
>>>> >> at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
>>>> >> at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
>>>> >> at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
>>>> >> at
>>>> >>
>>>> akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
>>>> >> at akka.remote.EndpointWriter.postStop(Endpoint.scala:561)
>>>> >> at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
>>>> >> at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:415)
>>>> >> at
>>>> >>
>>>> akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
>>>> >> at
>>>> akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
>>>> >> at akka.actor.ActorCell.terminate(ActorCell.scala:369)
>>>> >> at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
>>>> >> at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
>>>> >> at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
>>>> >> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>>> >> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>>> >> at
>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>> >> at
>>>> >>
>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>> >> at
>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>> >> at
>>>> >>
>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> >> ---
>>>> >> where the IP not found is from the old job manager. So far, is this
>>>> the
>>>> >> expected behavior?
>>>> >>
>>>> >> The problem then arises on a new task manager, which also tries to
>>>> connect
>>>> >> to the old job manager unsuccessfully. The
>>>> ZooKeeperLeaderRetrievalService
>>>> >> starts cycling through the available network interfaces, as can be
>>>> seen in
>>>> >> the relevant taskmanager.log:
>>>> >> ---
>>>> >> 2016-03-02 18:01:13,636 INFO
>>>> >>
>>>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>>>> >> Starting ZooKeeperLeaderRetrievalService.
>>>> >> 2016-03-02 18:01:13,646 INFO
>>>> >> org.apache.flink.runtime.util.LeaderRetrievalUtils            -
>>>> Trying to
>>>> >> select the network interface and address to use by connecting to the
>>>> leading
>>>> >> JobManager.
>>>> >> 2016-03-02 18:01:13,646 INFO
>>>> >> org.apache.flink.runtime.util.LeaderRetrievalUtils            -
>>>> TaskManager
>>>> >> will try to connect for 10000 milliseconds before falling back to
>>>> heuristics
>>>> >> 2016-03-02 18:01:13,712 INFO
>>>> org.apache.flink.runtime.net.ConnectionUtils
>>>> >> - Retrieved new target address /10.127.68.136:34811.
>>>> >> 2016-03-02 18:01:14,079 INFO
>>>> org.apache.flink.runtime.net.ConnectionUtils
>>>> >> - Trying to connect to address /10.127.68.136:34811
>>>> >> 2016-03-02 18:01:14,082 INFO
>>>> org.apache.flink.runtime.net.ConnectionUtils
>>>> >> - Failed to connect from address
>>>> >> 'task.manager.eth0.hostname.com/10.127.68.136': Connection refused
>>>> >> 2016-03-02 18:01:14,082 INFO
>>>> org.apache.flink.runtime.net.ConnectionUtils
>>>> >> - Failed to connect from address '/10.127.68.136': Connection
>>>> refused
>>>> >> 2016-03-02 18:01:14,082 INFO
>>>> org.apache.flink.runtime.net.ConnectionUtils
>>>> >> - Failed to connect from address '/10.120.193.110': Connection
>>>> refused
>>>> >> 2016-03-02 18:01:14,082 INFO
>>>> org.apache.flink.runtime.net.ConnectionUtils
>>>> >> - Failed to connect from address '/10.127.68.136': Connection
>>>> refused
>>>> >> 2016-03-02 18:01:14,083 INFO
>>>> org.apache.flink.runtime.net.ConnectionUtils
>>>> >> - Failed to connect from address '/127.0.0.1': Connection refused
>>>> >> 2016-03-02 18:01:14,083 INFO
>>>> org.apache.flink.runtime.net.ConnectionUtils
>>>> >> - Failed to connect from address '/10.120.193.110': Connection
>>>> refused
>>>> >> 2016-03-02 18:01:14,083 INFO
>>>> org.apache.flink.runtime.net.ConnectionUtils
>>>> >> - Failed to connect from address '/10.127.68.136': Connection
>>>> refused
>>>> >> 2016-03-02 18:01:14,083 INFO
>>>> org.apache.flink.runtime.net.ConnectionUtils
>>>> >> - Failed to connect from address '/127.0.0.1': Connection refused
>>>> >> ---
>>>> >> After five repetitions, the task manager stops trying to retrieve
>>>> the leader
>>>> >> and using the HEURISTIC strategy ends up using  eth1
>>>> (10.120.193.110) from
>>>> >> now on:
>>>> >> ---
>>>> >> 2016-03-02 18:01:23,650 INFO
>>>> >>
>>>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>>>> >> Stopping ZooKeeperLeaderRetrievalService.
>>>> >> 2016-03-02 18:01:23,655 INFO  org.apache.zookeeper.ClientCnxn
>>>> >> - EventThread shut down
>>>> >> 2016-03-02 18:01:23,655 INFO  org.apache.zookeeper.ZooKeeper
>>>> >> - Session: 0x25229757cff035b closed
>>>> >> 2016-03-02 18:01:23,664 INFO
>>>> >> org.apache.flink.runtime.taskmanager.TaskManager              -
>>>> TaskManager
>>>> >> will use hostname/address 'task.manager.eth1.hostname.com'
>>>> (10.120.193.110)
>>>> >> for communication.
>>>> >> ---
>>>> >> Following the new jobmanager is discovered and the taskmanager is
>>>> able to
>>>> >> register at the jobmanager using eth1. The problem is that
>>>> connections TO
>>>> >> eth1 are not possible. So flink should always use eth0. The
>>>> exception we
>>>> >> later see is:
>>>> >> ---
>>>> >> java.io.IOException: Connecting the channel failed: Connecting to
>>>> remote
>>>> >> task manager + 'other.task.manager.eth1.hostname/
>>>> 10.120.193.111:46620' has
>>>> >> failed. This might indicate that the remote task manager has been
>>>> lost.
>>>> >> at
>>>> >>
>>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
>>>> >> at
>>>> >>
>>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:131)
>>>> >> at
>>>> >>
>>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:83)
>>>> >> at
>>>> >>
>>>> org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:60)
>>>> >> at
>>>> >>
>>>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:115)
>>>> >> at
>>>> >>
>>>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:388)
>>>> >> at
>>>> >>
>>>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:411)
>>>> >> at
>>>> >>
>>>> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:108)
>>>> >> at
>>>> >>
>>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:175)
>>>> >> at
>>>> >>
>>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
>>>> >> at
>>>> >>
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
>>>> >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>> >> at java.lang.Thread.run(Thread.java:744)
>>>> >> ---
>>>> >> The root cause seems to be that network interface selection is still
>>>> using
>>>> >> the old jobmanager location and hence is not able to choose the right
>>>> >> interface. In particular, it seems that iteration order over the
>>>> network
>>>> >> interfaces differs between the HEURISTIC and SLOW strategy, which
>>>> then leads
>>>> >> to the wrong interface being selected.
>>>> >>
>>>> >> Cheers,
>>>> >>  Max
>>>> >> —
>>>> >> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
>>>> >> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>> >> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>> >> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>>> >>
>>>>
>>>
>>>
>>
>

Reply via email to