Hi Alexis,

did you set the ZooKeeper configuration for Flink in Zeppelin?
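For context, here is a toy model of what the JobManager warning quoted below seems to describe. It is a sketch under assumptions, not Flink's actual code. All names (`LeaderSessionFilter`, `LeaderSessionMessage` as a case class here, `accepts`, `handle`) are hypothetical. It assumes, as the log line states, that in HA mode the JobManager only processes a message whose leader session ID equals its own current one, and discards everything else without replying. Under that assumption, a client that connects to the leader's host/port but never learned the session ID (it sends `None`) is simply dropped, which would explain why raising `akka.client.timeout` only delays the error.

```scala
import java.util.UUID

// Toy model (hypothetical names, not Flink's API) of the leader-session check
// implied by the "Discard message ... leader session ID" warning.
object LeaderSessionFilter {

  // A message tagged with the leader session ID the sender believes is current.
  // None models a client that never retrieved the ID (e.g. no ZooKeeper config).
  final case class LeaderSessionMessage(leaderSessionId: Option[UUID], payload: String)

  // Accept only if the sender's ID equals the JobManager's current leader session ID.
  def accepts(expected: Option[UUID], msg: LeaderSessionMessage): Boolean =
    msg.leaderSessionId == expected

  // Returns Some(payload) when the message is handled, or None when it is discarded.
  // A discarded submission gets no reply, so in this model the client can only
  // wait for its own timeout to expire.
  def handle(expected: Option[UUID], msg: LeaderSessionMessage): Option[String] =
    if (accepts(expected, msg)) Some(msg.payload)
    else {
      println(s"Discard message $msg because the expected leader session ID " +
        s"$expected did not equal the received leader session ID ${msg.leaderSessionId}.")
      None
    }

  def main(args: Array[String]): Unit = {
    val current: Option[UUID] = Some(UUID.randomUUID())
    // Client pointed at the leader's host/port but without HA settings: ID is None.
    println(handle(current, LeaderSessionMessage(None, "SubmitJob")))
    // Client that obtained the current ID (in a real setup, via ZooKeeper): ID matches.
    println(handle(current, LeaderSessionMessage(current, "SubmitJob")))
  }
}
```

Running `main` prints the discard warning followed by `None`, then `Some(SubmitJob)`. In this model the only way to get past the check is to know the current leader session ID, which presumably is why the client side needs the same HA/ZooKeeper settings as the cluster rather than just a fixed host and port.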
On Mon, Mar 20, 2017 at 11:37 AM, Alexis Gendronneau <a.gendronn...@gmail.com> wrote:

> Hello users,
>
> Like Maciek, I'm currently trying to get Apache Zeppelin 0.7 working with
> Flink. I have two versions of Flink available (1.1.2 and 1.2.0). Each one
> is running in high-availability mode.
>
> When running jobs from Zeppelin in Flink local mode, everything works
> fine. But when trying to submit a job to a remote host (no matter which
> version is involved), the job is stuck in the submitting phase until it
> reaches akka.client.timeout.
>
> I tried to increase the timeout (as suggested by the error raised in
> Zeppelin), but it only increases the time before the error is finally
> raised (tested with 600s).
>
> On the Flink side, nothing appears but:
>
> 2017-03-20 11:19:31,675 WARN  org.apache.flink.runtime.jobmanager.JobManager
> - Discard message LeaderSessionMessage(null,SubmitJob(JobGraph(jobId:
> 8af3a91762a171b04c4be0efe540f3d4),EXECUTION_RESULT_AND_STATE_CHANGES))
> because the expected leader session ID Some(f955760c-d80d-4992-a148-5968026ca6e4)
> did not equal the received leader session ID None.
>
> On the Zeppelin interpreter side, we get the following stack trace:
>
> bestCarrier: org.apache.flink.api.scala.DataSet[CarrierFlightsCount] = org.apache.flink.api.scala.DataSet@669fc812
> org.apache.flink.client.program.ProgramInvocationException: The program execution failed:
> Communication with JobManager failed: Job submission to the JobManager timed out.
> You may increase 'akka.client.timeout' in case the JobManager needs more time to
> configure and confirm the job submission.
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:409)
>     at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:95)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:382)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:369)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:344)
>     at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:211)
>     at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:188)
>     at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:172)
>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896)
>     at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:637)
>     at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:547)
>     ... 36 elided
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Communication with
> JobManager failed: Job submission to the JobManager timed out. You may increase
> 'akka.client.timeout' in case the JobManager needs more time to configure and
> confirm the job submission.
>     at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:137)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:405)
>     ... 46 more
> Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException:
> Job submission to the JobManager timed out. You may increase 'akka.client.timeout'
> in case the JobManager needs more time to configure and confirm the job submission.
>     at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:264)
>     at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:90)
>     at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:70)
>     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> It looks like we have to add parameters on the Zeppelin side, but I can't
> see what's missing here. Any clue appreciated.
>
> Regards,
>
> 2017-01-24 17:13 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
>
>> +Till Rohrmann <trohrm...@apache.org>, do you know what can be used to
>> access an HA cluster from that setting?
>>
>> Adding Till since he probably knows the HA stuff best.
>>
>> On Sun, 22 Jan 2017 at 15:58 Maciek Próchniak <m...@touk.pl> wrote:
>>
>>> Hi,
>>>
>>> I have a standalone Flink cluster configured with HA settings (i.e. with
>>> ZooKeeper recovery). How should I access it remotely, e.g. with a Zeppelin
>>> notebook or the Scala shell?
>>>
>>> There are settings for host/port, but with HA they are not fixed: if I
>>> check which host and port belong to the *current leader* and set those, I
>>> get an exception on the JobManager:
>>>
>>> 20:36:38.237 [flink-akka.actor.default-dispatcher-22704] WARN
>>> o.a.f.runtime.jobmanager.JobManager - Discard message
>>> LeaderSessionMessage(null,SubmitJob(JobGraph(jobId:
>>> 02a3a43464b2a750e04855d4c0b6fecb),EXECUTION_RESULT_AND_STATE_CHANGES))
>>> because the expected leader session ID
>>> Some(1a4e9d39-2c59-45bb-b81c-d867bec1958d) did not equal the received
>>> leader session ID None.
>>>
>>> I guess it's reasonable behaviour, since I should use the appropriate
>>> LeaderRetrievalService and so on. But apparently there's no such
>>> possibility in the Scala Flink shell?
>>>
>>> Is it a missing feature? I can prepare a patch, but I'm not sure how I
>>> would hook the behaviour of ClusterClient into FlinkILoop.
>>>
>>> thanks,
>>>
>>> maciek
>>>
>
> --
> Alexis Gendronneau
>
> alexis.gendronn...@corp.ovh.com
> a.gendronn...@gmail.com