You're right..I removed my flink dir and I re-extracted it and now it works. Unfortunately I didn't keep the old version to understand what were the difference but the error was probably caused by the fact that I had a previous version of the WordCount.jar (without the listener) in the flink lib dir.. (in another dev session I was experimenting in running the job having the user jar in the lib dir). Sorry for the confusion. Just one last question: is the listener executed on the client or on the job server? This is not entirely clear to me..
Best, Flavio On Thu, Nov 19, 2020 at 1:53 PM Andrey Zagrebin <azagre...@apache.org> wrote: > > I also tried 1.11.0 and 1.11.2, both work for me. > > On Thu, Nov 19, 2020 at 3:39 PM Aljoscha Krettek <aljos...@apache.org> wrote: >> >> Hmm, there was this issue: >> https://issues.apache.org/jira/browse/FLINK-17744 But it should be fixed >> in your version. >> >> On 19.11.20 12:58, Flavio Pompermaier wrote: >> > Which version are you using? >> > I used the exact same commands on Flink 1.11.0 and I didn't get the job >> > listener output.. >> > >> > Il gio 19 nov 2020, 12:53 Andrey Zagrebin <azagre...@apache.org> ha >> > scritto: >> > >> >> Hi Flavio and Aljoscha, >> >> >> >> Sorry for the late heads up. I could not actually reproduce the reported >> >> problem with 'flink run' and local standalone cluster on master. >> >> I get the expected output with the suggested modification of WordCount >> >> program: >> >> >> >> $ bin/start-cluster.sh >> >> >> >> $ rm -rf out; bin/flink run >> >> flink/flink-examples/flink-examples-batch/target/WordCount.jar --output >> >> flink/build-target/out >> >> >> >> Executing WordCount example with default input data set. >> >> Use --input to specify file input. >> >> **************** SUBMITTED >> >> Job has been submitted with JobID c454a894d0524ccb69943b95838eea07 >> >> Program execution finished >> >> Job with JobID c454a894d0524ccb69943b95838eea07 has finished. >> >> Job Runtime: 139 ms >> >> >> >> **************** EXECUTED >> >> >> >> Best, >> >> Andrey >> >> >> >> On Thu, Nov 19, 2020 at 2:40 PM Aljoscha Krettek <aljos...@apache.org> >> >> wrote: >> >> >> >>> JobListener.onJobExecuted() is only invoked in >> >>> ExecutionEnvironment.execute() and ContextEnvironment.execute(). If none >> >>> of these is still in the call chain with that setup then the listener >> >>> will not be invoked. >> >>> >> >>> Also, this would only happen on the client, not on the broker (in your >> >>> case) or the server (JobManager). >> >>> >> >>> Does that help to debug the problem? >> >>> >> >>> Aljoscha >> >>> >> >>> On 19.11.20 09:49, Flavio Pompermaier wrote: >> >>>> I have a spring boot job server that act as a broker towards our >> >>>> application and a Flink session cluster. To submit a job I use the >> >>>> FlinkRestClient (that is also the one used in the CLI client when I use >> >>> the >> >>>> run action it if I'm not wrong). However both methods don't trigger the >> >>> job >> >>>> listener. >> >>>> >> >>>> Il gio 19 nov 2020, 09:39 Aljoscha Krettek <aljos...@apache.org> ha >> >>> scritto: >> >>>> >> >>>>> @Flavio, when you're saying you're using the RestClusterClient, you are >> >>>>> not actually using that manually, right? You're just submitting your >> >>> job >> >>>>> via "bin/flink run ...", right? >> >>>>> >> >>>>> What's the exact invocation of "bin/flink run" that you're using? >> >>>>> >> >>>>> On 19.11.20 09:29, Andrey Zagrebin wrote: >> >>>>>> Hi Flavio, >> >>>>>> >> >>>>>> I think I can reproduce what you are reporting (assuming you also pass >> >>>>>> '--output' to 'flink run'). >> >>>>>> I am not sure why it behaves like this. I would suggest filing a Jira >> >>>>>> ticket for this. >> >>>>>> >> >>>>>> Best, >> >>>>>> Andrey >> >>>>>> >> >>>>>> On Wed, Nov 18, 2020 at 9:45 AM Flavio Pompermaier < >> >>> pomperma...@okkam.it >> >>>>>> >> >>>>>> wrote: >> >>>>>> >> >>>>>>> is this a bug or is it a documentation problem...? >> >>>>>>> >> >>>>>>> Il sab 14 nov 2020, 18:44 Flavio Pompermaier <pomperma...@okkam.it> >> >>> ha >> >>>>>>> scritto: >> >>>>>>> >> >>>>>>>> I've also verified that the problem persist also using a modified >> >>>>> version >> >>>>>>>> of the WordCount class. >> >>>>>>>> If you add the code pasted at the end of this email at the end of >> >>> its >> >>>>>>>> main method you can verify that the listener is called if you run >> >>> the >> >>>>>>>> program from the IDE, but it's not called if you submit the job >> >>> using >> >>>>> the >> >>>>>>>> CLI client using the command >> >>>>>>>> >> >>>>>>>> - bin/flink run >> >>>>>>>> >> >>>>> >> >>> >> >>> /home/okkam/git/flink/flink-examples/flink-examples-batch/target/WordCount.jar >> >>>>>>>> >> >>>>>>>> Maybe this is an expected result but I didn't find any >> >>> documentation of >> >>>>>>>> this behaviour (neither in the Javadoc or in the flink web site, >> >>> where >> >>>>> I >> >>>>>>>> can't find any documentation about JobListener at all). >> >>>>>>>> >> >>>>>>>> [Code to add to main()] >> >>>>>>>> // emit result >> >>>>>>>> if (params.has("output")) { >> >>>>>>>> counts.writeAsCsv(params.get("output"), "\n", " "); >> >>>>>>>> // execute program >> >>>>>>>> env.registerJobListener(new JobListener() { >> >>>>>>>> >> >>>>>>>> @Override >> >>>>>>>> public void onJobSubmitted(JobClient arg0, Throwable >> >>> arg1) { >> >>>>>>>> System.out.println("**************** SUBMITTED"); >> >>>>>>>> } >> >>>>>>>> >> >>>>>>>> @Override >> >>>>>>>> public void onJobExecuted(JobExecutionResult arg0, >> >>> Throwable >> >>>>>>>> arg1) { >> >>>>>>>> System.out.println("**************** EXECUTED"); >> >>>>>>>> } >> >>>>>>>> }); >> >>>>>>>> env.execute("WordCount Example"); >> >>>>>>>> } else { >> >>>>>>>> System.out.println("Printing result to stdout. Use --output >> >>> to >> >>>>>>>> specify output path."); >> >>>>>>>> counts.print(); >> >>>>>>>> } >> >>>>>>>> >> >>>>>>>> On Fri, Nov 13, 2020 at 4:25 PM Flavio Pompermaier < >> >>>>> pomperma...@okkam.it> >> >>>>>>>> wrote: >> >>>>>>>> >> >>>>>>>>> see inline >> >>>>>>>>> >> >>>>>>>>> Il ven 13 nov 2020, 14:31 Matthias Pohl <matth...@ververica.com> >> >>> ha >> >>>>>>>>> scritto: >> >>>>>>>>> >> >>>>>>>>>> Hi Flavio, >> >>>>>>>>>> thanks for sharing this with the Flink community. Could you answer >> >>>>> the >> >>>>>>>>>> following questions, please: >> >>>>>>>>>> - What's the code of your Job's main method? >> >>>>>>>>>> >> >>>>>>>>> >> >>>>>>>>> it's actually very simple...the main class creates a batch >> >>> execution >> >>>>> env >> >>>>>>>>> using ExecutionEnvironment.getExecutionEnvironment(), I register a >> >>> job >> >>>>>>>>> listener to the env and I do some stuff before calling >> >>> env.execute(). >> >>>>>>>>> The listener is executed correctly but if I use the >> >>> RestClusterClient >> >>>>> to >> >>>>>>>>> sibmit the jobGraph exyracted from that main contained in a jar, >> >>> the >> >>>>>>>>> program is executed as usual but the job listener is not called. >> >>>>>>>>> >> >>>>>>>>> - What cluster backend and application do you use to execute the >> >>> job? >> >>>>>>>>>> >> >>>>>>>>> >> >>>>>>>>> I use a standalone session cluster for the moment >> >>>>>>>>> >> >>>>>>>>> - Is there anything suspicious you can find in the logs that might >> >>> be >> >>>>>>>>>> related? >> >>>>>>>>>> >> >>>>>>>>> >> >>>>>>>>> no unfortunately.. >> >>>>>>>>> >> >>>>>>>>> >> >>>>>>>>>> Best, >> >>>>>>>>>> Matthias >> >>>>>>>>>> >> >>>>>>>>>> On Thu, Nov 12, 2020 at 7:48 PM Flavio Pompermaier < >> >>>>>>>>>> pomperma...@okkam.it> wrote: >> >>>>>>>>>> >> >>>>>>>>>>> Actually what I'm experiencing is that the JobListener is >> >>> executed >> >>>>>>>>>>> successfully if I run my main class from the IDE, while the job >> >>>>> listener is >> >>>>>>>>>>> not fired at all if I submit the JobGraph of the application to a >> >>>>> cluster >> >>>>>>>>>>> using the RestClusterClient.. >> >>>>>>>>>>> Am I doing something wrong? >> >>>>>>>>>>> >> >>>>>>>>>>> My main class ends with the env.execute() and i do >> >>>>>>>>>>> env.registerJobListener() when I create the Exceution env >> >>>>>>>>>>> via ExecutionEnvironment.getExecutionEnvironment(). >> >>>>>>>>>>> >> >>>>>>>>>>> Thanks in advance for any help, >> >>>>>>>>>>> Flavio >> >>>>>>>>>>> >> >>>>>>>>>>> On Thu, Nov 12, 2020 at 2:13 PM Flavio Pompermaier < >> >>>>>>>>>>> pomperma...@okkam.it> wrote: >> >>>>>>>>>>> >> >>>>>>>>>>>> Hello everybody, >> >>>>>>>>>>>> I'm trying to use the JobListener to track when a job finishes >> >>>>> (with >> >>>>>>>>>>>> Flink 1.11.0). >> >>>>>>>>>>>> It works great but I have the problem that logs inside >> >>>>>>>>>>>> the onJobExecuted are not logged anywhere..is it normal? >> >>>>>>>>>>>> >> >>>>>>>>>>>> Best, >> >>>>>>>>>>>> Flavio >> >>>>>>>>>>>> >> >>>>>>>>>>> >> >>>>>> >> >>>>> >> >>>>> >> >>>> >> >>> >> >>> >> > >>