Ahmed Hamdy created FLINK-36010:
-----------------------------------

             Summary: Duplicate Job Submission returns Succeeded on single execution even if Global state is FAILED
                 Key: FLINK-36010
                 URL: https://issues.apache.org/jira/browse/FLINK-36010
             Project: Flink
          Issue Type: Bug
          Components: Client / Job Submission
    Affects Versions: 1.19.1, 1.20.0, 1.17.2, 1.8.3
            Reporter: Ahmed Hamdy

h2. Description

Running a job in single-execution mode with HA enabled typically [short-circuits duplicate execution of the job once it has reached a globally terminal state|https://github.com/apache/flink/blob/9457ae8c07b1a0dc036aa88735d36420009a7eb9/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java#L316]. However, this path returns {{ApplicationStatus.SUCCEEDED}} even if the job's initial global status is {{FAILED}}. This breaks the [consistency of states|https://github.com/apache/flink/blob/9457ae8c07b1a0dc036aa88735d36420009a7eb9/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java#L48].

h2. Reproducing steps

1- Create the Flink ConfigMap and service account as in [https://github.com/vahmed-hamdy/flink-test-projects/tree/master/k8s/job-dedupe]
2- Submit the Flink Job and TaskManager deployments, similar to [https://github.com/vahmed-hamdy/flink-test-projects/blob/master/k8s/job-dedupe/FlinkApplicationJob.yml]
3- Delete the TaskManager pods to force job failover until the restart count is consumed. The job has now reached the terminal state {{FAILED}}.
4- The Batch Job is then restarted because of the JobManager failover, but due to single-execution mode the JobManager does not resubmit the job.
5- The job is recorded as {{SUCCEEDED}} because a globally terminal state ({{FAILED}}) was detected.

{code}
Printing result to stdout. Use --output to specify output path.
2024-08-08 12:32:50,110 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Job 3d98a3e455408045defa90c6b0b03a5a is submitted.
2024-08-08 12:32:50,110 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Submitting Job with JobId=3d98a3e455408045defa90c6b0b03a5a.
2024-08-08 12:32:50,118 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received JobGraph submission 'CarTopSpeedWindowingExample' (3d98a3e455408045defa90c6b0b03a5a).
2024-08-08 12:32:50,164 WARN org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Ignoring JobGraph submission 'CarTopSpeedWindowingExample' (3d98a3e455408045defa90c6b0b03a5a) because the job already reached a globally-terminal state (i.e. FAILED, CANCELED, FINISHED) in a previous execution.
2024-08-08 12:32:50,171 INFO org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap [] - Application completed SUCCESSFULLY
2024-08-08 12:32:50,171 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting StandaloneApplicationClusterEntryPoint down with application status SUCCEEDED. Diagnostics null.
{code}

h3. Resource Highlights

{code:java}
flink-conf.yaml: |
    job-result-store.delete-on-commit: false
    high-availability.type: ZOOKEEPER
    high-availability.jobmanager.port: 6125
    high-availability.storageDir: s3a://flink-high-availability-dir
    high-availability.zookeeper.quorum: my-release-zookeeper:2181
    high-availability.zookeeper.path.root: /flink
    jobmanager.execution.slot-allocation-timeout: 10000
{code}

{code:java}
spec:
  restartPolicy: OnFailure
  containers:
    - name: jobmanager
      image: apache/flink:1.20.0-scala_2.12
      env:
        - name: POD_IP
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
      args: ["standalone-job", "--host", "$(POD_IP)", "--job-classname", "org.apache.flink.streaming.examples.windowing.TopSpeedWindowing", "--job-id", "3d98a3e455408045defa90c6b0b03a5a"]
{code}

h2. Expected Outcome

Instead of recording the application status as {{SUCCEEDED}} on resubmission, it should be recorded as {{FAILED}}, because the globally terminal state of the job is {{FAILED}}.
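
The expected outcome can be illustrated with a small sketch. It is not the actual Flink code or a proposed patch: {{JobStatus}} and {{ApplicationStatus}} below are simplified local stand-ins for the Flink enums of the same names, and {{TerminalStatusMapping}} and {{statusForDuplicateSubmission}} are hypothetical names used only for illustration. It also assumes that the terminal status recorded by the previous execution is available at the point where the duplicate submission is ignored. The idea is to derive the application status from that recorded status instead of always reporting {{SUCCEEDED}}.

```java
import java.util.EnumMap;
import java.util.Map;

/**
 * Illustrative sketch only. The enums are simplified local stand-ins,
 * not Flink's own JobStatus and ApplicationStatus classes.
 */
final class TerminalStatusMapping {

    /** Stand-in for the globally terminal job states named in the dispatcher log. */
    enum JobStatus { FINISHED, FAILED, CANCELED }

    /** Stand-in for the application status reported when the cluster shuts down. */
    enum ApplicationStatus { SUCCEEDED, FAILED, CANCELED, UNKNOWN }

    private static final Map<JobStatus, ApplicationStatus> BY_JOB_STATUS =
            new EnumMap<>(JobStatus.class);

    static {
        BY_JOB_STATUS.put(JobStatus.FINISHED, ApplicationStatus.SUCCEEDED);
        BY_JOB_STATUS.put(JobStatus.FAILED, ApplicationStatus.FAILED);
        BY_JOB_STATUS.put(JobStatus.CANCELED, ApplicationStatus.CANCELED);
    }

    private TerminalStatusMapping() {}

    /**
     * Hypothetical helper: the status to report when a duplicate submission is
     * ignored because the job already reached a globally terminal state.
     * Returns UNKNOWN when the recorded status is not available (assumption).
     */
    static ApplicationStatus statusForDuplicateSubmission(JobStatus recordedStatus) {
        if (recordedStatus == null) {
            return ApplicationStatus.UNKNOWN;
        }
        return BY_JOB_STATUS.get(recordedStatus);
    }
}
```

With such a mapping, the scenario in the reproducing steps (recorded status {{FAILED}}) would shut the cluster down with application status {{FAILED}}, while a job that previously {{FINISHED}} would still report {{SUCCEEDED}}.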