Hi,
if I understand correctly, you have a 'program runner' (sometimes called
a driver), which is supposed to be long-running and to watch whether the
submitted pipeline is still running. If not, the driver resubmits the
job. If my understanding is correct, I would suggest looking into the
reasons why the pipeline terminates in the first place. Flink is
designed to be fault-tolerant after job submission, both for
application-level errors (e.g. transient user-code errors, failures of
external dependencies, etc.) and for the Flink runtime itself (failures
of taskmanagers or the jobmanager). The most common case where this does
not work is some sort of misconfiguration (typically an inability to
restore jobs after a jobmanager failure). Having said that, it is a good
idea to _monitor_ that your job is running (and ideally that it is making
progress, because the mere fact that a job 'runs' does not imply that),
but recovering from a permanently gone job should require manual action.
Simply resubmitting the job is not something I would expect to work well.
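As a rough illustration of such a check (a minimal sketch only; the
JobManager address, the expected job name and the use of the 'requests'
package are assumptions on my side, not part of your setup), you can ask
Flink's REST API for the job overview and look at the reported state:

import requests  # assumption: the 'requests' package is installed

FLINK_REST_URL = "http://localhost:8081"     # placeholder: JobManager REST address
EXPECTED_JOB_NAME = "my-streaming-pipeline"  # placeholder: (part of) the job name

def pipeline_is_running() -> bool:
    # GET /jobs/overview lists every job the cluster knows about,
    # together with its name and current state.
    resp = requests.get(f"{FLINK_REST_URL}/jobs/overview", timeout=10)
    resp.raise_for_status()
    return any(
        EXPECTED_JOB_NAME in job["name"] and job["state"] == "RUNNING"
        for job in resp.json().get("jobs", [])
    )

if __name__ == "__main__":
    if pipeline_is_running():
        print("Job is already running; nothing to do.")
    else:
        print("Job is not running; someone should look into why.")

The same REST API also exposes per-vertex metrics and watermarks, which
is what I would use for the 'is it making progress' part of the check.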
Best,
Jan
On 7/6/23 22:07, Adlae D'Orazio wrote:
Hello,
I am using an Apache Flink cluster to run a streaming pipeline that
I've created using Apache Beam. This streaming pipeline should be the
only one of its type running on the Flink cluster, and I need some
help with how to ensure that is the case.
A Dockerized pipeline runner program submits the streaming pipeline. If
the pipeline exits (e.g. because of an error), the pipeline runner
program exits and is re-run, so that the pipeline is submitted again and
keeps running.
The problem I am running into is that the pipeline runner program can
exit while the streaming pipeline is still running (e.g. because the job
server went down and came back up). In that case, the pipeline runner
program needs to check whether the pipeline is still running or has gone
down before resubmitting it.
My first thought was to set a specific job name that would be visible
through Flink's REST API, so that I could query the REST API for that
name to see whether the job was already running. I'm having trouble
doing this. I seem to be able to set a job name in Beam, but that job
name does not seem to be accessible via Flink's REST API once the
pipeline is run on Flink. While researching this problem, I found this
method
<https://github.com/apache/beam/blob/9a11e28ce79e3b243a13fbf148f2ba26b8c14107/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java#L340>,
which initializes an AppName. That seems promising, but it is written in
Java and I am looking to do it in Python.
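For concreteness, here is roughly how I am submitting the pipeline (a
simplified sketch; the transforms, the Flink master address and the job
name below are placeholders, not my real code):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_master=localhost:8081",     # placeholder: our JobManager address
    "--job_name=my-streaming-pipeline",  # the name I hoped to see in Flink's REST API
    "--streaming",
])

with beam.Pipeline(options=options) as pipeline:
    # placeholder transforms standing in for the real streaming pipeline
    (pipeline | beam.Create(["example"]) | beam.Map(print))

As far as I can tell, the name Flink's REST API reports for the
submitted job is not this job_name, though that may depend on the runner
or job-server version.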
Is there a way to specify the Flink job name via the Beam Python SDK?
Or is there a simpler way to know that a particular Beam pipeline is
running, and therefore not resubmit it?
Please let me know if you have any suggestions - either about how to
execute the approaches I've described or about a simpler solution that I
am overlooking. Thank you for your help!
Best,
Adlae D'Orazio