Hi Alexis,

Did you set the ZooKeeper configuration for Flink in Zeppelin?
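
The "expected leader session ID Some(...) did not equal the received leader session ID None" warning in your JobManager log suggests the client never retrieved the leader session ID, which it normally gets from ZooKeeper in HA mode. As a rough sketch of the settings I mean, assuming the Flink 1.2 key names (as far as I recall, Flink 1.1 used the `recovery.mode` / `recovery.zookeeper.*` names instead), with placeholder quorum hosts and root path that must match what your JobManagers use:

```
# Assumed client-side HA settings; hosts and path are placeholders
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.zookeeper.path.root: /flink
```

Whether the Zeppelin Flink interpreter forwards such keys to the Flink client when they are set as interpreter properties is an assumption on my side and worth checking.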

On Mon, Mar 20, 2017 at 11:37 AM, Alexis Gendronneau <
a.gendronn...@gmail.com> wrote:

> Hello users,
>
> Like Maciek, I'm currently trying to get Apache Zeppelin 0.7 working with
> Flink. I have two versions of Flink available (1.1.2 and 1.2.0), each
> running in high-availability mode.
>
> When running jobs from Zeppelin in Flink local mode, everything works
> fine. But when I try to submit a job to a remote host (whichever version
> is involved), the job is stuck in the submitting phase until it reaches
> akka.client.timeout.
>
> I tried increasing the timeout (as suggested in the error raised in
> Zeppelin), but that only increases the time before the error is finally
> raised (tested with 600s).
>
> On the Flink side, nothing appears except:
>
>     2017-03-20 11:19:31,675 WARN  org.apache.flink.runtime.jobmanager.JobManager - Discard message LeaderSessionMessage(null,SubmitJob(JobGraph(jobId: 8af3a91762a171b04c4be0efe540f3d4) ,EXECUTION_RESULT_AND_STATE_CHANGES)) because the expected leader session ID Some(f955760c-d80d-4992-a148-5968026ca6e4) did not equal the received leader session ID None.
>
>
> On the Zeppelin interpreter side, we get the following stack trace:
>
>     bestCarrier: org.apache.flink.api.scala.DataSet[CarrierFlightsCount] = org.apache.flink.api.scala.DataSet@669fc812
>     org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Communication with JobManager failed: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
>       at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:409)
>       at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:95)
>       at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:382)
>       at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:369)
>       at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:344)
>       at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:211)
>       at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:188)
>       at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:172)
>       at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896)
>       at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:637)
>       at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:547)
>       ... 36 elided
>     Caused by: org.apache.flink.runtime.client.JobExecutionException: Communication with JobManager failed: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
>       at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:137)
>       at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:405)
>       ... 46 more
>     Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
>       at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:264)
>       at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:90)
>       at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:70)
>       at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>       at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>       at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>       at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>       at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>       at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>       at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>       at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> It looks like we have to add parameters on the Zeppelin side, but I can't see
> what's missing here. Any clue appreciated.
>
> Regards,
>
> 2017-01-24 17:13 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
>
>> +Till Rohrmann <trohrm...@apache.org>, do you know what can be used to
>> access an HA cluster from that setting?
>>
>> Adding Till since he probably knows the HA stuff best.
>>
>> On Sun, 22 Jan 2017 at 15:58 Maciek Próchniak <m...@touk.pl> wrote:
>>
>>> Hi,
>>>
>>> I have a standalone Flink cluster configured with HA (i.e. with
>>> ZooKeeper recovery). How should I access it remotely, e.g. with a Zeppelin
>>> notebook or the Scala shell?
>>>
>>> There are settings for host/port, but with HA they are not fixed
>>> - if I check which host and port is the *current leader* and set that, I get
>>> an exception on the job manager:
>>>
>>> 20:36:38.237 [flink-akka.actor.default-dispatcher-22704] WARN
>>> o.a.f.runtime.jobmanager.JobManager - Discard message
>>> LeaderSessionMessage(null,SubmitJob(JobGraph(jobId:
>>> 02a3a43464b2a750e04855d4c0b6fecb),EXECUTION_RESULT_AND_STATE_CHANGES))
>>> because the expected leader session ID
>>> Some(1a4e9d39-2c59-45bb-b81c-d867bec1958d) did not equal the received
>>> leader session ID None.
>>>
>>> - I guess it's reasonable behaviour, since I should use the appropriate
>>> LeaderRetrievalService and so on. But apparently there's no such
>>> possibility in the Scala Flink shell?
>>>
>>> Is it a missing feature? I can prepare a patch, but I'm not sure how I would
>>> hook the behaviour of ClusterClient into FlinkILoop.
>>>
>>> thanks,
>>>
>>> maciek
>>>
>>>
>
>
> --
> Alexis Gendronneau
>
> alexis.gendronn...@corp.ovh.com
> a.gendronn...@gmail.com
>
