Hi,
when the JM goes down, it should be brought back up (if configured for HA, running on k8s, ...), and it should recover all running jobs. If this does not happen, it means that:
a) either the JM is not configured for HA, or
b) it is unable to recover after the failure, which typically means that there is some problem reading job metadata from external storage, or some other persistent error.
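For case a), a minimal illustration of what an HA setup involves, as a flink-conf.yaml fragment using Kubernetes-based HA (the storage path and cluster id below are placeholders; check the HA documentation for your Flink version):

```yaml
# Illustrative flink-conf.yaml fragment for Kubernetes HA.
# Values are placeholders, not a recommendation.
high-availability: kubernetes
high-availability.storageDir: s3://my-bucket/flink-ha   # placeholder path
kubernetes.cluster-id: my-flink-cluster                 # placeholder id
```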
Either way, monitoring the job for simple presence in the JM might not be sufficient. What I personally prefer is monitoring the lag of the job's output watermark behind current processing time. It is possible to do this at Beam's application level, using a looping event-time timer that outputs the current input watermark every T seconds. This captures even the case when the job is running but unable to make any progress.
Best,
Jan
On 7/7/23 23:34, Adlae D'Orazio wrote:
Hi Jan,
Thank you for your response! Apologies that this wasn't clear, but we're actually looking at what would happen if the job server /were/ to go down. So what we are more interested in is understanding /how/ to actually monitor that the job is running. We won't know the job id, so we can't use that to query the REST API. Like I said, we were looking into that method that initializes an AppName, but that was written in Java. What do you think we should do? Thank you so much for your help!
Best,
Adlae D'Orazio
On Fri, Jul 7, 2023 at 1:28 AM Jan Lukavský <je...@seznam.cz> wrote:
Hi,
if I understand correctly, you have a 'program runner' (sometimes
called a driver), which is supposed to be long-running and
watching if the submitted Pipeline runs or not. If not, then the
driver resubmits the job. If my understanding is correct, I would
suggest looking into the reasons why the pipeline terminates in
the first place. Flink is designed to ensure that after job
submission it is fault-tolerant for both application-level errors
(e.g. transient user code errors, external dependencies failures,
etc) and the Flink runtime itself (failures of taskmanagers or
jobmanager). The most frequent case where this does not work is some sort of misconfiguration (typically an inability to restore jobs after a failure of the jobmanager). Having said that, it is a good idea to _monitor_ that your job runs (and ideally that it makes progress, because the pure fact that a job 'runs' does not imply that), but a job that is permanently gone should require manual action. Simple resubmission of the job is not what I would expect to work well.
Best,
Jan
On 7/6/23 22:07, Adlae D'Orazio wrote:
Hello,
I am using an Apache Flink cluster to run a streaming pipeline
that I've created using Apache Beam. This streaming pipeline
should be the only one of its type running on the Flink cluster,
and I need some help with how to ensure that is the case.
A Dockerized pipeline runner program submits the streaming pipeline, and if the pipeline exits (e.g. because of an error), then the pipeline runner program exits and is re-run, so that the pipeline is submitted again and continues running.
The problem I am running into is that if the pipeline runner program exits but the streaming pipeline is still running (e.g. because the job server went down and came back up), then I need to check in the pipeline runner program whether the pipeline is still running or has gone down.
My first thought was to try to create a specific job name that
would be stored in Flink's REST API, and then to see if the job
was already running, I could query the REST API for that name.
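The query side of that idea can be sketched like this (assuming a reachable Flink REST endpoint; the function names and the localhost:8081 address are illustrative):

```python
import json
from urllib.request import urlopen


def job_is_running(overview: dict, job_name: str) -> bool:
    """Check a /jobs/overview payload for a RUNNING job with this name."""
    return any(
        job.get("name") == job_name and job.get("state") == "RUNNING"
        for job in overview.get("jobs", []))


def check_flink(job_name: str, base_url: str = "http://localhost:8081") -> bool:
    # /jobs/overview is part of Flink's monitoring REST API and lists
    # each job's id, name, and state.
    with urlopen(f"{base_url}/jobs/overview") as resp:
        return job_is_running(json.load(resp), job_name)
```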
I'm having trouble doing this. I seem to be able to set a job
name in Beam, but that job name does not seem to be accessible
via Flink’s REST API once the pipeline is run using Flink. From
researching this problem, I found this method
<https://github.com/apache/beam/blob/9a11e28ce79e3b243a13fbf148f2ba26b8c14107/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java#L340>,
which initializes an AppName. This seems promising to me, but it
is written in Java and I am looking to do it in Python.
Is there a way to specify the Flink job name via the Beam Python
SDK? Or is there a simpler way to know that a particular Beam
pipeline is running, and therefore not resubmit it?
Please let me know if you have any suggestions - either about how
to execute the approaches I've described or if there's a simpler
solution that I am overlooking. Thank you for your help!
Best,
Adlae D'Orazio