In HA mode, the host and port information you provide to the Shell should
simply be ignored, so you don't have to retrieve them from the
.yarn-properties file.
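
To make the HA settings quoted further down in this thread concrete, here is
a minimal sketch. It is not Flink code: a plain java.util.Map stands in for
Flink's Configuration object, and the class and method names (HaConfigSketch,
haSettings, isHighAvailability) are made up for illustration. The keys and
the example values are copied from the configuration logged below.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative stand-in only: a Map plays the role of Flink's Configuration.
class HaConfigSketch {
    // Keys as they appear in the logged "Flink Configuration" in this thread.
    static final String RECOVERY_MODE = "recovery.mode";
    static final String ZK_QUORUM = "recovery.zookeeper.quorum";
    static final String ZK_ROOT = "recovery.zookeeper.path.root";

    // Collects the three HA entries; the values must match flink-conf.yaml.
    public static Map<String, String> haSettings(String quorum, String rootPath) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put(RECOVERY_MODE, "zookeeper");
        conf.put(ZK_QUORUM, quorum);
        conf.put(ZK_ROOT, rootPath);
        return conf;
    }

    // True when the settings request ZooKeeper-based JobManager lookup,
    // i.e. the case in which an explicit host and port should not matter.
    public static boolean isHighAvailability(Map<String, String> conf) {
        return "zookeeper".equals(conf.get(RECOVERY_MODE));
    }

    public static void main(String[] args) {
        Map<String, String> conf =
            haSettings("slave01:2181,slave02:2181,master:2181", "/flink/recovery");
        conf.forEach((key, value) -> System.out.println(key + "=" + value));
        System.out.println("HA mode: " + isHighAvailability(conf));
    }
}
```

In the actual interpreter the same three entries would be set on the Flink
configuration object with conf.setString(...), as described in the quoted
message, before that object is handed to FlinkILoop.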

Could you maybe run the FlinkInterpreter with debug log level and share the
logs with me? You can also do that privately, if you don't want to share
them on the mailing list.

I haven't tried it myself, but I thought that the Shell also works with an
HA cluster, because it uses the same mechanism as the CLI, for example.
I'll try it out later today.

Cheers,
Till

On Fri, Apr 15, 2016 at 12:22 AM, Andrea Sella <andrea.se...@radicalbit.io>
wrote:

> Hi Till,
>
> The cluster was started in HA mode.
> I already patched the Flink interpreter to allow passing the Configuration
> to FlinkILoop. Nevertheless, I have to pass host and port to FlinkILoop;
> they are required by the FlinkILoop constructor, and I retrieve them from
> the .yarn-properties file.
>
> I logged Flink Configuration:
>
> INFO [2016-04-14 17:52:58,141] ({pool-2-thread-2}
> FlinkInterpreter.java[open]:96) - Flink Configuration: {
> recovery.mode=zookeeper, host=yarn,
> yarn-properties=/tmp/.yarn-properties-flink,
> recovery.zookeeper.quorum=slave01:2181,slave02:2181,master:2181,
> recovery.zookeeper.path.root=/flink/recovery}
>
> and I attach some logs:
>
> Error displayed in paragraph of Zeppelin
> <https://gist.github.com/alkagin/612d736da8af9ee111e766b230559bb9>
> JobManager log
> <https://gist.github.com/alkagin/0a0b2670ce77f7d9c0807b1e4ef7239a>
> Interpreter/FlinkILoop log
> <https://gist.github.com/alkagin/23e4cec15904448dd2b400a6a37f7fa7>
>
> I was looking at the Flink shell, and it works similarly to the
> interpreter. Does it work with an HA cluster?
>
> Thank you,
> Andrea
>
>
> 2016-04-14 16:09 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:
>
> > Hi Andrea,
> >
> > have you started the Flink Yarn cluster in HA mode? Then the JobManager
> > address is stored in ZooKeeper and you have to tell your FlinkILoop that
> > it should retrieve the JobManager address from there. In order to do that
> > you have to set conf.setString(ConfigConstants.RECOVERY_MODE,
> > "zookeeper"), conf.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY,
> > "address of your zookeeper cluster") and
> > conf.setString(ConfigConstants.ZOOKEEPER_DIR_KEY,
> > "flink dir you've set") where conf is the Flink configuration object. The
> > values you set for these keys must match the values specified in the
> > flink-conf.yaml file. You then give the FlinkILoop the conf object.
> >
> > I'm not sure whether you can specify a custom Flink configuration in
> > Zeppelin. I think you can only specify a host and port. So either you
> > start your Flink cluster in non-HA mode or you have to patch Zeppelin.
> >
> > Cheers,
> > Till
> >
> > On Tue, Apr 12, 2016 at 5:12 PM, Andrea Sella <andrea.se...@radicalbit.io>
> > wrote:
> >
> > > Hi,
> > >
> > > I am working on allowing Zeppelin's Flink interpreter to connect to an
> > > existing Yarn cluster. The Yarn cluster was started via yarn-session,
> > > and the Flink version is 1.0.0.
> > >
> > > My approach is to read host and port from .yarn-properties and pass
> > > them to FlinkILoop.
> > > Now I am facing an issue with the session ID when I submit a paragraph
> > > to the Yarn cluster.
> > > The Yarn cluster logs a warning similar to:
> > >
> > > 2016-04-12 10:14:32,666 WARN  org.apache.flink.yarn.YarnJobManager
> > >                  - Discard message
> > > LeaderSessionMessage(null,SubmitJob(JobGraph(jobId:
> > > 0b6811bc58d781ddb6f5aac994afd903),EXECUTION_RESULT_AND_STATE_CHANGES))
> > > because the expected leader session ID
> > > Some(afc85978-f765-488b-acbb-79c2d7cb89e0) did not equal the received
> > > leader session ID None.
> > >
> > > My Zeppelin paragraph throws a JobClientActorSubmissionTimeoutException.
> > > Could it be due to the missing session ID? Do I need to pass extra
> > > parameters to connect correctly to the Yarn cluster, or are host and
> > > port enough?
> > >
> > > Thanks in advance,
> > > Andrea
> > >
> >
>
