[ https://issues.apache.org/jira/browse/FLINK-36010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Alexander Fedulov updated FLINK-36010:
--------------------------------------
Fix Version/s: 1.19.3, 1.20.2 (was: 1.20.1)

Duplicate Job Submission returns Succeeded on single execution even if Global state is FAILED
---------------------------------------------------------------------------------------------

Key: FLINK-36010
URL: https://issues.apache.org/jira/browse/FLINK-36010
Project: Flink
Issue Type: Bug
Components: Client / Job Submission
Affects Versions: 1.8.3, 1.17.2, 1.20.0, 1.19.1
Reporter: Ahmed Hamdy
Priority: Major
Fix For: 1.19.3, 1.20.2

h2. Description
Running a job in single execution mode with HA enabled typically [short-circuits duplicate execution of a job after it reaches a globally terminal state|https://github.com/apache/flink/blob/9457ae8c07b1a0dc036aa88735d36420009a7eb9/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java#L316]. However, this returns {{ApplicationStatus.SUCCEEDED}} even if the job's original globally terminal status is {{FAILED}}. This breaks the [consistency of states|https://github.com/apache/flink/blob/9457ae8c07b1a0dc036aa88735d36420009a7eb9/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java#L48].

h2. Reproducing steps
1- Create the Flink ConfigMap and service account as in [https://github.com/vahmed-hamdy/flink-test-projects/tree/master/k8s/job-dedupe].
2- Submit the Flink job and TaskManager deployments, similar to [https://github.com/vahmed-hamdy/flink-test-projects/blob/master/k8s/job-dedupe/FlinkApplicationJob.yml].
3- Delete the TaskManager pods to force job failover until the restart attempts are exhausted. The job now reaches the terminal state {{FAILED}}.
4- The JobManager fails over and the batch job is restarted, but because of single execution mode the restarted JobManager does not resubmit the job.
5- The job is recorded as {{SUCCEEDED}} because a globally terminal state was detected, even though that state is {{FAILED}}.
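A minimal sketch of the status mismatch described above, in plain Java. It is a hypothetical, self-contained model rather than Flink code: the nested {{JobStatus}} and {{ApplicationStatus}} enums only mirror a subset of Flink's enum names, and {{fromJobStatus}}, {{reportedBehaviour}} and {{expectedBehaviour}} are illustrative helpers. It assumes the globally terminal status persisted for the previous execution (for example {{"application-status":"FAILED"}} in the JobResultStore entry) is available at the point where the duplicate submission is short-circuited.

{code:java}
public class DuplicateSubmissionStatus {

    // Hypothetical mirror of a subset of Flink's JobStatus values.
    enum JobStatus { RUNNING, FINISHED, FAILED, CANCELED }

    // Hypothetical mirror of Flink's ApplicationStatus values.
    enum ApplicationStatus { SUCCEEDED, FAILED, CANCELED, UNKNOWN }

    // Assumed mapping from a globally terminal job status to the application status to report.
    static ApplicationStatus fromJobStatus(JobStatus status) {
        switch (status) {
            case FINISHED:
                return ApplicationStatus.SUCCEEDED;
            case FAILED:
                return ApplicationStatus.FAILED;
            case CANCELED:
                return ApplicationStatus.CANCELED;
            default:
                // Non-terminal states have no meaningful final application status.
                return ApplicationStatus.UNKNOWN;
        }
    }

    // Behaviour reported in this issue: the short-circuit path always yields SUCCEEDED,
    // ignoring the stored status of the previous execution.
    static ApplicationStatus reportedBehaviour(JobStatus storedStatus) {
        return ApplicationStatus.SUCCEEDED;
    }

    // Expected behaviour: reuse the status persisted for the previous execution.
    static ApplicationStatus expectedBehaviour(JobStatus storedStatus) {
        return fromJobStatus(storedStatus);
    }

    public static void main(String[] args) {
        // Stored result from the previous execution, as in the JobResultStore appendix.
        JobStatus stored = JobStatus.FAILED;
        System.out.println("reported: " + reportedBehaviour(stored)); // prints "reported: SUCCEEDED"
        System.out.println("expected: " + expectedBehaviour(stored)); // prints "expected: FAILED"
    }
}
{code}

The point of the sketch is only that the short-circuit path should derive its result from the persisted terminal status instead of assuming success; it does not reflect how {{ApplicationDispatcherBootstrap}} is actually patched.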
{code:java}
Printing result to stdout. Use --output to specify output path.
2024-08-08 12:32:50,110 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Job 3d98a3e455408045defa90c6b0b03a5a is submitted.
2024-08-08 12:32:50,110 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Submitting Job with JobId=3d98a3e455408045defa90c6b0b03a5a.
2024-08-08 12:32:50,118 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received JobGraph submission 'CarTopSpeedWindowingExample' (3d98a3e455408045defa90c6b0b03a5a).
2024-08-08 12:32:50,164 WARN org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Ignoring JobGraph submission 'CarTopSpeedWindowingExample' (3d98a3e455408045defa90c6b0b03a5a) because the job already reached a globally-terminal state (i.e. FAILED, CANCELED, FINISHED) in a previous execution.
2024-08-08 12:32:50,171 INFO org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap [] - Application completed SUCCESSFULLY
2024-08-08 12:32:50,171 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting StandaloneApplicationClusterEntryPoint down with application status SUCCEEDED. Diagnostics null.
{code}

h3. Resource Highlights
{code:java}
flink-conf.yaml: |
  job-result-store.delete-on-commit: false
  high-availability.type: ZOOKEEPER
  high-availability.jobmanager.port: 6125
  high-availability.storageDir: s3a://flink-high-availability-dir
  high-availability.zookeeper.quorum: my-release-zookeeper:2181
  high-availability.zookeeper.path.root: /flink
  jobmanager.execution.slot-allocation-timeout: 10000
{code}
{code:java}
spec:
  restartPolicy: OnFailure
  containers:
  - name: jobmanager
    image: apache/flink:1.20.0-scala_2.12
    env:
    - name: POD_IP
      valueFrom:
        fieldRef:
          apiVersion: v1
          fieldPath: status.podIP
    args: ["standalone-job", "--host", "$(POD_IP)", "--job-classname", "org.apache.flink.streaming.examples.windowing.TopSpeedWindowing", "--job-id", "3d98a3e455408045defa90c6b0b03a5a"]
{code}

h2. Expected Outcome
Instead of recording the application status as {{SUCCEEDED}} on resubmission, it should be recorded as {{FAILED}}, because the globally terminal state is {{FAILED}}.

h3. Appendix: JobResultStore on HA
{code:java}
{"result":{"id":"3d98a3e455408045defa90c6b0b03a5a","application-status":"FAILED","accumulator-results":{},"net-runtime":55991,"failure-cause":{"class":"org.apache.flink.runtime.JobException","stack-trace":"org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2, backoffTimeMS=1).....
{code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)