Hey all,

We run Flink clusters in session mode; we upload the user jar and then invoke the "/jars/:jarid/run" [1] REST API endpoint. We have noticed a discrepancy in the run endpoint's behaviour and were hoping to get some feedback from the community before proposing a FLIP or Jira to fix it.
Some problem context: the "/jars/:jarid/run" endpoint runs the main() method in the user jar. When the call succeeds, the API returns the job ID (case 1). When the call throws an error, the API returns the error message (case 2). When a job is submitted successfully AND the main() method then throws an error, the result is a running job, but the API still returns the error message (case 3).

There are two common cases that can result in this failure mode:
1/ the user code attempts to run multiple jobs; the first is successful and the second is rejected [2];
2/ the user code throws an arbitrary exception after successfully submitting a job.
Reproduction code for both is included below.

For case 3 the client must 1/ run the jar, 2/ query for running jobs and 3/ decide how to proceed, either cleaning up or marking the submission as successful (a rough client-side sketch is appended after the references). This overloads the client with responsibility that should sit with the cluster.

My current proposal is that the REST API should not leave the Flink cluster in an inconsistent state: if the run command succeeds, we should have a running job; if the run command fails, we should not have any running jobs. There are a few ways to achieve this, but I would like to leave that discussion to the FLIP. Right now I am looking for feedback on the desired API behaviour.

1/ The user code attempts to run multiple jobs (Flink 1.15):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;

    import static org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator.stringGenerator;

    public class MultipleJobs {
        public static final long ROWS_PER_SECOND = 1;
        public static final long TOTAL_ROWS = 1_000_000;

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.addSource(new DataGeneratorSource<>(stringGenerator(32), ROWS_PER_SECOND, TOTAL_ROWS))
                    .returns(String.class)
                    .print();

            // First submission succeeds and leaves a running job.
            env.execute("Job #1");

            env.addSource(new DataGeneratorSource<>(stringGenerator(32), ROWS_PER_SECOND, TOTAL_ROWS))
                    .returns(String.class)
                    .print();

            // Second submission is rejected [2], so the REST call fails
            // even though Job #1 is still running.
            env.execute("Job #2");
        }
    }

2/ The user code throws an arbitrary exception after successfully submitting a job (Flink 1.15):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;

    import static org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator.stringGenerator;

    public class CustomerErrorJobSubmission {
        public static final long ROWS_PER_SECOND = 1;
        public static final long TOTAL_ROWS = 1_000_000;

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.addSource(new DataGeneratorSource<>(stringGenerator(32), ROWS_PER_SECOND, TOTAL_ROWS))
                    .returns(String.class)
                    .print();

            env.execute("Job #1");

            throw new RuntimeException("REST API call will fail, but there will still be a job running");
        }
    }

--
Thanks,
Danny

[1] https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/rest_api/#jars-jarid-run
[2] https://github.com/apache/flink/blob/release-1.15/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java#L198
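P.S. For completeness, the client-side reconciliation I am describing looks roughly like the sketch below. This is a minimal illustration, not a proposed implementation: it assumes the default REST address http://localhost:8081, the jar id is a placeholder for the one returned by POST /jars/upload, and a real client would parse the JSON responses with a proper JSON library rather than printing them.

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class RunJarAndReconcile {

        // Assumptions: default REST address; JAR_ID is a placeholder for an
        // id previously returned by POST /jars/upload.
        private static final String BASE = "http://localhost:8081";
        private static final String JAR_ID = "<uploaded-jar-id>";

        public static void main(String[] args) throws Exception {
            HttpClient client = HttpClient.newHttpClient();

            // 1/ Run the jar. On success the body contains {"jobid": "..."} (case 1).
            HttpRequest run = HttpRequest.newBuilder()
                    .uri(URI.create(BASE + "/jars/" + JAR_ID + "/run"))
                    .header("Content-Type", "application/json")
                    .POST(HttpRequest.BodyPublishers.ofString("{}"))
                    .build();
            HttpResponse<String> runResponse =
                    client.send(run, HttpResponse.BodyHandlers.ofString());

            if (runResponse.statusCode() == 200) {
                System.out.println("Submitted: " + runResponse.body());
                return;
            }

            // The call failed, but we cannot tell case 2 from case 3 yet:
            // a job may or may not have been left running.
            System.err.println("Run failed: " + runResponse.body());

            // 2/ Query for running jobs to find out which case we are in.
            HttpRequest overview = HttpRequest.newBuilder()
                    .uri(URI.create(BASE + "/jobs/overview"))
                    .GET()
                    .build();
            HttpResponse<String> jobs =
                    client.send(overview, HttpResponse.BodyHandlers.ofString());

            // 3/ Decide how to proceed. A real client would parse the JSON,
            // match any RUNNING jobs against what it expected to submit, and
            // either cancel leftovers (PATCH /jobs/:jobid) or mark the
            // submission as successful. Here we only print the overview.
            System.err.println("Jobs on the cluster: " + jobs.body());
        }
    }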