Hi Robert, Hi Till,

I tried to set up the high-availability options in Zeppelin, but I guess it's just a matter of Flink version compatibility on the Zeppelin side. I'll try to compile Zeppelin with Flink 1.2 and add the needed parameters to see if that works better.

Thanks for your help!
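For reference, the high-availability settings Till refers to in the quoted message would look roughly like the fragment below. This is only a sketch: the key names are the ones used by Flink 1.2, and the ZooKeeper hosts, root path and storage directory are placeholders that would have to match the values the cluster was started with. Flink 1.1 appears to use the older `recovery.*` names (e.g. `recovery.mode`), so check the configuration page for the version in use. As Till notes, Zeppelin would still need a way to pass these values to its Flink interpreter.

```yaml
# Sketch only: Flink 1.2 key names; all values below are placeholders
# and must be identical to those in the cluster's own flink-conf.yaml.
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-host-1:2181,zk-host-2:2181,zk-host-3:2181
high-availability.zookeeper.path.root: /flink
high-availability.zookeeper.storageDir: hdfs:///flink/ha/
```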
2017-03-27 15:09 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:

> Hi Maciek and Alexis,
>
> as far as I can tell, I think it is currently not possible to use Zeppelin
> with a Flink cluster running in HA mode. In order to make it work, it would
> be necessary to specify either a Flink configuration for the Flink
> interpreter (this is probably the most general solution) or to enable the
> HA mode in Zeppelin. Enabling the HA mode would mean that we set
> high-availability: zookeeper in the configuration and then set all the
> remaining high-availability configuration options [1] to the same values
> with which the Flink cluster was started. This would have to be contributed
> to the Zeppelin project.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#high-availability-ha
>
> Cheers,
> Till
>
> On Thu, Mar 23, 2017 at 11:41 AM, Robert Metzger <rmetz...@apache.org> wrote:
>
>> Hi Alexis,
>>
>> did you set the Zookeeper configuration for Flink in Zeppelin?
>>
>> On Mon, Mar 20, 2017 at 11:37 AM, Alexis Gendronneau <a.gendronn...@gmail.com> wrote:
>>
>>> Hello users,
>>>
>>> Like Maciek, I'm currently trying to get Apache Zeppelin 0.7 working with
>>> Flink. I have two versions of Flink available (1.1.2 and 1.2.0). Each one
>>> is running in high-availability mode.
>>>
>>> When running jobs from Zeppelin in Flink local mode, everything works
>>> fine. But when trying to submit a job to a remote host (no matter which
>>> version is involved), the job is stuck in the submitting phase until it
>>> reaches akka.client.timeout.
>>>
>>> I tried to increase the timeout (as suggested in the error raised in
>>> Zeppelin), but it only increases the time before the error is finally
>>> raised (tested with 600s).
>>>
>>> On the Flink side, nothing appears but:
>>>
>>> 2017-03-20 11:19:31,675 WARN org.apache.flink.runtime.jobmanager.JobManager
>>> - Discard message LeaderSessionMessage(null,SubmitJob(JobGraph(jobId:
>>> 8af3a91762a171b04c4be0efe540f3d4) ,EXECUTION_RESULT_AND_STATE_CHANGES))
>>> because the expected leader session ID
>>> Some(f955760c-d80d-4992-a148-5968026ca6e4)
>>> did not equal the received leader session ID None.
>>>
>>> On the Zeppelin interpreter side, we get the following stack trace:
>>>
>>> bestCarrier: org.apache.flink.api.scala.DataSet[CarrierFlightsCount] = org.apache.flink.api.scala.DataSet@669fc812
>>> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Communication with JobManager failed: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
>>>   at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:409)
>>>   at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:95)
>>>   at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:382)
>>>   at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:369)
>>>   at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:344)
>>>   at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:211)
>>>   at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:188)
>>>   at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:172)
>>>   at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896)
>>>   at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:637)
>>>   at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:547)
>>>   ... 36 elided
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Communication with JobManager failed: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
>>>   at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:137)
>>>   at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:405)
>>>   ... 46 more
>>> Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
>>>   at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:264)
>>>   at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:90)
>>>   at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:70)
>>>   at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>>>   at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>>>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>>>   at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>>>   at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>>>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>>>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>> It looks like we have to add parameters on the Zeppelin side, but I can't
>>> see what's missing here. Any clue appreciated.
>>>
>>> Regards,
>>>
>>> 2017-01-24 17:13 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
>>>
>>>> +Till Rohrmann <trohrm...@apache.org>, do you know what can be used to
>>>> access a HA cluster from that setting?
>>>>
>>>> Adding Till since he probably knows the HA stuff best.
>>>>
>>>> On Sun, 22 Jan 2017 at 15:58 Maciek Próchniak <m...@touk.pl> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have a standalone Flink cluster configured with the HA setting (i.e.
>>>>> with zookeeper recovery). How should I access it remotely, e.g. with a
>>>>> Zeppelin notebook or the scala shell?
>>>>>
>>>>> There are settings for host/port, but with the HA setting they are not
>>>>> fixed - if I check which is the *current leader* host and port and set
>>>>> that, I get an exception on the job manager:
>>>>>
>>>>> 20:36:38.237 [flink-akka.actor.default-dispatcher-22704] WARN
>>>>> o.a.f.runtime.jobmanager.JobManager - Discard message
>>>>> LeaderSessionMessage(null,SubmitJob(JobGraph(jobId:
>>>>> 02a3a43464b2a750e04855d4c0b6fecb),EXECUTION_RESULT_AND_STATE_CHANGES))
>>>>> because the expected leader session ID
>>>>> Some(1a4e9d39-2c59-45bb-b81c-d867bec1958d) did not equal the received
>>>>> leader session ID None.
>>>>>
>>>>> - I guess it's reasonable behaviour, since I should use the appropriate
>>>>> LeaderRetrievalService and so on. But apparently there's no such
>>>>> possibility in the scala flink shell?
>>>>>
>>>>> Is it a missing feature? I can prepare a patch, but I'm not sure how I
>>>>> would hook the behaviour of ClusterClient into FlinkILoop?
>>>>>
>>>>> thanks,
>>>>>
>>>>> maciek
>>>
>>> --
>>> Alexis Gendronneau
>>>
>>> alexis.gendronn...@corp.ovh.com
>>> a.gendronn...@gmail.com

--
Alexis Gendronneau
alexis.gendronn...@corp.ovh.com
a.gendronn...@gmail.com