I think the problem is that my REST service submits the job to
the Flink standalone cluster and responds to the client with the
submitted job ID.
To achieve this, I was using the
RestClusterClient<StandaloneClusterId>, because with it I can run the
following code and retrieve the JobID:

    (1) JobID flinkJobId = client.submitJob(jobGraph)
            .thenApply(DetachedJobExecutionResult::new)
            .get()
            .getJobID();

Unfortunately this does not activate the job listener (which is quite
surprising to me; I thought that such a listener was triggered by the
JobManager).
So, after Aljoscha's answer I took a deeper look into the Flink CLI code,
and what it does is basically this:

    (2) ClientUtils.executeProgram(
            new DefaultExecutorServiceLoader(), flinkConf, packagedProgram, false, false);

That works as expected (I wasn't aware of the ThreadLocal mechanism
used by the ContextEnvironment and StreamContextEnvironment: a very
advanced programming technique), but it does not return the
job ID that I need. I can live with that, because I can save the Flink
job ID in an external service when the job listener triggers the
onJobSubmitted method, but I think this mechanism is quite weird.
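
A minimal sketch of the thread-local idea mentioned above, in plain Java
with no Flink dependency. It is a simplified illustration, not Flink's
actual code: all names (ContextEnvDemo, getEnvironment, runWithContext)
are hypothetical, and the "environment" is reduced to a string. The point
is only that a launcher can install a factory for the current thread so
that unchanged user code picks up a different environment.

```java
import java.util.function.Supplier;

final class ContextEnvDemo {

    // Per-thread factory; null means no context has been installed (hypothetical name).
    private static final ThreadLocal<Supplier<String>> CONTEXT_FACTORY = new ThreadLocal<>();

    /** Stand-in for getExecutionEnvironment(): prefers the calling thread's factory. */
    static String getEnvironment() {
        Supplier<String> factory = CONTEXT_FACTORY.get();
        return factory != null ? factory.get() : "local environment";
    }

    /** Stand-in for the user's main(): it never mentions the context explicitly. */
    static String userMain() {
        return "job ran in: " + getEnvironment();
    }

    /** Stand-in for a launcher: installs the factory, runs user code, then cleans up. */
    static String runWithContext(Supplier<String> factory) {
        CONTEXT_FACTORY.set(factory);
        try {
            return userMain();
        } finally {
            CONTEXT_FACTORY.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(userMain());                                  // no context installed
        System.out.println(runWithContext(() -> "cluster environment")); // context installed
        System.out.println(userMain());                                  // context removed again
    }
}
```

If the real mechanism works along these lines, it would explain why a
listener registered inside main() only fires when main() itself is run by
the launcher, and not when a pre-built JobGraph is submitted directly.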

So my question is: is there a simple way to achieve my goal? Am I
doing something wrong?
At the moment I have had to implement a job-status polling thread after
line (1), but this looks like a workaround to me.
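
For reference, a minimal sketch of what such a polling workaround could
look like, written in plain Java so that it runs without Flink. The names
JobStatusPoller and pollUntilTerminal are hypothetical, and the status is
a plain string here. In the real service the supplier would presumably
wrap something like client.getJobStatus(flinkJobId) and the predicate
would check for a terminal JobStatus; that wiring is an assumption and is
not shown.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;

final class JobStatusPoller {

    /** Queries the status repeatedly until isTerminal accepts it, then completes the future. */
    static <S> CompletableFuture<S> pollUntilTerminal(
            Supplier<CompletableFuture<S>> statusQuery,
            Predicate<S> isTerminal,
            long intervalMillis,
            ScheduledExecutorService scheduler) {
        CompletableFuture<S> result = new CompletableFuture<>();
        pollOnce(statusQuery, isTerminal, intervalMillis, scheduler, result);
        return result;
    }

    private static <S> void pollOnce(
            Supplier<CompletableFuture<S>> statusQuery,
            Predicate<S> isTerminal,
            long intervalMillis,
            ScheduledExecutorService scheduler,
            CompletableFuture<S> result) {
        if (result.isDone()) {
            return; // the caller cancelled or completed the future: stop polling
        }
        statusQuery.get().whenComplete((status, error) -> {
            if (error != null) {
                result.completeExceptionally(error); // propagate query failures
            } else if (isTerminal.test(status)) {
                result.complete(status);             // terminal state reached
            } else {
                // Not finished yet: schedule the next query after the interval.
                scheduler.schedule(
                        () -> pollOnce(statusQuery, isTerminal, intervalMillis, scheduler, result),
                        intervalMillis, TimeUnit.MILLISECONDS);
            }
        });
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();
        // Fake status source for the demo: RUNNING twice, then FINISHED.
        Supplier<CompletableFuture<String>> fakeQuery = () ->
                CompletableFuture.completedFuture(calls.incrementAndGet() < 3 ? "RUNNING" : "FINISHED");
        String finalStatus = pollUntilTerminal(fakeQuery, s -> !s.equals("RUNNING"), 10, scheduler)
                .get(5, TimeUnit.SECONDS);
        System.out.println(finalStatus + " after " + calls.get() + " polls"); // FINISHED after 3 polls
        scheduler.shutdown();
    }
}
```

Using a scheduler instead of a dedicated sleeping thread keeps one thread
free to serve many submitted jobs, but it is still polling, so the
question above stands.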

Best,
Flavio

On Thu, Nov 19, 2020 at 4:28 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
> You're right. I removed my Flink dir and re-extracted it, and now it
> works. Unfortunately I didn't keep the old version to understand what
> the difference was, but the error was probably caused by the fact that
> I had a previous version of the WordCount.jar (without the listener)
> in the Flink lib dir (in another dev session I was experimenting with
> running the job with the user jar in the lib dir). Sorry for the
> confusion.
> Just one last question: is the listener executed on the client or on
> the job server? This is not entirely clear to me.
>
> Best,
> Flavio
>
> On Thu, Nov 19, 2020 at 1:53 PM Andrey Zagrebin <azagre...@apache.org> wrote:
> >
> > I also tried 1.11.0 and 1.11.2, both work for me.
> >
> > On Thu, Nov 19, 2020 at 3:39 PM Aljoscha Krettek <aljos...@apache.org> wrote:
> >>
> >> Hmm, there was this issue:
> >> https://issues.apache.org/jira/browse/FLINK-17744 But it should be fixed
> >> in your version.
> >>
> >> On 19.11.20 12:58, Flavio Pompermaier wrote:
> >> > Which version are you using?
> >> > I used the exact same commands on Flink 1.11.0 and I didn't get the job
> >> > listener output..
> >> >
> >> > On Thu, Nov 19, 2020, 12:53 Andrey Zagrebin <azagre...@apache.org> wrote:
> >> >
> >> >> Hi Flavio and Aljoscha,
> >> >>
> >> >> Sorry for the late heads up. I could not actually reproduce the reported
> >> >> problem with 'flink run' and local standalone cluster on master.
> >> >> I get the expected output with the suggested modification of WordCount
> >> >> program:
> >> >>
> >> >> $ bin/start-cluster.sh
> >> >>
> >> >> $ rm -rf out; bin/flink run
> >> >> flink/flink-examples/flink-examples-batch/target/WordCount.jar --output
> >> >> flink/build-target/out
> >> >>
> >> >> Executing WordCount example with default input data set.
> >> >> Use --input to specify file input.
> >> >> **************** SUBMITTED
> >> >> Job has been submitted with JobID c454a894d0524ccb69943b95838eea07
> >> >> Program execution finished
> >> >> Job with JobID c454a894d0524ccb69943b95838eea07 has finished.
> >> >> Job Runtime: 139 ms
> >> >>
> >> >> **************** EXECUTED
> >> >>
> >> >> Best,
> >> >> Andrey
> >> >>
> >> >> On Thu, Nov 19, 2020 at 2:40 PM Aljoscha Krettek <aljos...@apache.org>
> >> >> wrote:
> >> >>
> >> >>> JobListener.onJobExecuted() is only invoked in
> >> >>> ExecutionEnvironment.execute() and ContextEnvironment.execute(). If
> >> >>> none of these is still in the call chain with that setup then the
> >> >>> listener will not be invoked.
> >> >>>
> >> >>> Also, this would only happen on the client, not on the broker (in your
> >> >>> case) or the server (JobManager).
> >> >>>
> >> >>> Does that help to debug the problem?
> >> >>>
> >> >>> Aljoscha
> >> >>>
> >> >>> On 19.11.20 09:49, Flavio Pompermaier wrote:
> >> >>>> I have a Spring Boot job server that acts as a broker between our
> >> >>>> application and a Flink session cluster. To submit a job I use the
> >> >>>> FlinkRestClient (which, if I'm not wrong, is also the one used by the
> >> >>>> CLI client for the run action). However, neither method triggers the
> >> >>>> job listener.
> >> >>>>
> >> >>>> On Thu, Nov 19, 2020, 09:39 Aljoscha Krettek <aljos...@apache.org> wrote:
> >> >>>>
> >> >>>>> @Flavio, when you're saying you're using the RestClusterClient, you
> >> >>>>> are not actually using that manually, right? You're just submitting
> >> >>>>> your job via "bin/flink run ...", right?
> >> >>>>>
> >> >>>>> What's the exact invocation of "bin/flink run" that you're using?
> >> >>>>>
> >> >>>>> On 19.11.20 09:29, Andrey Zagrebin wrote:
> >> >>>>>> Hi Flavio,
> >> >>>>>>
> >> >>>>>> I think I can reproduce what you are reporting (assuming you also
> >> >>>>>> pass '--output' to 'flink run').
> >> >>>>>> I am not sure why it behaves like this. I would suggest filing a
> >> >>>>>> Jira ticket for this.
> >> >>>>>>
> >> >>>>>> Best,
> >> >>>>>> Andrey
> >> >>>>>>
> >> >>>>>> On Wed, Nov 18, 2020 at 9:45 AM Flavio Pompermaier
> >> >>>>>> <pomperma...@okkam.it> wrote:
> >> >>>>>>
> >> >>>>>>> is this a bug or is it a documentation problem...?
> >> >>>>>>>
> >> >>>>>>> On Sat, Nov 14, 2020, 18:44 Flavio Pompermaier <pomperma...@okkam.it> wrote:
> >> >>>>>>>
> >> >>>>>>>> I've also verified that the problem persists when using a modified
> >> >>>>>>>> version of the WordCount class.
> >> >>>>>>>> If you add the code pasted at the end of this email at the end of its
> >> >>>>>>>> main method, you can verify that the listener is called if you run the
> >> >>>>>>>> program from the IDE, but it is not called if you submit the job with
> >> >>>>>>>> the CLI client using the command
> >> >>>>>>>>
> >> >>>>>>>>       - bin/flink run
> >> >>>>>>>>         /home/okkam/git/flink/flink-examples/flink-examples-batch/target/WordCount.jar
> >> >>>>>>>>
> >> >>>>>>>> Maybe this is the expected result, but I didn't find any documentation
> >> >>>>>>>> of this behaviour (neither in the Javadoc nor on the Flink web site,
> >> >>>>>>>> where I can't find any documentation about JobListener at all).
> >> >>>>>>>>
> >> >>>>>>>> [Code to add to main()]
> >> >>>>>>>>        // emit result
> >> >>>>>>>>        if (params.has("output")) {
> >> >>>>>>>>          counts.writeAsCsv(params.get("output"), "\n", " ");
> >> >>>>>>>>          // execute program
> >> >>>>>>>>          env.registerJobListener(new JobListener() {
> >> >>>>>>>>
> >> >>>>>>>>            @Override
> >> >>>>>>>>            public void onJobSubmitted(JobClient arg0, Throwable arg1) {
> >> >>>>>>>>              System.out.println("**************** SUBMITTED");
> >> >>>>>>>>            }
> >> >>>>>>>>
> >> >>>>>>>>            @Override
> >> >>>>>>>>            public void onJobExecuted(JobExecutionResult arg0, Throwable arg1) {
> >> >>>>>>>>              System.out.println("**************** EXECUTED");
> >> >>>>>>>>            }
> >> >>>>>>>>          });
> >> >>>>>>>>          env.execute("WordCount Example");
> >> >>>>>>>>        } else {
> >> >>>>>>>>          System.out.println("Printing result to stdout. Use --output to specify output path.");
> >> >>>>>>>>          counts.print();
> >> >>>>>>>>        }
> >> >>>>>>>>
> >> >>>>>>>> On Fri, Nov 13, 2020 at 4:25 PM Flavio Pompermaier
> >> >>>>>>>> <pomperma...@okkam.it> wrote:
> >> >>>>>>>>
> >> >>>>>>>>> see inline
> >> >>>>>>>>>
> >> >>>>>>>>> On Fri, Nov 13, 2020, 14:31 Matthias Pohl <matth...@ververica.com> wrote:
> >> >>>>>>>>>
> >> >>>>>>>>>> Hi Flavio,
> >> >>>>>>>>>> thanks for sharing this with the Flink community. Could you
> >> >>>>>>>>>> answer the following questions, please:
> >> >>>>>>>>>> - What's the code of your Job's main method?
> >> >>>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>> It's actually very simple: the main class creates a batch execution
> >> >>>>>>>>> env using ExecutionEnvironment.getExecutionEnvironment(), I register a
> >> >>>>>>>>> job listener on the env, and I do some stuff before calling
> >> >>>>>>>>> env.execute().
> >> >>>>>>>>> The listener is executed correctly, but if I use the RestClusterClient
> >> >>>>>>>>> to submit the JobGraph extracted from that main class, contained in a
> >> >>>>>>>>> jar, the program is executed as usual but the job listener is not
> >> >>>>>>>>> called.
> >> >>>>>>>>>
> >> >>>>>>>>>> - What cluster backend and application do you use to execute the job?
> >> >>>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>> I use a standalone session cluster for the moment
> >> >>>>>>>>>
> >> >>>>>>>>>> - Is there anything suspicious you can find in the logs that might be
> >> >>>>>>>>>> related?
> >> >>>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>> no unfortunately..
> >> >>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>>> Best,
> >> >>>>>>>>>> Matthias
> >> >>>>>>>>>>
> >> >>>>>>>>>> On Thu, Nov 12, 2020 at 7:48 PM Flavio Pompermaier <
> >> >>>>>>>>>> pomperma...@okkam.it> wrote:
> >> >>>>>>>>>>
> >> >>>>>>>>>>> Actually, what I'm experiencing is that the JobListener is executed
> >> >>>>>>>>>>> successfully if I run my main class from the IDE, while the job
> >> >>>>>>>>>>> listener is not fired at all if I submit the JobGraph of the
> >> >>>>>>>>>>> application to a cluster using the RestClusterClient.
> >> >>>>>>>>>>> Am I doing something wrong?
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> My main class ends with env.execute(), and I call
> >> >>>>>>>>>>> env.registerJobListener() when I create the execution env
> >> >>>>>>>>>>> via ExecutionEnvironment.getExecutionEnvironment().
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> Thanks in advance for any help,
> >> >>>>>>>>>>> Flavio
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> On Thu, Nov 12, 2020 at 2:13 PM Flavio Pompermaier <
> >> >>>>>>>>>>> pomperma...@okkam.it> wrote:
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>> Hello everybody,
> >> >>>>>>>>>>>> I'm trying to use the JobListener to track when a job finishes
> >> >>>>>>>>>>>> (with Flink 1.11.0).
> >> >>>>>>>>>>>> It works great, but I have the problem that log statements inside
> >> >>>>>>>>>>>> onJobExecuted are not logged anywhere. Is that normal?
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> Best,
> >> >>>>>>>>>>>> Flavio
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>
> >> >>>>>
> >> >>>>>
> >> >>>>
> >> >>>
> >> >>>
> >> >
> >>
