Okay, that is a change from the original behavior, introduced with the HA work. Originally, if the connection attempts failed, it always returned the interface bound to InetAddress.getLocalHost(). I think we should change it back to that, because that interface is by far the best heuristic available.
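To make the proposal concrete, here is a rough sketch of the selection order I have in mind. It is not the actual Flink code: the class AddressSelectionSketch and the methods selectAddress, canConnect, localHost and ip are hypothetical names made up for this mail. The idea is simply to try each candidate address against the JobManager and, if none of them connects, to return the fallback (in production the address from InetAddress.getLocalHost()) instead of a heuristically picked interface.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical sketch, not Flink's implementation.
class AddressSelectionSketch {

    /** Returns the first candidate that passes the probe, otherwise the fallback. */
    static InetAddress selectAddress(List<InetAddress> candidates,
                                     Predicate<InetAddress> canReachJobManager,
                                     Supplier<InetAddress> fallback) {
        for (InetAddress candidate : candidates) {
            if (canReachJobManager.test(candidate)) {
                return candidate;
            }
        }
        // No interface could connect: use the fallback rather than guessing.
        return fallback.get();
    }

    /** Socket-based probe: bind the local side to 'from' and try to reach 'target'. */
    static boolean canConnect(InetAddress from, SocketAddress target, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.bind(new InetSocketAddress(from, 0));
            socket.connect(target, timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    /** Production fallback: the address bound to the machine's host name. */
    static InetAddress localHost() {
        try {
            return InetAddress.getLocalHost();
        } catch (UnknownHostException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Builds an IPv4 address from four octets without any DNS lookup. */
    static InetAddress ip(int a, int b, int c, int d) {
        try {
            return InetAddress.getByAddress(new byte[] {(byte) a, (byte) b, (byte) c, (byte) d});
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        // Addresses taken from Max's logs; eth0 is assumed to be the host-name interface.
        InetAddress eth0 = ip(10, 127, 68, 136);
        InetAddress eth1 = ip(10, 120, 193, 110);

        // Simulate the failover window: the JobManager is unreachable from every interface.
        InetAddress chosen = selectAddress(Arrays.asList(eth1, eth0), addr -> false, () -> eth0);
        System.out.println(chosen.getHostAddress()); // prints 10.127.68.136
    }
}
```

In a real setup the probe would be something like addr -> canConnect(addr, jobManagerAddress, timeout) and the fallback would be AddressSelectionSketch::localHost. The probe is injected here only so that the ordering logic can be exercised without a network.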
On Thu, Mar 3, 2016 at 11:39 AM, Till Rohrmann <trohrm...@apache.org> wrote:
> If I’m not mistaken, then it’s not necessarily true that the heuristic
> returns InetAddress.getLocalHost() in all cases. The heuristic will
> select the first network interface that meets the aforementioned conditions,
> but before returning it, it will try one last time to connect to the JM via
> the interface bound to InetAddress.getLocalHost(). However, if this fails,
> then the heuristically selected network interface will be returned.
>
> On Thu, Mar 3, 2016 at 10:49 AM, Stephan Ewen <se...@apache.org> wrote:
>
>> If the TaskManager cannot connect to the JobManager, it will use the
>> interface that is bound to the machine's host name
>> ("InetAddress.getLocalHost()").
>>
>> So, the best way to fix this would be to make sure that all machines have
>> a proper network configuration. Then Flink would either use an address that
>> can connect (via trying various interfaces), or it would default back to
>> the hostname/interface that is configured on the machine.
>>
>> On Thu, Mar 3, 2016 at 10:43 AM, Till Rohrmann <trohrm...@apache.org>
>> wrote:
>>
>>> Hi Max,
>>>
>>> the problem is that before starting the TM, we have to find the network
>>> interface which is reachable by the other machines. So what we do is
>>> connect to the current JobManager. If it should happen, as in your case,
>>> that the JobManager just died and the new JM address has not been written
>>> to ZooKeeper, then the TMs don’t have much choice other than using the
>>> heuristic.
>>>
>>> I can’t really tell why eth1 is chosen over eth0. The condition is that
>>> the interface address is an Inet4Address, not a link-local address, and
>>> not a loopback address.
>>>
>>> Thus, Ufuk’s suggestion to increase akka.lookup.timeout seems to be the
>>> easiest way to solve your problem. I’ve checked: the default value is
>>> set to 10 s, which might be a bit too low for restarting a new JM and
>>> publishing its address via ZooKeeper.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Mar 3, 2016 at 10:28 AM, Ufuk Celebi <u...@apache.org> wrote:
>>>
>>>> I had an offline chat with Till about this. He pointed out that the
>>>> address is chosen once at start-up time (while not being able to
>>>> connect to the old job manager) and then it stays fixed at eth1.
>>>>
>>>> You can increase the lookup timeout by setting akka.lookup.timeout to
>>>> a higher value (like 100 s). This is the only workaround I'm aware of
>>>> at this point. Maybe Till can chime in here on whether this has other
>>>> implications as well?
>>>>
>>>> – Ufuk
>>>>
>>>> On Thu, Mar 3, 2016 at 9:59 AM, Ufuk Celebi <u...@apache.org> wrote:
>>>> > Hey Max!
>>>> >
>>>> > for the first WARN in
>>>> > org.apache.flink.runtime.webmonitor.JobManagerRetriever: this is
>>>> > expected if the new leader has not updated ZooKeeper yet. The
>>>> > important thing is that the new leading job manager is eventually
>>>> > retrieved. This did happen, right?
>>>> >
>>>> > Regarding eth1 vs. eth0: After the new job manager becomes leader, the
>>>> > task manager should re-try connecting to it with the same strategy as
>>>> > in the initial connection establishment (e.g. try SLOW first and only
>>>> > fall back to HEURISTIC). Can you see in the logs whether this happens?
>>>> >
>>>> > The best thing would be to share the complete logs. Is this possible?
>>>> > If not publicly, feel free to send it to me privately (uce at apache
>>>> > org).
>>>> >
>>>> > – Ufuk
>>>> >
>>>> > On Thu, Mar 3, 2016 at 9:21 AM, Maximilian Bode
>>>> > <maximilian.b...@tngtech.com> wrote:
>>>> >> Hi everyone,
>>>> >>
>>>> >> we are trying to get JobManager HA to work in the context of a per-job YARN
>>>> >> session using the 1.0.0-rc3 from a few days ago and are having a problem
>>>> >> concerning task managers with several network interfaces.
>>>> >>
>>>> >> After manually killing the job manager process, the jobmanager.log on the
>>>> >> newly allocated second job manager reads:
>>>> >> ---
>>>> >> 2016-03-02 18:01:09,635 WARN Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@10.127.68.136:34811]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: /10.127.68.136:34811
>>>> >> 2016-03-02 18:01:09,644 WARN org.apache.flink.runtime.webmonitor.JobManagerRetriever - Failed to retrieve leader gateway and port.
>>>> >> akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@10.127.68.136:34811/), Path(/user/jobmanager)]
>>>> >> at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
>>>> >> at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
>>>> >> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>>>> >> at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
>>>> >> at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
>>>> >> at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
>>>> >> at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
>>>> >> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>>>> >> at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
>>>> >> at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
>>>> >> at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
>>>> >> at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
>>>> >> at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>>>> >> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
>>>> >> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
>>>> >> at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
>>>> >> at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
>>>> >> at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
>>>> >> at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
>>>> >> at akka.remote.EndpointWriter.postStop(Endpoint.scala:561)
>>>> >> at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
>>>> >> at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:415)
>>>> >> at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
>>>> >> at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
>>>> >> at akka.actor.ActorCell.terminate(ActorCell.scala:369)
>>>> >> at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
>>>> >> at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
>>>> >> at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
>>>> >> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>>> >> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>>> >> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>> >> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>> >> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>> >> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> >> ---
>>>> >> where the IP not found is from the old job manager. So far, is this the
>>>> >> expected behavior?
>>>> >>
>>>> >> The problem then arises on a new task manager, which also tries to connect
>>>> >> to the old job manager unsuccessfully. The ZooKeeperLeaderRetrievalService
>>>> >> starts cycling through the available network interfaces, as can be seen in
>>>> >> the relevant taskmanager.log:
>>>> >> ---
>>>> >> 2016-03-02 18:01:13,636 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Starting ZooKeeperLeaderRetrievalService.
>>>> >> 2016-03-02 18:01:13,646 INFO org.apache.flink.runtime.util.LeaderRetrievalUtils - Trying to select the network interface and address to use by connecting to the leading JobManager.
>>>> >> 2016-03-02 18:01:13,646 INFO org.apache.flink.runtime.util.LeaderRetrievalUtils - TaskManager will try to connect for 10000 milliseconds before falling back to heuristics
>>>> >> 2016-03-02 18:01:13,712 INFO org.apache.flink.runtime.net.ConnectionUtils - Retrieved new target address /10.127.68.136:34811.
>>>> >> 2016-03-02 18:01:14,079 INFO org.apache.flink.runtime.net.ConnectionUtils - Trying to connect to address /10.127.68.136:34811
>>>> >> 2016-03-02 18:01:14,082 INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address 'task.manager.eth0.hostname.com/10.127.68.136': Connection refused
>>>> >> 2016-03-02 18:01:14,082 INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/10.127.68.136': Connection refused
>>>> >> 2016-03-02 18:01:14,082 INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/10.120.193.110': Connection refused
>>>> >> 2016-03-02 18:01:14,082 INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/10.127.68.136': Connection refused
>>>> >> 2016-03-02 18:01:14,083 INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/127.0.0.1': Connection refused
>>>> >> 2016-03-02 18:01:14,083 INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/10.120.193.110': Connection refused
>>>> >> 2016-03-02 18:01:14,083 INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/10.127.68.136': Connection refused
>>>> >> 2016-03-02 18:01:14,083 INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/127.0.0.1': Connection refused
>>>> >> ---
>>>> >> After five repetitions, the task manager stops trying to retrieve the leader
>>>> >> and, using the HEURISTIC strategy, ends up using eth1 (10.120.193.110) from
>>>> >> now on:
>>>> >> ---
>>>> >> 2016-03-02 18:01:23,650 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Stopping ZooKeeperLeaderRetrievalService.
>>>> >> 2016-03-02 18:01:23,655 INFO org.apache.zookeeper.ClientCnxn - EventThread shut down
>>>> >> 2016-03-02 18:01:23,655 INFO org.apache.zookeeper.ZooKeeper - Session: 0x25229757cff035b closed
>>>> >> 2016-03-02 18:01:23,664 INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager will use hostname/address 'task.manager.eth1.hostname.com' (10.120.193.110) for communication.
>>>> >> ---
>>>> >> Following this, the new jobmanager is discovered and the taskmanager is able to
>>>> >> register at the jobmanager using eth1. The problem is that connections TO
>>>> >> eth1 are not possible, so Flink should always use eth0. The exception we
>>>> >> later see is:
>>>> >> ---
>>>> >> java.io.IOException: Connecting the channel failed: Connecting to remote task manager + 'other.task.manager.eth1.hostname/10.120.193.111:46620' has failed. This might indicate that the remote task manager has been lost.
>>>> >> at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
>>>> >> at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:131)
>>>> >> at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:83)
>>>> >> at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:60)
>>>> >> at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:115)
>>>> >> at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:388)
>>>> >> at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:411)
>>>> >> at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:108)
>>>> >> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:175)
>>>> >> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
>>>> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
>>>> >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>> >> at java.lang.Thread.run(Thread.java:744)
>>>> >> ---
>>>> >> The root cause seems to be that network interface selection is still using
>>>> >> the old jobmanager location and hence is not able to choose the right
>>>> >> interface. In particular, it seems that iteration order over the network
>>>> >> interfaces differs between the HEURISTIC and SLOW strategy, which then leads
>>>> >> to the wrong interface being selected.
>>>> >>
>>>> >> Cheers,
>>>> >> Max
>>>> >> --
>>>> >> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
>>>> >> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>> >> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>> >> Sitz: Unterföhring * Amtsgericht München * HRB 135082