Hey Max! for the first WARN in org.apache.flink.runtime.webmonitor.JobManagerRetriever: this is expected if the new leader has not updated ZooKeeper yet. The important thing is that the new leading job manager is eventually retrieved. This did happen, right?
Regarding eth1 vs. eth0: After the new job manager becomes leader, the task manager should re-try connecting to it with the same strategy as in the initial connection establishment (e.g. try SLOW first and only fall back to HEURISTIC). Can you see in the logs whether this happens? The best thing would be to share the complete logs. Is this possible? If not publicly, feel free to send it to me privately (uce at apache org). – Ufuk On Thu, Mar 3, 2016 at 9:21 AM, Maximilian Bode <maximilian.b...@tngtech.com> wrote: > Hi everyone, > > we are trying to get to work JobManager HA in the context of a per-job YARN > session using the 1.0.0-rc3 from a few days ago and are having a problem > concerning task managers with several network interfaces. > > After manually killing the job manager process, the jobmanager.log on the > newly allocated second job manager reads: > --- > 2016-03-02 18:01:09,635 WARN Remoting > - Tried to associate with unreachable remote address > [akka.tcp://flink@10.127.68.136:34811]. Address is now gated for 5000 ms, > all messages to this address will be delivered to dead letters. Reason: > Connection refused: /10.127.68.136:34811 > 2016-03-02 18:01:09,644 WARN > org.apache.flink.runtime.webmonitor.JobManagerRetriever - Failed to > retrieve leader gateway and port. > akka.actor.ActorNotFound: Actor not found for: > ActorSelection[Anchor(akka.tcp://flink@10.127.68.136:34811/), > Path(/user/jobmanager)] > at > akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65) > at > akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63) > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) > at > akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67) > at > akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82) > at > akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) > at > akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) > at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) > at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58) > at > akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74) > at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110) > at > akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73) > at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) > at > scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) > at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267) > at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508) > at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541) > at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531) > at > akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87) > at akka.remote.EndpointWriter.postStop(Endpoint.scala:561) > at akka.actor.Actor$class.aroundPostStop(Actor.scala:475) > at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:415) > at > akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210) > at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172) > at akka.actor.ActorCell.terminate(ActorCell.scala:369) > at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462) > at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) > at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279) > at akka.dispatch.Mailbox.run(Mailbox.scala:220) > at akka.dispatch.Mailbox.exec(Mailbox.scala:231) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > --- > where the IP not found is from the old job manager. So far, is this the > expected behavior? > > The problem then arises on a new task manager, which also tries to connect > to the old job manager unsuccessfully. The ZooKeeperLeaderRetrievalService > starts cycling through the available network interfaces, as can be seen in > the relevant taskmanager.log: > --- > 2016-03-02 18:01:13,636 INFO > org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - > Starting ZooKeeperLeaderRetrievalService. > 2016-03-02 18:01:13,646 INFO > org.apache.flink.runtime.util.LeaderRetrievalUtils - Trying to > select the network interface and address to use by connecting to the leading > JobManager. > 2016-03-02 18:01:13,646 INFO > org.apache.flink.runtime.util.LeaderRetrievalUtils - TaskManager > will try to connect for 10000 milliseconds before falling back to heuristics > 2016-03-02 18:01:13,712 INFO org.apache.flink.runtime.net.ConnectionUtils > - Retrieved new target address /10.127.68.136:34811. > 2016-03-02 18:01:14,079 INFO org.apache.flink.runtime.net.ConnectionUtils > - Trying to connect to address /10.127.68.136:34811 > 2016-03-02 18:01:14,082 INFO org.apache.flink.runtime.net.ConnectionUtils > - Failed to connect from address > 'task.manager.eth0.hostname.com/10.127.68.136': Connection refused > 2016-03-02 18:01:14,082 INFO org.apache.flink.runtime.net.ConnectionUtils > - Failed to connect from address '/10.127.68.136': Connection refused > 2016-03-02 18:01:14,082 INFO org.apache.flink.runtime.net.ConnectionUtils > - Failed to connect from address '/10.120.193.110': Connection refused > 2016-03-02 18:01:14,082 INFO org.apache.flink.runtime.net.ConnectionUtils > - Failed to connect from address '/10.127.68.136': Connection refused > 2016-03-02 18:01:14,083 INFO org.apache.flink.runtime.net.ConnectionUtils > - Failed to connect from address '/127.0.0.1': Connection refused > 2016-03-02 18:01:14,083 INFO org.apache.flink.runtime.net.ConnectionUtils > - Failed to connect from address '/10.120.193.110': Connection refused > 2016-03-02 18:01:14,083 INFO org.apache.flink.runtime.net.ConnectionUtils > - Failed to connect from address '/10.127.68.136': Connection refused > 2016-03-02 18:01:14,083 INFO org.apache.flink.runtime.net.ConnectionUtils > - Failed to connect from address '/127.0.0.1': Connection refused > --- > After five repetitions, the task manager stops trying to retrieve the leader > and using the HEURISTIC strategy ends up using eth1 (10.120.193.110) from > now on: > --- > 2016-03-02 18:01:23,650 INFO > org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - > Stopping ZooKeeperLeaderRetrievalService. > 2016-03-02 18:01:23,655 INFO org.apache.zookeeper.ClientCnxn > - EventThread shut down > 2016-03-02 18:01:23,655 INFO org.apache.zookeeper.ZooKeeper > - Session: 0x25229757cff035b closed > 2016-03-02 18:01:23,664 INFO > org.apache.flink.runtime.taskmanager.TaskManager - TaskManager > will use hostname/address 'task.manager.eth1.hostname.com' (10.120.193.110) > for communication. > --- > Following the new jobmanager is discovered and the taskmanager is able to > register at the jobmanager using eth1. The problem is that connections TO > eth1 are not possible. So flink should always use eth0. The exception we > later see is: > --- > java.io.IOException: Connecting the channel failed: Connecting to remote > task manager + 'other.task.manager.eth1.hostname/10.120.193.111:46620' has > failed. This might indicate that the remote task manager has been lost. > at > org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196) > at > org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:131) > at > org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:83) > at > org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:60) > at > org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:115) > at > org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:388) > at > org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:411) > at > org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:108) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:175) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > at java.lang.Thread.run(Thread.java:744) > --- > The root cause seems to be that network interface selection is still using > the old jobmanager location and hence is not able to choose the right > interface. In particular, it seems that iteration order over the network > interfaces differs between the HEURISTIC and SLOW strategy, which then leads > to the wrong interface being selected. > > Cheers, > Max > — > Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com > TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring > Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke > Sitz: Unterföhring * Amtsgericht München * HRB 135082 >