Hi Danny,

> My current proposal is that the REST API should not leave the Flink
> cluster in an inconsistent state.

Regarding consistency, Flink only cares about individual jobs, but I can
see your point.

For streaming, we could probably address this by book-keeping the jobs
submitted by the jar and canceling them on exception. However, that approach
is not resilient to JM failure and would be a source of bug reports, because
there are no hard guarantees (that could be addressed, but it's not
straightforward; we've spent years getting this right for Application Mode).
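
Roughly something like the following, where JarRunContext and JobCanceller
are invented names for illustration, not existing Flink classes:

import org.apache.flink.api.common.JobID;

import java.util.ArrayList;
import java.util.List;

final class JarRunContext {
    private final List<JobID> submittedJobs = new ArrayList<>();

    /** Called for every job the jar's main() successfully submits. */
    void recordSubmission(JobID jobId) {
        submittedJobs.add(jobId);
    }

    /** Best-effort rollback: cancel every job this invocation started. */
    void cancelAll(JobCanceller canceller) {
        for (JobID jobId : submittedJobs) {
            try {
                canceller.cancel(jobId);
            } catch (Exception e) {
                // No guarantees here: the JM may have failed over, the job
                // may already have finished, or the cancel itself may fail.
            }
        }
    }

    interface JobCanceller {
        void cancel(JobID jobId) throws Exception;
    }
}

The jar-run handler would call cancelAll() when main() throws after at least
one successful submission; the JM-failure window is exactly where the
guarantees break down.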

A bigger problem is that the main method can have side effects that we don't
know how to roll back, for example creating directories for batch outputs or
moving files. To make this 100% correct, we'd need to introduce a new set of
client-facing APIs.
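
As a minimal illustration (the paths are hypothetical), a batch-style main()
like this mutates the filesystem before any job exists, and nothing in Flink
today can undo that if the run fails afterwards:

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class SideEffectingJob {
    public static void main(String[] args) throws Exception {
        // Hypothetical paths, purely for illustration.
        Path staging = Paths.get("/data/staging/run-1");
        Path output = Paths.get("/data/output/run-1");

        Files.createDirectories(output);              // side effect #1
        Files.move(staging, output.resolve("input")); // side effect #2

        // ... build and execute() the actual Flink job here. If anything
        // throws from this point on, the directory and the moved files
        // stay as they are; rolling them back requires application
        // knowledge that Flink does not have.
    }
}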

I'm not aware of any framework that got this right (MR, Spark, etc. had the
same issue); you would have to solve HA for drivers/client programs first.

These are quick thoughts; something simple that works for 90% of cases might
be worth pursuing, ignoring the corner cases.
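
For reference, a minimal client-side sketch of that 90% idea, using the
documented /jars/:jarid/run, /jobs/overview and PATCH /jobs/:jobid REST
endpoints (the JobManager address and the clean-up policy are assumptions):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class JarRunClient {
    // Assumed JobManager REST address, for illustration only.
    private static final String BASE = "http://jobmanager:8081";

    public static void main(String[] args) throws Exception {
        HttpClient http = HttpClient.newHttpClient();
        String jarId = args[0];

        HttpResponse<String> run = http.send(
                HttpRequest.newBuilder(URI.create(BASE + "/jars/" + jarId + "/run"))
                        .POST(HttpRequest.BodyPublishers.noBody())
                        .build(),
                HttpResponse.BodyHandlers.ofString());

        if (run.statusCode() != 200) {
            // Case 3 from the original mail: the call failed, but a job may
            // still be running. List the jobs and clean up the leftovers,
            // e.g. via PATCH /jobs/:jobid?mode=cancel.
            HttpResponse<String> jobs = http.send(
                    HttpRequest.newBuilder(URI.create(BASE + "/jobs/overview"))
                            .GET()
                            .build(),
                    HttpResponse.BodyHandlers.ofString());
            System.err.println("Run failed: " + run.body());
            System.err.println("Jobs still on the cluster: " + jobs.body());
        }
    }
}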

Best,
D.

On Tue, Nov 14, 2023 at 10:00 AM Danny Cranmer <dannycran...@apache.org>
wrote:

> Hey all,
>
> We run Flink clusters in session mode; we upload the user jar and then
> invoke "/jars/:jarid/run" [1] REST API endpoint. We have noticed a
> discrepancy in the run endpoint and were hoping to get some feedback from
> the community before proposing a FLIP or Jira to fix it.
>
> Some problem context: The "/jars/:jarid/run" endpoint runs the main()
> method in the user jar. When the call is successful the API will return the
> job ID (case 1). When the call throws an error, the API will return the
> error message (case 2). If a job is submitted successfully AND it throws an
> error, the result is a running job but the API returns the error message
> (case 3). There are two common cases that can result in this failure mode:
> 1/ the user code attempts to run multiple jobs, the first is successful and
> the second is rejected [2]. 2/ the user code throws an arbitrary exception
> after successfully submitting a job. Reproduction code for both is included
> below. For case 3 the client must 1/ run a jar, 2/ query for running jobs
> and 3/ decide how to proceed, either cleaning up or marking it as
> successful. This is overloading the responsibility of the client.
>
> My current proposal is that the REST API should not leave the Flink cluster
> in an inconsistent state. If the run command is successful we should have a
> running job, if the run command fails, we should not have any running jobs.
> There are a few ways to achieve this, but I would like to leave that
> discussion to the FLIP. Right now I am looking for feedback on the desired
> API behaviour.
>
> 1/ The user code attempts to run multiple jobs (Flink 1.15)
>
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
>
> import static org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator.stringGenerator;
>
> public class MultipleJobs {
>     public static final long ROWS_PER_SECOND = 1;
>     public static final long TOTAL_ROWS = 1_000_000;
>
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>
>         env.addSource(new DataGeneratorSource<>(
>                         stringGenerator(32), ROWS_PER_SECOND, TOTAL_ROWS))
>                 .returns(String.class)
>                 .print();
>
>         env.execute("Job #1");
>
>         env.addSource(new DataGeneratorSource<>(
>                         stringGenerator(32), ROWS_PER_SECOND, TOTAL_ROWS))
>                 .returns(String.class)
>                 .print();
>
>         // Rejected in a session cluster [2], while Job #1 keeps running.
>         env.execute("Job #2");
>     }
> }
>
>
> 2/ The user code throws an arbitrary exception after successfully
> submitting a job (Flink 1.15)
>
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
>
> import static org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator.stringGenerator;
>
> public class CustomerErrorJobSubmission {
>     public static final long ROWS_PER_SECOND = 1;
>     public static final long TOTAL_ROWS = 1_000_000;
>
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>
>         env.addSource(new DataGeneratorSource<>(
>                         stringGenerator(32), ROWS_PER_SECOND, TOTAL_ROWS))
>                 .returns(String.class)
>                 .print();
>
>         env.execute("Job #1");
>
>         throw new RuntimeException(
>                 "REST API call will fail, but there will still be a job running");
>     }
> }
>
>
> --
>
>
> Thanks,
> Danny
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/rest_api/#jars-jarid-run
> [2]
>
> https://github.com/apache/flink/blob/release-1.15/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java#L198
>
