Ahmed Hamdy created FLINK-36010:
-----------------------------------

             Summary: Duplicate Job Submission returns Succeeded on single execution even if Global state is FAILED
                 Key: FLINK-36010
                 URL: https://issues.apache.org/jira/browse/FLINK-36010
             Project: Flink
          Issue Type: Bug
          Components: Client / Job Submission
    Affects Versions: 1.19.1, 1.20.0, 1.17.2, 1.8.3
            Reporter: Ahmed Hamdy


h2. Description

Running a job in single-execution mode with HA enabled typically [short-circuits duplicate execution of the job once it has reached a globally terminal state|https://github.com/apache/flink/blob/9457ae8c07b1a0dc036aa88735d36420009a7eb9/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java#L316]. However, the application then returns {{ApplicationStatus.SUCCEEDED}} even if the job's initial global status was FAILED. This breaks the [consistency of states|https://github.com/apache/flink/blob/9457ae8c07b1a0dc036aa88735d36420009a7eb9/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java#L48].
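
A minimal sketch of the invariant being violated, assuming the linked {{ApplicationStatus}} code defines a one-to-one mapping between globally terminal job states and application states (FINISHED to SUCCEEDED, FAILED to FAILED, CANCELED to CANCELED). The enums and the {{StatusConsistency}} class below are simplified, hypothetical stand-ins, not the Flink classes:

{code:java}
import java.util.EnumMap;
import java.util.Map;

// Hypothetical, simplified stand-ins for Flink's JobStatus and ApplicationStatus;
// the mapping is assumed to mirror the one linked in the description.
public class StatusConsistency {

    enum JobStatus { RUNNING, FINISHED, FAILED, CANCELED }

    enum ApplicationStatus { SUCCEEDED, FAILED, CANCELED, UNKNOWN }

    // One-to-one mapping between globally terminal job states and application states.
    private static final Map<JobStatus, ApplicationStatus> TO_APP = new EnumMap<>(JobStatus.class);
    private static final Map<ApplicationStatus, JobStatus> TO_JOB = new EnumMap<>(ApplicationStatus.class);

    static {
        register(JobStatus.FINISHED, ApplicationStatus.SUCCEEDED);
        register(JobStatus.FAILED, ApplicationStatus.FAILED);
        register(JobStatus.CANCELED, ApplicationStatus.CANCELED);
    }

    private static void register(JobStatus job, ApplicationStatus app) {
        TO_APP.put(job, app);
        TO_JOB.put(app, job);
    }

    /** Non-terminal job states (e.g. RUNNING) have no application status and map to UNKNOWN. */
    static ApplicationStatus fromJobStatus(JobStatus job) {
        return TO_APP.getOrDefault(job, ApplicationStatus.UNKNOWN);
    }

    /** True if the reported application status maps back to the job's terminal state. */
    static boolean isConsistent(JobStatus terminalJobStatus, ApplicationStatus reported) {
        return TO_JOB.get(reported) == terminalJobStatus;
    }

    public static void main(String[] args) {
        // Consistent: a FAILED job reported as FAILED.
        System.out.println(isConsistent(JobStatus.FAILED, fromJobStatus(JobStatus.FAILED))); // true
        // The case from this issue: a FAILED job reported as SUCCEEDED.
        System.out.println(isConsistent(JobStatus.FAILED, ApplicationStatus.SUCCEEDED)); // false
    }
}
{code}

Under this model, reporting {{SUCCEEDED}} for a job whose terminal state is {{FAILED}} fails the consistency check, which is the situation described in this issue.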
h2. Reproducing steps

1- Create the Flink ConfigMap and service account as in 
[https://github.com/vahmed-hamdy/flink-test-projects/tree/master/k8s/job-dedupe]

2- Submit the Flink job and TaskManager deployments, similar to 
[https://github.com/vahmed-hamdy/flink-test-projects/blob/master/k8s/job-dedupe/FlinkApplicationJob.yml]

3- Delete TaskManager pods to force job failover until the restart attempts are exhausted. The job then reaches the terminal state {{FAILED}}.

4- The batch Job is restarted due to the JobManager failover, but because of single-execution mode the JobManager doesn't resubmit the job.

5- The application is recorded as {{SUCCEEDED}} because a globally terminal state was detected, even though that state is {{FAILED}}.

{code}
Printing result to stdout. Use --output to specify output path.
2024-08-08 12:32:50,110 INFO  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Job 3d98a3e455408045defa90c6b0b03a5a is submitted.
2024-08-08 12:32:50,110 INFO  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Submitting Job with JobId=3d98a3e455408045defa90c6b0b03a5a.
2024-08-08 12:32:50,118 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Received JobGraph submission 'CarTopSpeedWindowingExample' (3d98a3e455408045defa90c6b0b03a5a).
2024-08-08 12:32:50,164 WARN  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Ignoring JobGraph submission 'CarTopSpeedWindowingExample' (3d98a3e455408045defa90c6b0b03a5a) because the job already reached a globally-terminal state (i.e. FAILED, CANCELED, FINISHED) in a previous execution.
2024-08-08 12:32:50,171 INFO  org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap [] - Application completed SUCCESSFULLY
2024-08-08 12:32:50,171 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting StandaloneApplicationClusterEntryPoint down with application status SUCCEEDED. Diagnostics null.
{code}



h3. Resource Highlights
{code:java}
flink-conf.yaml: |
    job-result-store.delete-on-commit: false
    high-availability.type: ZOOKEEPER
    high-availability.jobmanager.port: 6125
    high-availability.storageDir: s3a://flink-high-availability-dir
    high-availability.zookeeper.quorum: my-release-zookeeper:2181
    high-availability.zookeeper.path.root: /flink
    jobmanager.execution.slot-allocation-timeout: 10000
{code}
{code:java}
spec:
      restartPolicy: OnFailure
      containers:
        - name: jobmanager
          image: apache/flink:1.20.0-scala_2.12
          env:
          - name: POD_IP
            valueFrom:
              fieldRef:
                apiVersion: v1
                fieldPath: status.podIP
          args: ["standalone-job", "--host", "$(POD_IP)", "--job-classname", "org.apache.flink.streaming.examples.windowing.TopSpeedWindowing", "--job-id", "3d98a3e455408045defa90c6b0b03a5a"]
{code}
 
h2. Expected Outcome

Instead of recording the application status as {{SUCCEEDED}} on resubmission, it should be recorded as {{FAILED}}, because the job's globally terminal state is {{FAILED}}.
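
A rough sketch of the expected behavior, using a hypothetical in-memory stand-in for the HA job result store. {{JobResultStore}}, {{reportedBehavior}} and {{expectedBehavior}} are illustrative names, not Flink APIs. The fix it assumes is that the duplicate-submission path derives the application status from the recorded terminal job status instead of always reporting success:

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified model of the duplicate-submission path; not Flink code.
public class DuplicateSubmissionSketch {

    enum JobStatus { FINISHED, FAILED, CANCELED }

    enum ApplicationStatus { SUCCEEDED, FAILED, CANCELED }

    /** Hypothetical stand-in for an HA job result store that survives JobManager restarts. */
    static final class JobResultStore {
        private final Map<String, JobStatus> results = new HashMap<>();

        void markGloballyTerminal(String jobId, JobStatus status) {
            results.put(jobId, status);
        }

        Optional<JobStatus> lookup(String jobId) {
            return Optional.ofNullable(results.get(jobId));
        }
    }

    static ApplicationStatus toApplicationStatus(JobStatus status) {
        switch (status) {
            case FINISHED: return ApplicationStatus.SUCCEEDED;
            case FAILED:   return ApplicationStatus.FAILED;
            case CANCELED: return ApplicationStatus.CANCELED;
            default:       throw new AssertionError("unexpected status " + status);
        }
    }

    /** Behavior reported in this issue: any duplicate of a terminal job is treated as success. */
    static ApplicationStatus reportedBehavior(JobResultStore store, String jobId) {
        if (store.lookup(jobId).isPresent()) {
            return ApplicationStatus.SUCCEEDED;
        }
        throw new IllegalStateException("job not terminal; it would be submitted normally");
    }

    /** Expected behavior: derive the application status from the recorded terminal state. */
    static ApplicationStatus expectedBehavior(JobResultStore store, String jobId) {
        return store.lookup(jobId)
                .map(DuplicateSubmissionSketch::toApplicationStatus)
                .orElseThrow(() -> new IllegalStateException("job not terminal; it would be submitted normally"));
    }

    public static void main(String[] args) {
        JobResultStore store = new JobResultStore();
        String jobId = "3d98a3e455408045defa90c6b0b03a5a";
        // Step 3: the first execution exhausts its restarts and is recorded as FAILED.
        store.markGloballyTerminal(jobId, JobStatus.FAILED);
        // Steps 4-5: after the JobManager restart, the same job id is submitted again.
        System.out.println("reported: " + reportedBehavior(store, jobId)); // reported: SUCCEEDED
        System.out.println("expected: " + expectedBehavior(store, jobId)); // expected: FAILED
    }
}
{code}

A job whose recorded terminal state is FINISHED would still map to {{SUCCEEDED}} under this model, so only the FAILED and CANCELED cases change.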



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
