Which version are you using?
I used the exact same commands on Flink 1.11.0 and I didn't get the job
listener output..

Il gio 19 nov 2020, 12:53 Andrey Zagrebin <azagre...@apache.org> ha scritto:

> Hi Flavio and Aljoscha,
>
> Sorry for the late heads up. I could not actually reproduce the reported
> problem with 'flink run' and local standalone cluster on master.
> I get the expected output with the suggested modification of WordCount
> program:
>
> $ bin/start-cluster.sh
>
> $ rm -rf out; bin/flink run
> flink/flink-examples/flink-examples-batch/target/WordCount.jar --output
> flink/build-target/out
>
> Executing WordCount example with default input data set.
> Use --input to specify file input.
> **************** SUBMITTED
> Job has been submitted with JobID c454a894d0524ccb69943b95838eea07
> Program execution finished
> Job with JobID c454a894d0524ccb69943b95838eea07 has finished.
> Job Runtime: 139 ms
>
> **************** EXECUTED
>
> Best,
> Andrey
>
> On Thu, Nov 19, 2020 at 2:40 PM Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> JobListener.onJobExecuted() is only invoked in
>> ExecutionEnvironment.execute() and ContextEnvironment.execute(). If none
>> of these is still in the call chain with that setup then the listener
>> will not be invoked.
>>
>> Also, this would only happen on the client, not on the broker (in your
>> case) or the server (JobManager).
>>
>> Does that help to debug the problem?
>>
>> Aljoscha
>>
>> On 19.11.20 09:49, Flavio Pompermaier wrote:
>> > I have a spring boot job server that act as a broker towards our
>> > application and a Flink session cluster. To submit a job I use the
>> > FlinkRestClient (that is also the one used in the CLI client when I use
>> the
>> > run action it if I'm not wrong). However both methods don't trigger the
>> job
>> > listener.
>> >
>> > Il gio 19 nov 2020, 09:39 Aljoscha Krettek <aljos...@apache.org> ha
>> scritto:
>> >
>> >> @Flavio, when you're saying you're using the RestClusterClient, you are
>> >> not actually using that manually, right? You're just submitting your
>> job
>> >> via "bin/flink run ...", right?
>> >>
>> >> What's the exact invocation of "bin/flink run" that you're using?
>> >>
>> >> On 19.11.20 09:29, Andrey Zagrebin wrote:
>> >>> Hi Flavio,
>> >>>
>> >>> I think I can reproduce what you are reporting (assuming you also pass
>> >>> '--output' to 'flink run').
>> >>> I am not sure why it behaves like this. I would suggest filing a Jira
>> >>> ticket for this.
>> >>>
>> >>> Best,
>> >>> Andrey
>> >>>
>> >>> On Wed, Nov 18, 2020 at 9:45 AM Flavio Pompermaier <
>> pomperma...@okkam.it
>> >>>
>> >>> wrote:
>> >>>
>> >>>> is this a bug or is it a documentation problem...?
>> >>>>
>> >>>> Il sab 14 nov 2020, 18:44 Flavio Pompermaier <pomperma...@okkam.it>
>> ha
>> >>>> scritto:
>> >>>>
>> >>>>> I've also verified that the problem persist also using a modified
>> >> version
>> >>>>> of the WordCount class.
>> >>>>> If you add the code pasted at the end of this email at the end of
>> its
>> >>>>> main method you can verify that the listener is called if you run
>> the
>> >>>>> program from the IDE, but it's not called if you submit the job
>> using
>> >> the
>> >>>>> CLI client using the command
>> >>>>>
>> >>>>>      - bin/flink run
>> >>>>>
>> >>
>>  
>> /home/okkam/git/flink/flink-examples/flink-examples-batch/target/WordCount.jar
>> >>>>>
>> >>>>> Maybe this is an expected result but I didn't find any
>> documentation of
>> >>>>> this behaviour (neither in the Javadoc or in the flink web site,
>> where
>> >> I
>> >>>>> can't find any documentation about JobListener at all).
>> >>>>>
>> >>>>> [Code to add to main()]
>> >>>>>       // emit result
>> >>>>>       if (params.has("output")) {
>> >>>>>         counts.writeAsCsv(params.get("output"), "\n", " ");
>> >>>>>         // execute program
>> >>>>>         env.registerJobListener(new JobListener() {
>> >>>>>
>> >>>>>           @Override
>> >>>>>           public void onJobSubmitted(JobClient arg0, Throwable
>> arg1) {
>> >>>>>             System.out.println("**************** SUBMITTED");
>> >>>>>           }
>> >>>>>
>> >>>>>           @Override
>> >>>>>           public void onJobExecuted(JobExecutionResult arg0,
>> Throwable
>> >>>>> arg1) {
>> >>>>>             System.out.println("**************** EXECUTED");
>> >>>>>           }
>> >>>>>         });
>> >>>>>         env.execute("WordCount Example");
>> >>>>>       } else {
>> >>>>>         System.out.println("Printing result to stdout. Use --output
>> to
>> >>>>> specify output path.");
>> >>>>>         counts.print();
>> >>>>>       }
>> >>>>>
>> >>>>> On Fri, Nov 13, 2020 at 4:25 PM Flavio Pompermaier <
>> >> pomperma...@okkam.it>
>> >>>>> wrote:
>> >>>>>
>> >>>>>> see inline
>> >>>>>>
>> >>>>>> Il ven 13 nov 2020, 14:31 Matthias Pohl <matth...@ververica.com>
>> ha
>> >>>>>> scritto:
>> >>>>>>
>> >>>>>>> Hi Flavio,
>> >>>>>>> thanks for sharing this with the Flink community. Could you answer
>> >> the
>> >>>>>>> following questions, please:
>> >>>>>>> - What's the code of your Job's main method?
>> >>>>>>>
>> >>>>>>
>> >>>>>> it's actually very simple...the main class creates a batch
>> execution
>> >> env
>> >>>>>> using ExecutionEnvironment.getExecutionEnvironment(), I register a
>> job
>> >>>>>> listener to the env and I do some stuff before calling
>> env.execute().
>> >>>>>> The listener is executed correctly but if I use the
>> RestClusterClient
>> >> to
>> >>>>>> sibmit the jobGraph exyracted from that main contained in a jar,
>> the
>> >>>>>> program is executed as usual but the job listener is not called.
>> >>>>>>
>> >>>>>> - What cluster backend and application do you use to execute the
>> job?
>> >>>>>>>
>> >>>>>>
>> >>>>>> I use a standalone session cluster for the moment
>> >>>>>>
>> >>>>>> - Is there anything suspicious you can find in the logs that might
>> be
>> >>>>>>> related?
>> >>>>>>>
>> >>>>>>
>> >>>>>> no unfortunately..
>> >>>>>>
>> >>>>>>
>> >>>>>>> Best,
>> >>>>>>> Matthias
>> >>>>>>>
>> >>>>>>> On Thu, Nov 12, 2020 at 7:48 PM Flavio Pompermaier <
>> >>>>>>> pomperma...@okkam.it> wrote:
>> >>>>>>>
>> >>>>>>>> Actually what I'm experiencing is that the JobListener is
>> executed
>> >>>>>>>> successfully if I run my main class from the IDE, while the job
>> >> listener is
>> >>>>>>>> not fired at all if I submit the JobGraph of the application to a
>> >> cluster
>> >>>>>>>> using the RestClusterClient..
>> >>>>>>>> Am I doing something wrong?
>> >>>>>>>>
>> >>>>>>>> My main class ends with the env.execute() and i do
>> >>>>>>>> env.registerJobListener() when I create the Exceution env
>> >>>>>>>> via ExecutionEnvironment.getExecutionEnvironment().
>> >>>>>>>>
>> >>>>>>>> Thanks in advance for any help,
>> >>>>>>>> Flavio
>> >>>>>>>>
>> >>>>>>>> On Thu, Nov 12, 2020 at 2:13 PM Flavio Pompermaier <
>> >>>>>>>> pomperma...@okkam.it> wrote:
>> >>>>>>>>
>> >>>>>>>>> Hello everybody,
>> >>>>>>>>> I'm trying to use the JobListener to track when a job finishes
>> >> (with
>> >>>>>>>>> Flink 1.11.0).
>> >>>>>>>>> It works great but I have the problem that logs inside
>> >>>>>>>>> the onJobExecuted are not logged anywhere..is it normal?
>> >>>>>>>>>
>> >>>>>>>>> Best,
>> >>>>>>>>> Flavio
>> >>>>>>>>>
>> >>>>>>>>
>> >>>
>> >>
>> >>
>> >
>>
>>

Reply via email to