Hi Till,

It works as expected, thanks!

Andrea

2016-04-19 15:25 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:

> Hi Andrea,
>
> I think your problem should be fixed with the PRs [1,2]. I've tested it
> locally on my yarn cluster and it worked.
>
> [1] https://github.com/apache/flink/pull/1904
> [2] https://github.com/apache/flink/pull/1914
>
> Cheers,
> Till
>
> On Tue, Apr 19, 2016 at 2:16 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>
>> I think this is another issue you’ve detected. I already spotted some
>> suspicious code in the yarn deployment section. If I’m not mistaken, then
>> flink-conf.yaml is read too late and is, thus, not respected. I’ll
>> verify it and if valid, then I’ll open another issue and fix it.
>>
>> Thanks for your patience and thorough reporting. It helps a lot :-)
>>
>> Cheers,
>> Till
>>
>> On Tue, Apr 19, 2016 at 2:12 PM, Andrea Sella <andrea.se...@radicalbit.io> wrote:
>>
>>> No, I tried it via scala-shell as you can see the attachment.
>>>
>>> Regards,
>>> Andrea
>>>
>>> 2016-04-19 14:08 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:
>>>
>>>> Hi Andrea,
>>>>
>>>> thanks for testing it. How did you submit the job this time? Via
>>>> Zeppelin?
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Apr 19, 2016 at 12:51 PM, Andrea Sella <andrea.se...@radicalbit.io> wrote:
>>>>
>>>>> Hi Till,
>>>>>
>>>>> I've used your branch fixScalaShell to test the scala-shell with our
>>>>> HA cluster, it doesn't work. Same error as before
>>>>>
>>>>> 2016-04-19 06:40:35,030 WARN org.apache.flink.yarn.YarnJobManager
>>>>> - Discard message
>>>>> LeaderSessionMessage(null,SubmitJob(JobGraph(jobId:
>>>>> aa5b034e10a850d863642a24aab75d9c),EXECUTION_RESULT_AND_STATE_CHANGES))
>>>>> because the expected leader session ID
>>>>> Some(bc706707-2bab-4b82-b7a7-1426dce696a7) did not equal the received
>>>>> leader session ID None.
>>>>>
>>>>> If I submit a simple job, it works. I think it is not a problem of our
>>>>> environment.
>>>>>
>>>>> Cheers,
>>>>> Andrea
>>>>>
>>>>> 2016-04-18 18:41 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:
>>>>>
>>>>>> Cool, that helps a lot :-)
>>>>>>
>>>>>> On Mon, Apr 18, 2016 at 6:06 PM, Andrea Sella <andrea.se...@radicalbit.io> wrote:
>>>>>>
>>>>>>> Hi Till,
>>>>>>>
>>>>>>> Don't worry, I am going to test the PR in our HA environment.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Andrea
>>>>>>>
>>>>>>> 2016-04-18 17:46 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:
>>>>>>>
>>>>>>>> Hi Andrea,
>>>>>>>>
>>>>>>>> sorry, I saw your mail too late. I already fixed the problem and
>>>>>>>> opened a PR [1] for it. I hope you haven't invested too much time in
>>>>>>>> it yet.
>>>>>>>>
>>>>>>>> [1] https://github.com/apache/flink/pull/1904
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Till
>>>>>>>>
>>>>>>>> On Mon, Apr 18, 2016 at 11:19 AM, Andrea Sella <andrea.se...@radicalbit.io> wrote:
>>>>>>>>
>>>>>>>>> Hi Till,
>>>>>>>>> Thanks for the support, I will take the issue and start working
>>>>>>>>> on it asap.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Andrea
>>>>>>>>>
>>>>>>>>> 2016-04-18 10:32 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:
>>>>>>>>>
>>>>>>>>>> Hi Andrea,
>>>>>>>>>>
>>>>>>>>>> I think the problem is simply that it has not been correctly
>>>>>>>>>> implemented. I just checked and I think the user configuration is
>>>>>>>>>> not given to the PlanExecutor which is internally created. I’ve
>>>>>>>>>> opened an issue for that [1].
>>>>>>>>>>
>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-3774
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Till
>>>>>>>>>>
>>>>>>>>>> On Fri, Apr 15, 2016 at 4:58 PM, Andrea Sella <andrea.se...@radicalbit.io> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Till,
>>>>>>>>>>>
>>>>>>>>>>> I've tried the Scala-Shell with our HA cluster, no luck again.
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Andrea
>>>>>>>>>>>
>>>>>>>>>>> 2016-04-15 14:43 GMT+02:00 Andrea Sella <andrea.se...@radicalbit.io>:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Till,
>>>>>>>>>>>>
>>>>>>>>>>>> I am using a branched version of 1.0.1 where I cherry-picked FLINK-2935
>>>>>>>>>>>> <https://github.com/radicalbit/flink/commit/dfbbb9e48c98b486baf279c396d1bf7de31c1f8c>
>>>>>>>>>>>> to use FlinkILoop with Configuration. My Flink interpreter is here
>>>>>>>>>>>> <https://github.com/radicalbit/incubator-zeppelin/blob/flink-yarn-interpreter/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java>.
>>>>>>>>>>>> I started tweaking it just two days ago and, as far as I can see, there
>>>>>>>>>>>> is a Zeppelin issue <https://issues.apache.org/jira/browse/ZEPPELIN-664>
>>>>>>>>>>>> to provide a FlinkInterpreter working with Yarn, and I need it too.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Andrea
>>>>>>>>>>>>
>>>>>>>>>>>> 2016-04-15 14:20 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Andrea,
>>>>>>>>>>>>>
>>>>>>>>>>>>> which version of Flink are you using with Zeppelin? How do you
>>>>>>>>>>>>> pass the Flink configuration to the FlinkILoop? Could you maybe
>>>>>>>>>>>>> show me your version of Zeppelin (code)?
>>>>>>>>>>>>>
>>>>>>>>>>>>> According to the log, the ScalaShellRemoteEnvironment didn't
>>>>>>>>>>>>> get the Flink configuration with the HA settings. Therefore, it
>>>>>>>>>>>>> still tries to connect to the jobmanager specified by the host and
>>>>>>>>>>>>> port values. The functionality to pass in a Flink configuration
>>>>>>>>>>>>> object to FlinkILoop has only been merged recently. You might have
>>>>>>>>>>>>> to switch to the 1.1-SNAPSHOT version for that.
>>>>>>>>>>>>> This means that you would have to update the Flink version in your
>>>>>>>>>>>>> Zeppelin branch to 1.1-SNAPSHOT to make it work.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>> Till
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Apr 15, 2016 at 1:03 PM, Andrea Sella <andrea.se...@radicalbit.io> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Till,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for following this issue with me :)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Here are the logs
>>>>>>>>>>>>>> <https://gist.github.com/alkagin/663fae1fc2993f0acd3ba66697f14093>,
>>>>>>>>>>>>>> are they enough?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> As I wrote in the previous mail, in the logs you can also see
>>>>>>>>>>>>>> the Configuration.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Andrea
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2016-04-15 10:07 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> In HA mode, the host and port information you provide to the Shell
>>>>>>>>>>>>>>> should be simply ignored. So you don't have to retrieve them from the
>>>>>>>>>>>>>>> .yarn-properties file.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Could you maybe run the FlinkInterpreter with debug log level and share
>>>>>>>>>>>>>>> the logs with me? You can also do that privately, if you don't want to
>>>>>>>>>>>>>>> share them on the mailing list.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I haven't tried it myself, but I thought that the Shell also works with
>>>>>>>>>>>>>>> an HA cluster, because it uses the same mechanism as the CLI, for example.
>>>>>>>>>>>>>>> I'll try it out later today.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>> Till
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Apr 15, 2016 at 12:22 AM, Andrea Sella <andrea.se...@radicalbit.io> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> > Hi Till,
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > The cluster has started in HA.
>>>>>>>>>>>>>>> > I already patched the Flink interpreter to allow passing the
>>>>>>>>>>>>>>> > Configuration to FlinkILoop. Nevertheless, I have to pass host and port
>>>>>>>>>>>>>>> > to FlinkILoop; they are required by the FlinkILoop constructor and I
>>>>>>>>>>>>>>> > retrieve them from the .yarn-properties file.
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > I logged the Flink Configuration:
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > INFO [2016-04-14 17:52:58,141] ({pool-2-thread-2}
>>>>>>>>>>>>>>> > FlinkInterpreter.java[open]:96) - Flink Configuration: {
>>>>>>>>>>>>>>> > recovery.mode=zookeeper, host=yarn,
>>>>>>>>>>>>>>> > yarn-properties=/tmp/.yarn-properties-flink,
>>>>>>>>>>>>>>> > recovery.zookeeper.quorum=slave01:2181,slave02:2181,master:2181,
>>>>>>>>>>>>>>> > recovery.zookeeper.path.root=/flink/recovery}
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > and I attach some logs:
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > Error displayed in paragraph of Zeppelin
>>>>>>>>>>>>>>> > <https://gist.github.com/alkagin/612d736da8af9ee111e766b230559bb9>
>>>>>>>>>>>>>>> > JobManager log
>>>>>>>>>>>>>>> > <https://gist.github.com/alkagin/0a0b2670ce77f7d9c0807b1e4ef7239a>
>>>>>>>>>>>>>>> > Interpreter/FlinkILoop log
>>>>>>>>>>>>>>> > <https://gist.github.com/alkagin/23e4cec15904448dd2b400a6a37f7fa7>
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > I was looking at the Flink shell and it works similarly to the
>>>>>>>>>>>>>>> > interpreter. Does it work with an HA cluster?
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > Thank you,
>>>>>>>>>>>>>>> > Andrea
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > 2016-04-14 16:09 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > > Hi Andrea,
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > have you started the Flink Yarn cluster in HA mode? Then the job manager
>>>>>>>>>>>>>>> > > address is stored in ZooKeeper and you have to tell your FlinkILoop that
>>>>>>>>>>>>>>> > > it should retrieve the JobManager address from there. In order to do that
>>>>>>>>>>>>>>> > > you have to set conf.setString(ConfigConstants.RECOVERY_MODE, "zookeeper"),
>>>>>>>>>>>>>>> > > conf.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY,
>>>>>>>>>>>>>>> > > "address of your zookeeper cluster") and
>>>>>>>>>>>>>>> > > conf.setString(ConfigConstants.ZOOKEEPER_DIR_KEY,
>>>>>>>>>>>>>>> > > "flink dir you've set") where conf is the flink configuration object. The
>>>>>>>>>>>>>>> > > values for the different configuration keys must match the values
>>>>>>>>>>>>>>> > > specified in the flink-conf.yaml file. You then give the FlinkILoop the
>>>>>>>>>>>>>>> > > conf object.
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > I’m not sure whether you can specify a custom flink configuration in
>>>>>>>>>>>>>>> > > Zeppelin. I think you can only specify a host and port. So either you
>>>>>>>>>>>>>>> > > start your Flink cluster in non-HA mode or you have to patch Zeppelin.
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > Cheers,
>>>>>>>>>>>>>>> > > Till
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > On Tue, Apr 12, 2016 at 5:12 PM, Andrea Sella <andrea.se...@radicalbit.io> wrote:
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > > Hi,
>>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>>> > > > I am working to allow Zeppelin's flink interpreter to connect to an
>>>>>>>>>>>>>>> > > > existing yarn cluster. The yarn cluster was started via yarn-session
>>>>>>>>>>>>>>> > > > and flink's version is 1.0.0.
>>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>>> > > > My approach is to read host and port from .yarn-properties and pass
>>>>>>>>>>>>>>> > > > them to FlinkILoop.
>>>>>>>>>>>>>>> > > > Now I am facing an issue with the Session ID when I submit a paragraph
>>>>>>>>>>>>>>> > > > to the yarn cluster.
>>>>>>>>>>>>>>> > > > The yarn cluster throws a warning similar to:
>>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>>> > > > 2016-04-12 10:14:32,666 WARN org.apache.flink.yarn.YarnJobManager
>>>>>>>>>>>>>>> > > > - Discard message
>>>>>>>>>>>>>>> > > > LeaderSessionMessage(null,SubmitJob(JobGraph(jobId:
>>>>>>>>>>>>>>> > > > 0b6811bc58d781ddb6f5aac994afd903),EXECUTION_RESULT_AND_STATE_CHANGES))
>>>>>>>>>>>>>>> > > > because the expected leader session ID
>>>>>>>>>>>>>>> > > > Some(afc85978-f765-488b-acbb-79c2d7cb89e0) did not equal the received
>>>>>>>>>>>>>>> > > > leader session ID None.
>>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>>> > > > My Zeppelin paragraph throws a JobClientActorSubmissionTimeoutException;
>>>>>>>>>>>>>>> > > > is it maybe due to the missing sessionId?
>>>>>>>>>>>>>>> > > > Do I need to pass extra params to connect correctly to the yarn cluster,
>>>>>>>>>>>>>>> > > > or are host and port enough?
>>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>>> > > > Thanks in advance,
>>>>>>>>>>>>>>> > > > Andrea
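
For readers skimming the thread, the HA client settings Till describes in the quoted messages above can be sketched as plain key/value pairs. This is only a minimal sketch under stated assumptions, not Flink code: a `java.util.Map` stands in for Flink's configuration object, and the class and method names (`HaClientSettings`, `build`, `matchesCluster`) are hypothetical. The three keys and the sample values are the ones that appear in the logged configuration in the thread. With Flink on the classpath, the same values would be set through the `conf.setString(...)` calls Till mentions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: a plain Map stands in for Flink's configuration object.
class HaClientSettings {

    // Keys as they appear in the configuration logged in the thread.
    static final String RECOVERY_MODE = "recovery.mode";
    static final String ZK_QUORUM = "recovery.zookeeper.quorum";
    static final String ZK_ROOT = "recovery.zookeeper.path.root";

    // Collect the three settings a client needs so that it looks up the
    // JobManager address in ZooKeeper instead of using a fixed host and port.
    static Map<String, String> build(String quorum, String rootPath) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put(RECOVERY_MODE, "zookeeper");
        conf.put(ZK_QUORUM, quorum);
        conf.put(ZK_ROOT, rootPath);
        return conf;
    }

    // The client-side values must match what the cluster's flink-conf.yaml specifies.
    static boolean matchesCluster(Map<String, String> client, Map<String, String> cluster) {
        for (String key : new String[] {RECOVERY_MODE, ZK_QUORUM, ZK_ROOT}) {
            String expected = cluster.get(key);
            if (expected == null || !expected.equals(client.get(key))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Sample values taken from the configuration logged in the thread.
        Map<String, String> client =
            build("slave01:2181,slave02:2181,master:2181", "/flink/recovery");
        Map<String, String> cluster =
            build("slave01:2181,slave02:2181,master:2181", "/flink/recovery");
        System.out.println(client);
        System.out.println("matches cluster: " + matchesCluster(client, cluster));
    }
}
```

The check is deliberately strict: a missing key on the cluster side counts as a mismatch, since a client that silently falls back to host and port is the failure mode discussed in the thread.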