I also tried 1.11.0 and 1.11.2, both work for me.

On Thu, Nov 19, 2020 at 3:39 PM Aljoscha Krettek <aljos...@apache.org> wrote:
> Hmm, there was this issue:
> https://issues.apache.org/jira/browse/FLINK-17744
> But it should be fixed in your version.
>
> On 19.11.20 12:58, Flavio Pompermaier wrote:
> > Which version are you using?
> > I used the exact same commands on Flink 1.11.0 and I didn't get the job
> > listener output..
> >
> > On Thu, 19 Nov 2020, 12:53 Andrey Zagrebin <azagre...@apache.org> wrote:
> >
> >> Hi Flavio and Aljoscha,
> >>
> >> Sorry for the late heads up. I could not actually reproduce the reported
> >> problem with 'flink run' and local standalone cluster on master.
> >> I get the expected output with the suggested modification of WordCount
> >> program:
> >>
> >> $ bin/start-cluster.sh
> >>
> >> $ rm -rf out; bin/flink run flink/flink-examples/flink-examples-batch/target/WordCount.jar --output flink/build-target/out
> >>
> >> Executing WordCount example with default input data set.
> >> Use --input to specify file input.
> >> **************** SUBMITTED
> >> Job has been submitted with JobID c454a894d0524ccb69943b95838eea07
> >> Program execution finished
> >> Job with JobID c454a894d0524ccb69943b95838eea07 has finished.
> >> Job Runtime: 139 ms
> >>
> >> **************** EXECUTED
> >>
> >> Best,
> >> Andrey
> >>
> >> On Thu, Nov 19, 2020 at 2:40 PM Aljoscha Krettek <aljos...@apache.org>
> >> wrote:
> >>
> >>> JobListener.onJobExecuted() is only invoked in
> >>> ExecutionEnvironment.execute() and ContextEnvironment.execute(). If none
> >>> of these is still in the call chain with that setup then the listener
> >>> will not be invoked.
> >>>
> >>> Also, this would only happen on the client, not on the broker (in your
> >>> case) or the server (JobManager).
> >>>
> >>> Does that help to debug the problem?
> >>>
> >>> Aljoscha
> >>>
> >>> On 19.11.20 09:49, Flavio Pompermaier wrote:
> >>>> I have a spring boot job server that acts as a broker towards our
> >>>> application and a Flink session cluster.
> >>>> To submit a job I use the FlinkRestClient (that is also the one used in
> >>>> the CLI client when I use the run action, if I'm not wrong). However,
> >>>> both methods don't trigger the job listener.
> >>>>
> >>>> On Thu, 19 Nov 2020, 09:39 Aljoscha Krettek <aljos...@apache.org> wrote:
> >>>>
> >>>>> @Flavio, when you're saying you're using the RestClusterClient, you are
> >>>>> not actually using that manually, right? You're just submitting your job
> >>>>> via "bin/flink run ...", right?
> >>>>>
> >>>>> What's the exact invocation of "bin/flink run" that you're using?
> >>>>>
> >>>>> On 19.11.20 09:29, Andrey Zagrebin wrote:
> >>>>>> Hi Flavio,
> >>>>>>
> >>>>>> I think I can reproduce what you are reporting (assuming you also pass
> >>>>>> '--output' to 'flink run').
> >>>>>> I am not sure why it behaves like this. I would suggest filing a Jira
> >>>>>> ticket for this.
> >>>>>>
> >>>>>> Best,
> >>>>>> Andrey
> >>>>>>
> >>>>>> On Wed, Nov 18, 2020 at 9:45 AM Flavio Pompermaier <pomperma...@okkam.it>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> is this a bug or is it a documentation problem...?
> >>>>>>>
> >>>>>>> On Sat, 14 Nov 2020, 18:44 Flavio Pompermaier <pomperma...@okkam.it> wrote:
> >>>>>>>
> >>>>>>>> I've also verified that the problem persists when using a modified
> >>>>>>>> version of the WordCount class.
> >>>>>>>> If you add the code pasted at the end of this email at the end of its
> >>>>>>>> main method you can verify that the listener is called if you run the
> >>>>>>>> program from the IDE, but it's not called if you submit the job using
> >>>>>>>> the CLI client using the command
> >>>>>>>>
> >>>>>>>> - bin/flink run /home/okkam/git/flink/flink-examples/flink-examples-batch/target/WordCount.jar
> >>>>>>>>
> >>>>>>>> Maybe this is an expected result but I didn't find any documentation of
> >>>>>>>> this behaviour (neither in the Javadoc nor on the Flink web site, where
> >>>>>>>> I can't find any documentation about JobListener at all).
> >>>>>>>>
> >>>>>>>> [Code to add to main()]
> >>>>>>>> // emit result
> >>>>>>>> if (params.has("output")) {
> >>>>>>>>   counts.writeAsCsv(params.get("output"), "\n", " ");
> >>>>>>>>   // execute program
> >>>>>>>>   env.registerJobListener(new JobListener() {
> >>>>>>>>
> >>>>>>>>     @Override
> >>>>>>>>     public void onJobSubmitted(JobClient arg0, Throwable arg1) {
> >>>>>>>>       System.out.println("**************** SUBMITTED");
> >>>>>>>>     }
> >>>>>>>>
> >>>>>>>>     @Override
> >>>>>>>>     public void onJobExecuted(JobExecutionResult arg0, Throwable arg1) {
> >>>>>>>>       System.out.println("**************** EXECUTED");
> >>>>>>>>     }
> >>>>>>>>   });
> >>>>>>>>   env.execute("WordCount Example");
> >>>>>>>> } else {
> >>>>>>>>   System.out.println("Printing result to stdout. Use --output to specify output path.");
> >>>>>>>>   counts.print();
> >>>>>>>> }
> >>>>>>>>
> >>>>>>>> On Fri, Nov 13, 2020 at 4:25 PM Flavio Pompermaier <pomperma...@okkam.it>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> see inline
> >>>>>>>>>
> >>>>>>>>> On Fri, 13 Nov 2020, 14:31 Matthias Pohl <matth...@ververica.com> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Flavio,
> >>>>>>>>>> thanks for sharing this with the Flink community.
> >>>>>>>>>> Could you answer the following questions, please:
> >>>>>>>>>> - What's the code of your Job's main method?
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> it's actually very simple...the main class creates a batch execution env
> >>>>>>>>> using ExecutionEnvironment.getExecutionEnvironment(), I register a job
> >>>>>>>>> listener to the env and I do some stuff before calling env.execute().
> >>>>>>>>> The listener is executed correctly but if I use the RestClusterClient to
> >>>>>>>>> submit the jobGraph extracted from that main contained in a jar, the
> >>>>>>>>> program is executed as usual but the job listener is not called.
> >>>>>>>>>
> >>>>>>>>>> - What cluster backend and application do you use to execute the job?
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> I use a standalone session cluster for the moment
> >>>>>>>>>
> >>>>>>>>>> - Is there anything suspicious you can find in the logs that might be
> >>>>>>>>>> related?
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> no unfortunately..
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>> Best,
> >>>>>>>>>> Matthias
> >>>>>>>>>>
> >>>>>>>>>> On Thu, Nov 12, 2020 at 7:48 PM Flavio Pompermaier <pomperma...@okkam.it>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Actually what I'm experiencing is that the JobListener is executed
> >>>>>>>>>>> successfully if I run my main class from the IDE, while the job
> >>>>>>>>>>> listener is not fired at all if I submit the JobGraph of the
> >>>>>>>>>>> application to a cluster using the RestClusterClient..
> >>>>>>>>>>> Am I doing something wrong?
> >>>>>>>>>>>
> >>>>>>>>>>> My main class ends with the env.execute() and I do
> >>>>>>>>>>> env.registerJobListener() when I create the Execution env
> >>>>>>>>>>> via ExecutionEnvironment.getExecutionEnvironment().
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks in advance for any help,
> >>>>>>>>>>> Flavio
> >>>>>>>>>>>
> >>>>>>>>>>> On Thu, Nov 12, 2020 at 2:13 PM Flavio Pompermaier <pomperma...@okkam.it>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hello everybody,
> >>>>>>>>>>>> I'm trying to use the JobListener to track when a job finishes (with
> >>>>>>>>>>>> Flink 1.11.0).
> >>>>>>>>>>>> It works great but I have the problem that logs inside
> >>>>>>>>>>>> the onJobExecuted are not logged anywhere..is it normal?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Best,
> >>>>>>>>>>>> Flavio
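
For anyone reading this thread later: Aljoscha's point that the listener only fires when execute() on the client-side environment is in the call chain can be pictured with a toy model in plain Java. This is only a sketch under stated assumptions. None of the types below are Flink classes: ToyJobListener, ToyClusterClient and ToyEnvironment are hypothetical stand-ins, and the real Flink internals are certainly more involved.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: every type here is a hypothetical stand-in, not a Flink class.
class ListenerSketch {

    /** Hypothetical listener, loosely mirroring the two callbacks discussed in the thread. */
    interface ToyJobListener {
        void onJobSubmitted(String jobId);
        void onJobExecuted(String jobId);
    }

    /** Hypothetical cluster client: it ships a job and waits, and knows nothing about listeners. */
    static class ToyClusterClient {
        String submitAndWait(String jobName) {
            return "job-" + jobName;
        }
    }

    /** Hypothetical client-side environment: listeners live here and are notified only by execute(). */
    static class ToyEnvironment {
        private final List<ToyJobListener> listeners = new ArrayList<>();
        private final ToyClusterClient client;

        ToyEnvironment(ToyClusterClient client) {
            this.client = client;
        }

        void registerJobListener(ToyJobListener listener) {
            listeners.add(listener);
        }

        String execute(String jobName) {
            String jobId = client.submitAndWait(jobName);
            // The callbacks are driven from here, in the client process.
            for (ToyJobListener listener : listeners) {
                listener.onJobSubmitted(jobId);
                listener.onJobExecuted(jobId);
            }
            return jobId;
        }
    }

    /** Runs one job and returns the listener events that were recorded. */
    static List<String> run(boolean viaExecute) {
        List<String> events = new ArrayList<>();
        ToyClusterClient client = new ToyClusterClient();
        ToyEnvironment env = new ToyEnvironment(client);
        env.registerJobListener(new ToyJobListener() {
            @Override
            public void onJobSubmitted(String jobId) {
                events.add("SUBMITTED");
            }

            @Override
            public void onJobExecuted(String jobId) {
                events.add("EXECUTED");
            }
        });
        if (viaExecute) {
            env.execute("WordCount");          // listener is in the call chain
        } else {
            client.submitAndWait("WordCount"); // bypasses the environment entirely
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println("via execute(): " + run(true));
        System.out.println("direct client: " + run(false));
    }
}
```

In this toy model, submitting through the environment records both events, while handing the job straight to the client records none, even though the listener was registered. That matches the symptom Flavio describes when submitting a JobGraph through the RestClusterClient from a separate broker process, but whether it is the actual cause in a given setup still needs to be checked against the real call chain.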