Hi Andrea,

I think your problem should be fixed with the PRs [1, 2]. I've tested it locally on my yarn cluster and it worked.
[1] https://github.com/apache/flink/pull/1904
[2] https://github.com/apache/flink/pull/1914

Cheers,
Till

On Tue, Apr 19, 2016 at 2:16 PM, Till Rohrmann <trohrm...@apache.org> wrote:

> I think this is another issue you've detected. I already spotted some suspicious code in the yarn deployment section. If I'm not mistaken, flink-conf.yaml is read too late and is thus not respected. I'll verify it and, if that is the case, open another issue and fix it.
>
> Thanks for your patience and thorough reporting. It helps a lot :-)
>
> Cheers,
> Till
>
> On Tue, Apr 19, 2016 at 2:12 PM, Andrea Sella <andrea.se...@radicalbit.io> wrote:
>
>> No, I tried it via the scala-shell, as you can see in the attachment.
>>
>> Regards,
>> Andrea
>>
>> 2016-04-19 14:08 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:
>>
>>> Hi Andrea,
>>>
>>> thanks for testing it. How did you submit the job this time? Via Zeppelin?
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Apr 19, 2016 at 12:51 PM, Andrea Sella <andrea.se...@radicalbit.io> wrote:
>>>
>>>> Hi Till,
>>>>
>>>> I've used your branch fixScalaShell to test the scala-shell with our HA cluster; it doesn't work. Same error as before:
>>>>
>>>> 2016-04-19 06:40:35,030 WARN org.apache.flink.yarn.YarnJobManager - Discard message LeaderSessionMessage(null,SubmitJob(JobGraph(jobId: aa5b034e10a850d863642a24aab75d9c),EXECUTION_RESULT_AND_STATE_CHANGES)) because the expected leader session ID Some(bc706707-2bab-4b82-b7a7-1426dce696a7) did not equal the received leader session ID None.
>>>>
>>>> If I submit a simple job, it works, so I don't think it is a problem with our environment.
>>>>
>>>> Cheers,
>>>> Andrea
>>>>
>>>> 2016-04-18 18:41 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:
>>>>
>>>>> Cool, that helps a lot :-)
>>>>>
>>>>> On Mon, Apr 18, 2016 at 6:06 PM, Andrea Sella <andrea.se...@radicalbit.io> wrote:
>>>>>
>>>>>> Hi Till,
>>>>>>
>>>>>> Don't worry, I am going to test the PR in our HA environment.
>>>>>>
>>>>>> Cheers,
>>>>>> Andrea
>>>>>>
>>>>>> 2016-04-18 17:46 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:
>>>>>>
>>>>>>> Hi Andrea,
>>>>>>>
>>>>>>> sorry, I saw your mail too late. I already fixed the problem and opened a PR [1] for it. I hope you haven't invested too much time in it yet.
>>>>>>>
>>>>>>> [1] https://github.com/apache/flink/pull/1904
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Mon, Apr 18, 2016 at 11:19 AM, Andrea Sella <andrea.se...@radicalbit.io> wrote:
>>>>>>>
>>>>>>>> Hi Till,
>>>>>>>>
>>>>>>>> Thanks for the support, I will take the issue and start working on it asap.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Andrea
>>>>>>>>
>>>>>>>> 2016-04-18 10:32 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:
>>>>>>>>
>>>>>>>>> Hi Andrea,
>>>>>>>>>
>>>>>>>>> I think the problem is simply that it has not been correctly implemented. I just checked, and I think the user configuration is not given to the PlanExecutor which is created internally. I've opened an issue for that [1].
>>>>>>>>>
>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-3774
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Till
>>>>>>>>>
>>>>>>>>> On Fri, Apr 15, 2016 at 4:58 PM, Andrea Sella <andrea.se...@radicalbit.io> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Till,
>>>>>>>>>>
>>>>>>>>>> I've tried the Scala-Shell with our HA cluster, no luck again.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Andrea
>>>>>>>>>>
>>>>>>>>>> 2016-04-15 14:43 GMT+02:00 Andrea Sella <andrea.se...@radicalbit.io>:
>>>>>>>>>>
>>>>>>>>>>> Hi Till,
>>>>>>>>>>>
>>>>>>>>>>> I am using a branched version of 1.0.1 where I cherry-picked FLINK-2935 <https://github.com/radicalbit/flink/commit/dfbbb9e48c98b486baf279c396d1bf7de31c1f8c> to use the FlinkILoop with a Configuration. My Flink interpreter is here <https://github.com/radicalbit/incubator-zeppelin/blob/flink-yarn-interpreter/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java>; I started tweaking it just two days ago. As far as I can see, there is a Zeppelin issue <https://issues.apache.org/jira/browse/ZEPPELIN-664> to make the FlinkInterpreter work with Yarn, and I need that too.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Andrea
>>>>>>>>>>>
>>>>>>>>>>> 2016-04-15 14:20 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Andrea,
>>>>>>>>>>>>
>>>>>>>>>>>> which version of Flink are you using with Zeppelin? How do you pass the Flink configuration to the FlinkILoop? Could you maybe show me your version of the Zeppelin code?
>>>>>>>>>>>>
>>>>>>>>>>>> According to the log, the ScalaShellRemoteEnvironment didn't get the Flink configuration with the HA settings. Therefore, it still tries to connect to the jobmanager specified by the host and port values. The functionality to pass a Flink configuration object to the FlinkILoop has only been merged recently, so you might have to switch to the 1.1-SNAPSHOT version for that. This means you would have to update the Flink version in your Zeppelin branch to 1.1-SNAPSHOT to make it work.
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Till
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Apr 15, 2016 at 1:03 PM, Andrea Sella <andrea.se...@radicalbit.io> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Till,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for following up on this issue with me :)
>>>>>>>>>>>>>
>>>>>>>>>>>>> Here are the logs <https://gist.github.com/alkagin/663fae1fc2993f0acd3ba66697f14093>; are they enough?
>>>>>>>>>>>>>
>>>>>>>>>>>>> As I wrote in the previous mail, you can also see the Configuration in the logs.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Andrea
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2016-04-15 10:07 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> In HA mode, the host and port information you provide to the shell should simply be ignored, so you don't have to retrieve them from the .yarn-properties file.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Could you maybe run the FlinkInterpreter with debug log level and share the logs with me? You can also do that privately if you don't want to share them on the mailing list.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I haven't tried it myself, but I thought that the shell also works with an HA cluster, because it uses the same mechanism as the CLI, for example. I'll try it out later today.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>> Till
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Apr 15, 2016 at 12:22 AM, Andrea Sella <andrea.se...@radicalbit.io> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> > Hi Till,
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > The cluster was started in HA mode.
>>>>>>>>>>>>>> > I already patched the Flink interpreter to allow passing the Configuration to the FlinkILoop. Nevertheless, I have to pass a host and port to the FlinkILoop, since they are required by its constructor, and I retrieve them from the .yarn-properties file.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > I logged the Flink Configuration:
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > INFO [2016-04-14 17:52:58,141] ({pool-2-thread-2} FlinkInterpreter.java[open]:96) - Flink Configuration: {recovery.mode=zookeeper, host=yarn, yarn-properties=/tmp/.yarn-properties-flink, recovery.zookeeper.quorum=slave01:2181,slave02:2181,master:2181, recovery.zookeeper.path.root=/flink/recovery}
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > and I attach some logs:
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Error displayed in a Zeppelin paragraph <https://gist.github.com/alkagin/612d736da8af9ee111e766b230559bb9>
>>>>>>>>>>>>>> > JobManager log <https://gist.github.com/alkagin/0a0b2670ce77f7d9c0807b1e4ef7239a>
>>>>>>>>>>>>>> > Interpreter/FlinkILoop log <https://gist.github.com/alkagin/23e4cec15904448dd2b400a6a37f7fa7>
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > I was looking at the Flink shell and it works similarly to the interpreter; does it work with an HA cluster?
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Thank you,
>>>>>>>>>>>>>> > Andrea
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > 2016-04-14 16:09 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > > Hi Andrea,
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > have you started the Flink Yarn cluster in HA mode? Then the job manager address is stored in ZooKeeper and you have to tell your FlinkILoop that it should retrieve the JobManager address from there. In order to do that, you have to set conf.setString(ConfigConstants.RECOVERY_MODE, "zookeeper"), conf.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, "address of your zookeeper cluster") and conf.setString(ConfigConstants.ZOOKEEPER_DIR_KEY, "flink dir you've set"), where conf is the Flink configuration object. The values for the different configuration keys must match the values specified in the flink-conf.yaml file. You then give the FlinkILoop the conf object.
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > I'm not sure whether you can specify a custom Flink configuration in Zeppelin. I think you can only specify a host and port. So either you start your Flink cluster in non-HA mode or you have to patch Zeppelin.
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > Cheers,
>>>>>>>>>>>>>> > > Till
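(For reference: a minimal, script-style sketch of the configuration Till describes in the mail above, using the ConfigConstants names from his mail and the ZooKeeper values from Andrea's logged configuration. The FlinkILoop constructor that accepts a Configuration only exists from 1.1-SNAPSHOT / FLINK-2935 onwards, so that step is left as a comment and its exact signature is an assumption to be checked against your Flink version.)

    import org.apache.flink.configuration.{ConfigConstants, Configuration}

    object HaShellConfigSketch {
      def main(args: Array[String]): Unit = {
        // Client-side configuration; the values must match flink-conf.yaml
        // (recovery.mode, recovery.zookeeper.quorum, recovery.zookeeper.path.root).
        val conf = new Configuration()
        conf.setString(ConfigConstants.RECOVERY_MODE, "zookeeper")
        conf.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY,
          "slave01:2181,slave02:2181,master:2181")
        conf.setString(ConfigConstants.ZOOKEEPER_DIR_KEY, "/flink/recovery")

        // Hand `conf` to the FlinkILoop; in HA mode the host and port are ignored
        // because the JobManager address is retrieved from ZooKeeper. Assumed shape
        // only (check the constructor in your Flink version):
        //   new FlinkILoop(host, port, conf, externalJars = None)
      }
    }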
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > On Tue, Apr 12, 2016 at 5:12 PM, Andrea Sella <andrea.se...@radicalbit.io> wrote:
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > > Hi,
>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>> > > > I am working on allowing Zeppelin's Flink interpreter to connect to an existing yarn cluster. The yarn cluster was started via yarn-session, and the Flink version is 1.0.0.
>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>> > > > My approach is to read the host and port from .yarn-properties and pass them to the FlinkILoop.
>>>>>>>>>>>>>> > > > Now I am facing an issue with the session ID when I submit a paragraph to the yarn cluster. The yarn cluster logs a warning similar to:
>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>> > > > 2016-04-12 10:14:32,666 WARN org.apache.flink.yarn.YarnJobManager - Discard message LeaderSessionMessage(null,SubmitJob(JobGraph(jobId: 0b6811bc58d781ddb6f5aac994afd903),EXECUTION_RESULT_AND_STATE_CHANGES)) because the expected leader session ID Some(afc85978-f765-488b-acbb-79c2d7cb89e0) did not equal the received leader session ID None.
>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>> > > > My Zeppelin paragraph throws a JobClientActorSubmissionTimeoutException; maybe it is due to the missing session ID? Do I need to pass extra parameters to connect correctly to the yarn cluster, or are host and port enough?
>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>> > > > Thanks in advance,
>>>>>>>>>>>>>> > > > Andrea
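(And for completeness, a small sketch of the non-HA lookup Andrea describes in the original mail above: reading the JobManager address from the yarn properties file. The property key and the error handling are assumptions rather than something confirmed in the thread, and, as Till notes further up, in HA mode the shell should ignore these values anyway.)

    import java.io.FileInputStream
    import java.util.Properties

    object YarnPropertiesSketch {
      def main(args: Array[String]): Unit = {
        // Path taken from the configuration Andrea logged above.
        val props = new Properties()
        val in = new FileInputStream("/tmp/.yarn-properties-flink")
        try props.load(in) finally in.close()

        // The "jobManager" key is an assumption -- inspect your own
        // .yarn-properties file to see what yarn-session actually wrote.
        val address = Option(props.getProperty("jobManager"))
          .getOrElse(sys.error("no JobManager address in the properties file"))
        val Array(host, port) = address.split(":")
        println(s"JobManager read from yarn properties: $host:$port")
      }
    }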