I think the problem is that my REST service submits the job to the Flink standalone cluster and responds to the client with the submitted job ID. To do that, I was using RestClusterClient<StandaloneClusterId>, because it lets me retrieve the JobID with the following code:
(1) JobID flinkJobId = client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get().getJobID();

Unfortunately this does not activate the job listener (which is quite surprising to me; I thought the listener was triggered by the JobManager).

So, after Aljoscha's answer, I took a deeper look at the Flink CLI code, and what it does is basically this:

(2) ClientUtils.executeProgram(new DefaultExecutorServiceLoader(), flinkConf, packagedProgram, false, false);

That works as expected (I wasn't aware of the ThreadLocal mechanism used by ContextEnvironment and StreamContextEnvironment: a very advanced programming technique), but it does not give me back the job ID that I need. I can live with that, because I can save the Flink job ID in an external service when the job listener's onJobSubmitted method is triggered, but I find this mechanism quite weird.

So my question is: is there a simple way to achieve my goal? Am I doing something wrong?
At the moment I had to implement a job-status polling thread after line (1), but this looks like a workaround to me.

Best,
Flavio

On Thu, Nov 19, 2020 at 4:28 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
> You're right. I removed my Flink dir and re-extracted it, and now it
> works. Unfortunately I didn't keep the old version to understand what
> the difference was, but the error was probably caused by the fact that
> I had a previous version of WordCount.jar (without the listener)
> in the Flink lib dir (in another dev session I was experimenting with
> running the job with the user jar in the lib dir). Sorry for the
> confusion.
> Just one last question: is the listener executed on the client or on
> the job server? This is not entirely clear to me.
>
> Best,
> Flavio
>
> On Thu, Nov 19, 2020 at 1:53 PM Andrey Zagrebin <azagre...@apache.org> wrote:
> >
> > I also tried 1.11.0 and 1.11.2, both work for me.
> >
> > On Thu, Nov 19, 2020 at 3:39 PM Aljoscha Krettek <aljos...@apache.org> wrote:
> >>
> >> Hmm, there was this issue:
> >> https://issues.apache.org/jira/browse/FLINK-17744
> >> But it should be fixed in your version.
> >>
> >> On 19.11.20 12:58, Flavio Pompermaier wrote:
> >> > Which version are you using?
> >> > I used the exact same commands on Flink 1.11.0 and I didn't get the job
> >> > listener output.
> >> >
> >> > On Thu, Nov 19, 2020, 12:53 Andrey Zagrebin <azagre...@apache.org> wrote:
> >> >
> >> >> Hi Flavio and Aljoscha,
> >> >>
> >> >> Sorry for the late heads up. I could not actually reproduce the reported
> >> >> problem with 'flink run' and a local standalone cluster on master.
> >> >> I get the expected output with the suggested modification of the WordCount
> >> >> program:
> >> >>
> >> >> $ bin/start-cluster.sh
> >> >>
> >> >> $ rm -rf out; bin/flink run flink/flink-examples/flink-examples-batch/target/WordCount.jar --output flink/build-target/out
> >> >>
> >> >> Executing WordCount example with default input data set.
> >> >> Use --input to specify file input.
> >> >> **************** SUBMITTED
> >> >> Job has been submitted with JobID c454a894d0524ccb69943b95838eea07
> >> >> Program execution finished
> >> >> Job with JobID c454a894d0524ccb69943b95838eea07 has finished.
> >> >> Job Runtime: 139 ms
> >> >>
> >> >> **************** EXECUTED
> >> >>
> >> >> Best,
> >> >> Andrey
> >> >>
> >> >> On Thu, Nov 19, 2020 at 2:40 PM Aljoscha Krettek <aljos...@apache.org> wrote:
> >> >>
> >> >>> JobListener.onJobExecuted() is only invoked in
> >> >>> ExecutionEnvironment.execute() and ContextEnvironment.execute(). If
> >> >>> none of these is still in the call chain with that setup then the
> >> >>> listener will not be invoked.
> >> >>>
> >> >>> Also, this would only happen on the client, not on the broker (in your
> >> >>> case) or the server (JobManager).
> >> >>>
> >> >>> Does that help to debug the problem?
> >> >>>
> >> >>> Aljoscha
> >> >>>
> >> >>> On 19.11.20 09:49, Flavio Pompermaier wrote:
> >> >>>> I have a Spring Boot job server that acts as a broker between our
> >> >>>> application and a Flink session cluster. To submit a job I use the
> >> >>>> FlinkRestClient (which is also the one used by the CLI client when I
> >> >>>> use the run action, if I'm not wrong). However, neither method triggers
> >> >>>> the job listener.
> >> >>>>
> >> >>>> On Thu, Nov 19, 2020, 09:39 Aljoscha Krettek <aljos...@apache.org> wrote:
> >> >>>>
> >> >>>>> @Flavio, when you're saying you're using the RestClusterClient, you
> >> >>>>> are not actually using that manually, right? You're just submitting
> >> >>>>> your job via "bin/flink run ...", right?
> >> >>>>>
> >> >>>>> What's the exact invocation of "bin/flink run" that you're using?
> >> >>>>>
> >> >>>>> On 19.11.20 09:29, Andrey Zagrebin wrote:
> >> >>>>>> Hi Flavio,
> >> >>>>>>
> >> >>>>>> I think I can reproduce what you are reporting (assuming you also
> >> >>>>>> pass '--output' to 'flink run').
> >> >>>>>> I am not sure why it behaves like this. I would suggest filing a
> >> >>>>>> Jira ticket for this.
> >> >>>>>>
> >> >>>>>> Best,
> >> >>>>>> Andrey
> >> >>>>>>
> >> >>>>>> On Wed, Nov 18, 2020 at 9:45 AM Flavio Pompermaier <pomperma...@okkam.it> wrote:
> >> >>>>>>
> >> >>>>>>> is this a bug or is it a documentation problem...?
> >> >>>>>>>
> >> >>>>>>> On Sat, Nov 14, 2020, 18:44 Flavio Pompermaier <pomperma...@okkam.it> wrote:
> >> >>>>>>>
> >> >>>>>>>> I've also verified that the problem persists using a modified version
> >> >>>>>>>> of the WordCount class.
> >> >>>>>>>> If you add the code pasted at the end of this email to the end of its
> >> >>>>>>>> main method, you can verify that the listener is called if you run the
> >> >>>>>>>> program from the IDE, but it's not called if you submit the job with
> >> >>>>>>>> the CLI client using the command
> >> >>>>>>>>
> >> >>>>>>>> - bin/flink run /home/okkam/git/flink/flink-examples/flink-examples-batch/target/WordCount.jar
> >> >>>>>>>>
> >> >>>>>>>> Maybe this is the expected result, but I didn't find any documentation of
> >> >>>>>>>> this behaviour (neither in the Javadoc nor on the Flink web site, where I
> >> >>>>>>>> can't find any documentation about JobListener at all).
> >> >>>>>>>>
> >> >>>>>>>> [Code to add to main()]
> >> >>>>>>>> // emit result
> >> >>>>>>>> if (params.has("output")) {
> >> >>>>>>>>     counts.writeAsCsv(params.get("output"), "\n", " ");
> >> >>>>>>>>     // register the listener, then execute the program
> >> >>>>>>>>     env.registerJobListener(new JobListener() {
> >> >>>>>>>>
> >> >>>>>>>>         @Override
> >> >>>>>>>>         public void onJobSubmitted(JobClient arg0, Throwable arg1) {
> >> >>>>>>>>             System.out.println("**************** SUBMITTED");
> >> >>>>>>>>         }
> >> >>>>>>>>
> >> >>>>>>>>         @Override
> >> >>>>>>>>         public void onJobExecuted(JobExecutionResult arg0, Throwable arg1) {
> >> >>>>>>>>             System.out.println("**************** EXECUTED");
> >> >>>>>>>>         }
> >> >>>>>>>>     });
> >> >>>>>>>>     env.execute("WordCount Example");
> >> >>>>>>>> } else {
> >> >>>>>>>>     System.out.println("Printing result to stdout. Use --output to specify output path.");
> >> >>>>>>>>     counts.print();
> >> >>>>>>>> }
> >> >>>>>>>>
> >> >>>>>>>> On Fri, Nov 13, 2020 at 4:25 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
> >> >>>>>>>>
> >> >>>>>>>>> see inline
> >> >>>>>>>>>
> >> >>>>>>>>> On Fri, Nov 13, 2020, 14:31 Matthias Pohl <matth...@ververica.com> wrote:
> >> >>>>>>>>>
> >> >>>>>>>>>> Hi Flavio,
> >> >>>>>>>>>> thanks for sharing this with the Flink community. Could you answer the
> >> >>>>>>>>>> following questions, please:
> >> >>>>>>>>>> - What's the code of your Job's main method?
> >> >>>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>> It's actually very simple: the main class creates a batch execution env
> >> >>>>>>>>> using ExecutionEnvironment.getExecutionEnvironment(), I register a job
> >> >>>>>>>>> listener on the env, and I do some stuff before calling env.execute().
> >> >>>>>>>>> The listener is executed correctly, but if I use the RestClusterClient to
> >> >>>>>>>>> submit the JobGraph extracted from that main class contained in a jar, the
> >> >>>>>>>>> program is executed as usual but the job listener is not called.
> >> >>>>>>>>>
> >> >>>>>>>>>> - What cluster backend and application do you use to execute the job?
> >> >>>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>> I use a standalone session cluster for the moment
> >> >>>>>>>>>
> >> >>>>>>>>>> - Is there anything suspicious you can find in the logs that might be
> >> >>>>>>>>>> related?
> >> >>>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>> no unfortunately..
> >> >>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>>> Best,
> >> >>>>>>>>>> Matthias
> >> >>>>>>>>>>
> >> >>>>>>>>>> On Thu, Nov 12, 2020 at 7:48 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
> >> >>>>>>>>>>
> >> >>>>>>>>>>> Actually what I'm experiencing is that the JobListener is executed
> >> >>>>>>>>>>> successfully if I run my main class from the IDE, while the job listener is
> >> >>>>>>>>>>> not fired at all if I submit the JobGraph of the application to a cluster
> >> >>>>>>>>>>> using the RestClusterClient.
> >> >>>>>>>>>>> Am I doing something wrong?
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> My main class ends with env.execute(), and I call
> >> >>>>>>>>>>> env.registerJobListener() when I create the execution env
> >> >>>>>>>>>>> via ExecutionEnvironment.getExecutionEnvironment().
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> Thanks in advance for any help,
> >> >>>>>>>>>>> Flavio
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> On Thu, Nov 12, 2020 at 2:13 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>> Hello everybody,
> >> >>>>>>>>>>>> I'm trying to use the JobListener to track when a job finishes (with
> >> >>>>>>>>>>>> Flink 1.11.0).
> >> >>>>>>>>>>>> It works great, but I have the problem that logs inside
> >> >>>>>>>>>>>> onJobExecuted are not logged anywhere. Is that normal?
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> Best,
> >> >>>>>>>>>>>> Flavio
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>
> >> >>>>>
> >> >>>>>
> >> >>>>
> >> >>>
> >> >>>
> >> >
> >>
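
The workaround mentioned at the top of this thread (saving the job ID when onJobSubmitted fires) can be sketched in plain Java. This is only an illustration under stated assumptions: SubmissionListener, JobIdCapture and JobIdCaptureDemo are hypothetical names, the listener is a simplified stand-in for Flink's JobListener (which receives a JobClient rather than a String), and the submission is simulated with a thread instead of a real cluster call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical, simplified stand-in for a submission callback; NOT Flink's JobListener. */
interface SubmissionListener {
    void onJobSubmitted(String jobId, Throwable error);
}

/** Bridges the callback to a future so the submitting code can wait for the job ID. */
final class JobIdCapture implements SubmissionListener {
    private final CompletableFuture<String> future = new CompletableFuture<>();

    @Override
    public void onJobSubmitted(String jobId, Throwable error) {
        if (error != null) {
            future.completeExceptionally(error);
        } else {
            future.complete(jobId);
        }
    }

    /** Blocks until the callback fires or the timeout elapses. */
    String await(long timeout, TimeUnit unit) {
        try {
            return future.get(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for job ID", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("job ID not available", e);
        }
    }
}

final class JobIdCaptureDemo {
    /** Simulated submission: another thread plays the program that fires the listener. */
    static String submitAndGetId(String simulatedId) {
        JobIdCapture capture = new JobIdCapture();
        Thread program = new Thread(() -> capture.onJobSubmitted(simulatedId, null));
        program.start();
        return capture.await(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) {
        System.out.println(submitAndGetId("c454a894d0524ccb69943b95838eea07"));
    }
}
```

In a real service the listener body would presumably take the ID from the JobClient passed to onJobSubmitted, and the timeout keeps the REST handler from blocking forever if the listener never fires.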