I've created an issue [1] and opened a PR [2] to fix the issue.

[1] https://issues.apache.org/jira/browse/FLINK-3570
[2] https://github.com/apache/flink/pull/1758
Cheers,
Till

On Thu, Mar 3, 2016 at 12:33 PM, Maximilian Bode <maximilian.b...@tngtech.com> wrote:

> Hi Ufuk, Till and Stephan,
>
> Yes, that is what we observed. The primary hostname, i.e. the one returned
> by the Unix hostname command, is in fact bound to the eth0 interface,
> whereas Flink uses the eth1 interface (pertaining to another hostname).
>
> Changing akka.lookup.timeout to 100 s seems to fix the problem, as the new
> job manager is now available in time. I would still agree with Stephan
> that using the local hostname should be the preferred strategy.
>
> Cheers,
> Max
> —
> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Managing Directors: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Registered office: Unterföhring * Amtsgericht München * HRB 135082
>
> On 03.03.2016 at 12:29, Till Rohrmann <till.rohrm...@gmail.com> wrote:
>
> No, I don't think this behaviour was introduced by HA. It is the default
> behaviour we have used for a long time. If you think we should still
> change it, I can open an issue for it.
>
> On Thu, Mar 3, 2016 at 12:20 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> Okay, that is a change from the original behavior, introduced in HA.
>> Originally, if the connection attempts failed, it always returned the
>> InetAddress.getLocalHost() interface.
>> I think we should change it back to that, because that interface is by
>> far the best possible heuristic.
>>
>> On Thu, Mar 3, 2016 at 11:39 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>>
>>> If I’m not mistaken, it’s not necessarily true that the heuristic
>>> returns InetAddress.getLocalHost() in all cases. The heuristic will
>>> select the first network interface satisfying the aforementioned
>>> conditions, but before returning it, it will try one last time to connect
>>> to the JM via the interface bound to InetAddress.getLocalHost().
>>> However, if this fails, the heuristically selected network interface
>>> will be returned.
>>>
>>> On Thu, Mar 3, 2016 at 10:49 AM, Stephan Ewen <se...@apache.org> wrote:
>>>
>>>> If the TaskManager cannot connect to the JobManager, it will use the
>>>> interface that is bound to the machine's host name
>>>> ("InetAddress.getLocalHost()").
>>>>
>>>> So, the best way to fix this would be to make sure that all machines
>>>> have a proper network configuration. Then Flink would either use an
>>>> address that can connect (via trying various interfaces), or it would
>>>> default back to the hostname/interface that is configured on the machine.
>>>>
>>>> On Thu, Mar 3, 2016 at 10:43 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>>>>
>>>>> Hi Max,
>>>>>
>>>>> the problem is that before starting the TM, we have to find the
>>>>> network interface which is reachable by the other machines. What we do
>>>>> is connect to the current JobManager. If it happens, as in your case,
>>>>> that the JobManager just died and the new JM address has not yet been
>>>>> written to ZooKeeper, then the TMs don’t have much choice but to use
>>>>> the heuristic.
>>>>>
>>>>> I can’t really tell why eth1 is chosen over eth0. The conditions are
>>>>> that the interface address is an Inet4Address and is neither a
>>>>> link-local nor a loopback address.
>>>>>
>>>>> Thus, Ufuk’s solution of increasing akka.lookup.timeout seems to be
>>>>> the easiest way to solve your problem. I’ve checked: the default value
>>>>> is 10 s, which might be a bit too low for starting a new JM and
>>>>> publishing its address via ZooKeeper.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Thu, Mar 3, 2016 at 10:28 AM, Ufuk Celebi <u...@apache.org> wrote:
>>>>>
>>>>>> I had an offline chat with Till about this.
>>>>>> He pointed out that the address is chosen once at startup time (while
>>>>>> not being able to connect to the old job manager) and then stays fixed
>>>>>> at eth1.
>>>>>>
>>>>>> You can increase the lookup timeout by setting akka.lookup.timeout to
>>>>>> a higher value (like 100 s). This is the only workaround I'm aware of
>>>>>> at this point. Maybe Till can chime in here on whether this has other
>>>>>> implications as well?
>>>>>>
>>>>>> – Ufuk
>>>>>>
>>>>>> On Thu, Mar 3, 2016 at 9:59 AM, Ufuk Celebi <u...@apache.org> wrote:
>>>>>> > Hey Max!
>>>>>> >
>>>>>> > For the first WARN in
>>>>>> > org.apache.flink.runtime.webmonitor.JobManagerRetriever: this is
>>>>>> > expected if the new leader has not updated ZooKeeper yet. The
>>>>>> > important thing is that the new leading job manager is eventually
>>>>>> > retrieved. This did happen, right?
>>>>>> >
>>>>>> > Regarding eth1 vs. eth0: After the new job manager becomes leader, the
>>>>>> > task manager should retry connecting to it with the same strategy as
>>>>>> > in the initial connection establishment (i.e. try SLOW first and only
>>>>>> > fall back to HEURISTIC). Can you see in the logs whether this happens?
>>>>>> >
>>>>>> > The best thing would be to share the complete logs. Is this possible?
>>>>>> > If not publicly, feel free to send them to me privately (uce at apache
>>>>>> > org).
>>>>>> >
>>>>>> > – Ufuk
>>>>>> >
>>>>>> > On Thu, Mar 3, 2016 at 9:21 AM, Maximilian Bode <maximilian.b...@tngtech.com> wrote:
>>>>>> >> Hi everyone,
>>>>>> >>
>>>>>> >> we are trying to get JobManager HA to work in the context of a
>>>>>> >> per-job YARN session using 1.0.0-rc3 from a few days ago, and are
>>>>>> >> having a problem concerning task managers with several network
>>>>>> >> interfaces.
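On a host with several network interfaces like the ones described here, a small Java program can show which local addresses would pass the conditions Till lists above for the heuristic: an Inet4Address that is neither link-local nor loopback. It also shows the order in which the JVM enumerates those addresses. This is a minimal diagnostic sketch under that assumption. The class and method names are hypothetical, and it mirrors only the conditions as stated in this thread, not Flink's actual implementation.

```java
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
import java.util.Optional;

// Hypothetical diagnostic helper: mirrors the interface conditions described
// in this thread; it is NOT Flink's own code.
class HeuristicCandidates {

    // Conditions as stated in the thread: IPv4, not link-local, not loopback.
    static boolean isCandidate(InetAddress addr) {
        return addr instanceof Inet4Address
                && !addr.isLinkLocalAddress()
                && !addr.isLoopbackAddress();
    }

    // Lists every local address in JVM enumeration order and marks
    // whether it passes the conditions above.
    static List<String> describeLocalAddresses() throws SocketException {
        List<String> lines = new ArrayList<>();
        Enumeration<NetworkInterface> nics = NetworkInterface.getNetworkInterfaces();
        if (nics == null) {
            return lines; // the JVM reported no interfaces
        }
        for (NetworkInterface nic : Collections.list(nics)) {
            for (InetAddress addr : Collections.list(nic.getInetAddresses())) {
                String mark = isCandidate(addr) ? "candidate" : "skipped";
                lines.add(nic.getName() + ": " + addr.getHostAddress() + " (" + mark + ")");
            }
        }
        return lines;
    }

    // First matching address in enumeration order. In this sketch, with two
    // site-local NICs (eth0, eth1), that order alone decides the result.
    static Optional<InetAddress> firstCandidate() throws SocketException {
        Enumeration<NetworkInterface> nics = NetworkInterface.getNetworkInterfaces();
        if (nics == null) {
            return Optional.empty();
        }
        for (NetworkInterface nic : Collections.list(nics)) {
            for (InetAddress addr : Collections.list(nic.getInetAddresses())) {
                if (isCandidate(addr)) {
                    return Optional.of(addr);
                }
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) throws SocketException {
        describeLocalAddresses().forEach(System.out::println);
        System.out.println("first candidate: "
                + firstCandidate().map(InetAddress::getHostAddress).orElse("none"));
    }
}
```

Running this on a task manager host prints one line per interface address. If eth1 is enumerated before eth0, eth1's address is reported as the first candidate, which matches what the taskmanager.log below shows.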
>>>>>> >>
>>>>>> >> After manually killing the job manager process, the jobmanager.log on the
>>>>>> >> newly allocated second job manager reads:
>>>>>> >> ---
>>>>>> >> 2016-03-02 18:01:09,635 WARN Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@10.127.68.136:34811]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: /10.127.68.136:34811
>>>>>> >> 2016-03-02 18:01:09,644 WARN org.apache.flink.runtime.webmonitor.JobManagerRetriever - Failed to retrieve leader gateway and port.
>>>>>> >> akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@10.127.68.136:34811/), Path(/user/jobmanager)]
>>>>>> >> 	at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
>>>>>> >> 	at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
>>>>>> >> 	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>>>>>> >> 	at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
>>>>>> >> 	at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
>>>>>> >> 	at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
>>>>>> >> 	at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
>>>>>> >> 	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>>>>>> >> 	at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
>>>>>> >> 	at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
>>>>>> >> 	at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
>>>>>> >> 	at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
>>>>>> >> 	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>>>>>> >> 	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
>>>>>> >> 	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
>>>>>> >> 	at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
>>>>>> >> 	at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
>>>>>> >> 	at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
>>>>>> >> 	at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
>>>>>> >> 	at akka.remote.EndpointWriter.postStop(Endpoint.scala:561)
>>>>>> >> 	at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
>>>>>> >> 	at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:415)
>>>>>> >> 	at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
>>>>>> >> 	at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
>>>>>> >> 	at akka.actor.ActorCell.terminate(ActorCell.scala:369)
>>>>>> >> 	at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
>>>>>> >> 	at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
>>>>>> >> 	at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
>>>>>> >> 	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>>>>> >> 	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>>>>> >> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>> >> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>> >> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>> >> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>> >> ---
>>>>>> >> where the IP not found is that of the old job manager. So far, is this
>>>>>> >> the expected behavior?
>>>>>> >>
>>>>>> >> The problem then arises on a new task manager, which also tries to connect
>>>>>> >> to the old job manager unsuccessfully. The ZooKeeperLeaderRetrievalService
>>>>>> >> starts cycling through the available network interfaces, as can be seen in
>>>>>> >> the relevant taskmanager.log:
>>>>>> >> ---
>>>>>> >> 2016-03-02 18:01:13,636 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Starting ZooKeeperLeaderRetrievalService.
>>>>>> >> 2016-03-02 18:01:13,646 INFO org.apache.flink.runtime.util.LeaderRetrievalUtils - Trying to select the network interface and address to use by connecting to the leading JobManager.
>>>>>> >> 2016-03-02 18:01:13,646 INFO org.apache.flink.runtime.util.LeaderRetrievalUtils - TaskManager will try to connect for 10000 milliseconds before falling back to heuristics
>>>>>> >> 2016-03-02 18:01:13,712 INFO org.apache.flink.runtime.net.ConnectionUtils - Retrieved new target address /10.127.68.136:34811.
>>>>>> >> 2016-03-02 18:01:14,079 INFO org.apache.flink.runtime.net.ConnectionUtils - Trying to connect to address /10.127.68.136:34811
>>>>>> >> 2016-03-02 18:01:14,082 INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address 'task.manager.eth0.hostname.com/10.127.68.136': Connection refused
>>>>>> >> 2016-03-02 18:01:14,082 INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/10.127.68.136': Connection refused
>>>>>> >> 2016-03-02 18:01:14,082 INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/10.120.193.110': Connection refused
>>>>>> >> 2016-03-02 18:01:14,082 INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/10.127.68.136': Connection refused
>>>>>> >> 2016-03-02 18:01:14,083 INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/127.0.0.1': Connection refused
>>>>>> >> 2016-03-02 18:01:14,083 INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/10.120.193.110': Connection refused
>>>>>> >> 2016-03-02 18:01:14,083 INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/10.127.68.136': Connection refused
>>>>>> >> 2016-03-02 18:01:14,083 INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/127.0.0.1': Connection refused
>>>>>> >> ---
>>>>>> >> After five repetitions, the task manager stops trying to retrieve the
>>>>>> >> leader and, using the HEURISTIC strategy, ends up using eth1
>>>>>> >> (10.120.193.110) from now on:
>>>>>> >> ---
>>>>>> >> 2016-03-02 18:01:23,650 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Stopping ZooKeeperLeaderRetrievalService.
>>>>>> >> 2016-03-02 18:01:23,655 INFO org.apache.zookeeper.ClientCnxn - EventThread shut down
>>>>>> >> 2016-03-02 18:01:23,655 INFO org.apache.zookeeper.ZooKeeper - Session: 0x25229757cff035b closed
>>>>>> >> 2016-03-02 18:01:23,664 INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager will use hostname/address 'task.manager.eth1.hostname.com' (10.120.193.110) for communication.
>>>>>> >> ---
>>>>>> >> Following this, the new jobmanager is discovered and the taskmanager is
>>>>>> >> able to register with the jobmanager using eth1. The problem is that
>>>>>> >> connections TO eth1 are not possible, so Flink should always use eth0.
>>>>>> >> The exception we later see is:
>>>>>> >> ---
>>>>>> >> java.io.IOException: Connecting the channel failed: Connecting to remote task manager + 'other.task.manager.eth1.hostname/10.120.193.111:46620' has failed. This might indicate that the remote task manager has been lost.
>>>>>> >> 	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
>>>>>> >> 	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:131)
>>>>>> >> 	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:83)
>>>>>> >> 	at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:60)
>>>>>> >> 	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:115)
>>>>>> >> 	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:388)
>>>>>> >> 	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:411)
>>>>>> >> 	at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:108)
>>>>>> >> 	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:175)
>>>>>> >> 	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
>>>>>> >> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
>>>>>> >> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>> >> 	at java.lang.Thread.run(Thread.java:744)
>>>>>> >> ---
>>>>>> >> The root cause seems to be that network interface selection is still
>>>>>> >> using the old jobmanager location and hence is not able to choose the
>>>>>> >> right interface.
>>>>>> >> In particular, it seems that the iteration order over the network
>>>>>> >> interfaces differs between the HEURISTIC and SLOW strategies, which then
>>>>>> >> leads to the wrong interface being selected.
>>>>>> >>
>>>>>> >> Cheers,
>>>>>> >> Max
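Stephan proposes a fallback order in this thread: use a local address from which the JobManager is actually reachable, and otherwise use the interface bound to the machine's hostname (InetAddress.getLocalHost()) rather than a heuristically chosen NIC. That order can be sketched in plain Java. This is a minimal sketch under stated assumptions, not Flink's ConnectionUtils:

- The class and method names are hypothetical.
- Each local address is tried once as the source of a plain TCP connection.
- There is no retry loop and no overall deadline like the 10000 ms shown in the taskmanager.log above.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.Socket;
import java.net.SocketException;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Optional;

// Hypothetical sketch of "reachable address first, else getLocalHost()".
// Not Flink's actual implementation.
class AddressSelectionSketch {

    // Tries each local address as the source of a TCP connection to target and
    // returns the first one from which the connection succeeds.
    static Optional<InetAddress> findConnectingAddress(InetSocketAddress target, int timeoutMs)
            throws SocketException {
        Enumeration<NetworkInterface> nics = NetworkInterface.getNetworkInterfaces();
        if (nics == null) {
            return Optional.empty();
        }
        for (NetworkInterface nic : Collections.list(nics)) {
            for (InetAddress source : Collections.list(nic.getInetAddresses())) {
                try (Socket socket = new Socket()) {
                    socket.bind(new InetSocketAddress(source, 0)); // ephemeral port on this address
                    socket.connect(target, timeoutMs);
                    return Optional.of(source);
                } catch (IOException e) {
                    // Refused, unreachable, timed out or wrong address family: try the next one.
                }
            }
        }
        return Optional.empty();
    }

    // If no local address can reach the JobManager (e.g. the old leader just
    // died), fall back to the address bound to the machine's hostname.
    static InetAddress selectAddress(InetSocketAddress jobManager, int timeoutMs) throws IOException {
        Optional<InetAddress> reachable = findConnectingAddress(jobManager, timeoutMs);
        if (reachable.isPresent()) {
            return reachable.get();
        }
        return InetAddress.getLocalHost();
    }
}
```

Consider a task manager in Max's setup that cannot reach the old JobManager. With this order it would fall back to the address its hostname resolves to, which Max reports is bound to eth0, instead of picking eth1. Raising akka.lookup.timeout, as suggested in the thread, avoids reaching the fallback in the first place.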