Which version are you using? I used the exact same commands on Flink 1.11.0 and I didn't get the job listener output.
On Thu, 19 Nov 2020, 12:53 Andrey Zagrebin <azagre...@apache.org> wrote:
> Hi Flavio and Aljoscha,
>
> Sorry for the late heads up. I could not actually reproduce the reported
> problem with 'flink run' and a local standalone cluster on master.
> I get the expected output with the suggested modification of the WordCount
> program:
>
> $ bin/start-cluster.sh
>
> $ rm -rf out; bin/flink run flink/flink-examples/flink-examples-batch/target/WordCount.jar --output flink/build-target/out
>
> Executing WordCount example with default input data set.
> Use --input to specify file input.
> **************** SUBMITTED
> Job has been submitted with JobID c454a894d0524ccb69943b95838eea07
> Program execution finished
> Job with JobID c454a894d0524ccb69943b95838eea07 has finished.
> Job Runtime: 139 ms
>
> **************** EXECUTED
>
> Best,
> Andrey
>
> On Thu, Nov 19, 2020 at 2:40 PM Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> JobListener.onJobExecuted() is only invoked in
>> ExecutionEnvironment.execute() and ContextEnvironment.execute(). If none
>> of these is still in the call chain with that setup then the listener
>> will not be invoked.
>>
>> Also, this would only happen on the client, not on the broker (in your
>> case) or the server (JobManager).
>>
>> Does that help to debug the problem?
>>
>> Aljoscha
>>
>> On 19.11.20 09:49, Flavio Pompermaier wrote:
>> > I have a Spring Boot job server that acts as a broker between our
>> > application and a Flink session cluster. To submit a job I use the
>> > FlinkRestClient (which is also the one used by the CLI client when I use
>> > the run action, if I'm not wrong). However, neither method triggers the
>> > job listener.
>> >
>> > On Thu, 19 Nov 2020, 09:39 Aljoscha Krettek <aljos...@apache.org> wrote:
>> >
>> >> @Flavio, when you're saying you're using the RestClusterClient, you are
>> >> not actually using that manually, right? You're just submitting your job
>> >> via "bin/flink run ...", right?
>> >>
>> >> What's the exact invocation of "bin/flink run" that you're using?
>> >>
>> >> On 19.11.20 09:29, Andrey Zagrebin wrote:
>> >>> Hi Flavio,
>> >>>
>> >>> I think I can reproduce what you are reporting (assuming you also pass
>> >>> '--output' to 'flink run').
>> >>> I am not sure why it behaves like this. I would suggest filing a Jira
>> >>> ticket for this.
>> >>>
>> >>> Best,
>> >>> Andrey
>> >>>
>> >>> On Wed, Nov 18, 2020 at 9:45 AM Flavio Pompermaier <pomperma...@okkam.it>
>> >>> wrote:
>> >>>
>> >>>> is this a bug or is it a documentation problem...?
>> >>>>
>> >>>> On Sat, 14 Nov 2020, 18:44 Flavio Pompermaier <pomperma...@okkam.it>
>> >>>> wrote:
>> >>>>
>> >>>>> I've also verified that the problem persists when using a modified
>> >>>>> version of the WordCount class.
>> >>>>> If you add the code pasted at the end of this email at the end of its
>> >>>>> main method you can verify that the listener is called if you run the
>> >>>>> program from the IDE, but it's not called if you submit the job with
>> >>>>> the CLI client using the command
>> >>>>>
>> >>>>> - bin/flink run /home/okkam/git/flink/flink-examples/flink-examples-batch/target/WordCount.jar
>> >>>>>
>> >>>>> Maybe this is an expected result but I didn't find any documentation of
>> >>>>> this behaviour (neither in the Javadoc nor on the Flink web site, where
>> >>>>> I can't find any documentation about JobListener at all).
>> >>>>>
>> >>>>> [Code to add to main()]
>> >>>>> // emit result
>> >>>>> if (params.has("output")) {
>> >>>>>     counts.writeAsCsv(params.get("output"), "\n", " ");
>> >>>>>     // execute program
>> >>>>>     env.registerJobListener(new JobListener() {
>> >>>>>
>> >>>>>         @Override
>> >>>>>         public void onJobSubmitted(JobClient arg0, Throwable arg1) {
>> >>>>>             System.out.println("**************** SUBMITTED");
>> >>>>>         }
>> >>>>>
>> >>>>>         @Override
>> >>>>>         public void onJobExecuted(JobExecutionResult arg0, Throwable arg1) {
>> >>>>>             System.out.println("**************** EXECUTED");
>> >>>>>         }
>> >>>>>     });
>> >>>>>     env.execute("WordCount Example");
>> >>>>> } else {
>> >>>>>     System.out.println("Printing result to stdout. Use --output to specify output path.");
>> >>>>>     counts.print();
>> >>>>> }
>> >>>>>
>> >>>>> On Fri, Nov 13, 2020 at 4:25 PM Flavio Pompermaier <pomperma...@okkam.it>
>> >>>>> wrote:
>> >>>>>
>> >>>>>> see inline
>> >>>>>>
>> >>>>>> On Fri, 13 Nov 2020, 14:31 Matthias Pohl <matth...@ververica.com>
>> >>>>>> wrote:
>> >>>>>>
>> >>>>>>> Hi Flavio,
>> >>>>>>> thanks for sharing this with the Flink community. Could you answer the
>> >>>>>>> following questions, please:
>> >>>>>>> - What's the code of your Job's main method?
>> >>>>>>>
>> >>>>>>
>> >>>>>> It's actually very simple: the main class creates a batch execution env
>> >>>>>> using ExecutionEnvironment.getExecutionEnvironment(), I register a job
>> >>>>>> listener on the env and I do some stuff before calling env.execute().
>> >>>>>> The listener is executed correctly, but if I use the RestClusterClient to
>> >>>>>> submit the JobGraph extracted from that main class contained in a jar,
>> >>>>>> the program is executed as usual but the job listener is not called.
>> >>>>>>
>> >>>>>>> - What cluster backend and application do you use to execute the job?
>> >>>>>>>
>> >>>>>>
>> >>>>>> I use a standalone session cluster for the moment
>> >>>>>>
>> >>>>>>> - Is there anything suspicious you can find in the logs that might be
>> >>>>>>> related?
>> >>>>>>>
>> >>>>>>
>> >>>>>> no, unfortunately.
>> >>>>>>
>> >>>>>>> Best,
>> >>>>>>> Matthias
>> >>>>>>>
>> >>>>>>> On Thu, Nov 12, 2020 at 7:48 PM Flavio Pompermaier
>> >>>>>>> <pomperma...@okkam.it> wrote:
>> >>>>>>>
>> >>>>>>>> Actually what I'm experiencing is that the JobListener is executed
>> >>>>>>>> successfully if I run my main class from the IDE, while the job
>> >>>>>>>> listener is not fired at all if I submit the JobGraph of the
>> >>>>>>>> application to a cluster using the RestClusterClient.
>> >>>>>>>> Am I doing something wrong?
>> >>>>>>>>
>> >>>>>>>> My main class ends with env.execute() and I call
>> >>>>>>>> env.registerJobListener() when I create the execution env
>> >>>>>>>> via ExecutionEnvironment.getExecutionEnvironment().
>> >>>>>>>>
>> >>>>>>>> Thanks in advance for any help,
>> >>>>>>>> Flavio
>> >>>>>>>>
>> >>>>>>>> On Thu, Nov 12, 2020 at 2:13 PM Flavio Pompermaier
>> >>>>>>>> <pomperma...@okkam.it> wrote:
>> >>>>>>>>
>> >>>>>>>>> Hello everybody,
>> >>>>>>>>> I'm trying to use the JobListener to track when a job finishes (with
>> >>>>>>>>> Flink 1.11.0).
>> >>>>>>>>> It works great but I have the problem that logs inside
>> >>>>>>>>> onJobExecuted are not logged anywhere. Is that normal?
>> >>>>>>>>>
>> >>>>>>>>> Best,
>> >>>>>>>>> Flavio
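
Aljoscha's point in the thread above (listeners fire only inside the environment's execute() on the client side) can be illustrated with a small toy model. This is only a sketch in plain Java and does not use Flink: ToyCluster, ToyEnvironment and Listener are hypothetical stand-ins invented for illustration, not Flink classes. It assumes that a cluster which receives a job directly never sees listeners registered on the client-side environment.

```java
import java.util.ArrayList;
import java.util.List;

public class ListenerPathSketch {

    /** Hypothetical stand-in for a job listener; NOT Flink's JobListener interface. */
    interface Listener {
        void onJobSubmitted(String jobId);
        void onJobExecuted(String jobId);
    }

    /** Toy "cluster": accepts and runs jobs, and knows nothing about listeners. */
    static class ToyCluster {
        final List<String> finished = new ArrayList<>();
        private int counter = 0;

        String submit(String jobName) {
            counter++;
            return jobName + "-" + counter;
        }

        void awaitResult(String jobId) {
            finished.add(jobId);
        }
    }

    /** Toy client-side environment: the only place where listeners are notified. */
    static class ToyEnvironment {
        private final ToyCluster cluster;
        private final List<Listener> listeners = new ArrayList<>();

        ToyEnvironment(ToyCluster cluster) {
            this.cluster = cluster;
        }

        void registerJobListener(Listener listener) {
            listeners.add(listener);
        }

        String execute(String jobName) {
            String jobId = cluster.submit(jobName);
            for (Listener l : listeners) {
                l.onJobSubmitted(jobId);
            }
            cluster.awaitResult(jobId);
            for (Listener l : listeners) {
                l.onJobExecuted(jobId);
            }
            return jobId;
        }
    }

    /** Runs one job through the environment and one directly against the cluster. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        ToyCluster cluster = new ToyCluster();
        ToyEnvironment env = new ToyEnvironment(cluster);
        env.registerJobListener(new Listener() {
            @Override
            public void onJobSubmitted(String jobId) {
                events.add("SUBMITTED " + jobId);
            }

            @Override
            public void onJobExecuted(String jobId) {
                events.add("EXECUTED " + jobId);
            }
        });

        // Path 1: through the environment, so the listener is notified.
        env.execute("wordcount");

        // Path 2: straight to the cluster, bypassing the environment; no callbacks.
        String directId = cluster.submit("wordcount");
        cluster.awaitResult(directId);

        return events;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this toy model both jobs finish on the cluster, but only the one that went through ToyEnvironment.execute() produces listener events. Whether a given real submission path still has execute() in its call chain is exactly what the thread is trying to establish, so the sketch does not settle that question.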