[ 
https://issues.apache.org/jira/browse/FLINK-36010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17875237#comment-17875237
 ] 

Ahmed Hamdy edited comment on FLINK-36010 at 8/20/24 4:23 PM:
--------------------------------------------------------------

h2.Root Cause Analysis

1- The job manager receives the job request, which {{ApplicationDispatcherBootstrap}} ([point of execution|https://github.com/apache/flink/blob/36347e5731bd402a608026eb6dfad1c8ef07adbc/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java#L232]) submits to the {{Dispatcher}} ([point of execution|https://github.com/apache/flink/blob/36347e5731bd402a608026eb6dfad1c8ef07adbc/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L518C12-L518C29]).
- Logs:
{code}
2024-08-20 15:04:39,705 INFO  org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap [] - Running application entrypoint.
2024-08-20 15:02:58,284 INFO  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Job 5d98a3e455408045defa96c6b1b03a5a is submitted.
2024-08-20 15:02:58,284 INFO  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Submitting Job with JobId=5d98a3e455408045defa96c6b1b03a5a.
2024-08-20 15:02:58,289 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Received JobGraph submission 'CarTopSpeedWindowingExample' (5d98a3e455408045defa96c6b1b03a5a).
2024-08-20 15:02:58,290 INFO  org.apache.flink.runtime.dispatcher.Dispatcher               [] - Received JobGraph submission 'CarTopSpeedWindowingExample' (5d98a3e455408045defa96c6b1b03a5a).
2024-08
{code}

2- The Dispatcher checks whether the job {{isInGloballyTerminalState}} ([point of execution|https://github.com/apache/flink/blob/36347e5731bd402a608026eb6dfad1c8ef07adbc/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L523]), which in turn checks the {{jobResultStore}} ([point of execution|https://github.com/apache/flink/blob/36347e5731bd402a608026eb6dfad1c8ef07adbc/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L583]).

3- The {{JobResultStore}} finds the job result in the store and returns true. In single execution mode the job is therefore not submitted to the JobMaster, and a {{DuplicateJobSubmissionException}} is thrown.
{code}
2024-08-20 15:02:58,333 WARN  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Ignoring JobGraph submission 'CarTopSpeedWindowingExample' (5d98a3e455408045defa96c6b1b03a5a) because the job already reached a globally-terminal state (i.e. FAILED, CANCELED, FINISHED) in a previous execution.
{code} 


4- The {{DuplicateJobSubmissionException}} is then caught in {{ApplicationDispatcherBootstrap}} ([point of execution|https://github.com/apache/flink/blob/36347e5731bd402a608026eb6dfad1c8ef07adbc/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java#L323]), which marks the job as completed with {{tolerateMissingResult}} set to true.

5- The completed job ID is then returned and the application status is extracted ([point of execution|https://github.com/apache/flink/blob/36347e5731bd402a608026eb6dfad1c8ef07adbc/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java#L264]). This polls the job status using {{JobStatusPollingUtils}} ([here|https://github.com/apache/flink/blob/36347e5731bd402a608026eb6dfad1c8ef07adbc/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java#L371]), which delegates to the {{DispatcherGateway}} ([here|https://github.com/apache/flink/blob/36347e5731bd402a608026eb6dfad1c8ef07adbc/flink-clients/src/main/java/org/apache/flink/client/deployment/application/JobStatusPollingUtils.java#L56]).
 

6- The Dispatcher looks up the job master for the job and requests both the status and the job result from it, ignoring the job result store ([point of execution|https://github.com/apache/flink/blob/36347e5731bd402a608026eb6dfad1c8ef07adbc/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L896]).

7- Since {{tolerateMissingResult}} is true, the job-not-found exception raised when the Dispatcher cannot find the job in the job manager is swallowed ([here|https://github.com/apache/flink/blob/36347e5731bd402a608026eb6dfad1c8ef07adbc/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java#L376]). The application is then treated as succeeded ([here|https://github.com/apache/flink/blob/36347e5731bd402a608026eb6dfad1c8ef07adbc/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java#L165]) and finishes successfully.
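
To make steps 6 and 7 concrete, here is a toy model using only the JDK. It is not Flink code: {{Status}}, {{currentBehaviour}} and {{withStoreLookup}} are made-up names, and the mapping of a swallowed miss to SUCCEEDED simply mirrors the behaviour described in the steps above.

```java
import java.util.Optional;

/** Toy model of steps 6 and 7; all types are hypothetical stand-ins, not Flink classes. */
final class MissingResultSketch {

    enum Status { SUCCEEDED, FAILED }

    /** Behaviour described above: only the job master is asked, and a miss is tolerated. */
    static Status currentBehaviour(Optional<Status> fromJobMaster, boolean tolerateMissingResult) {
        if (fromJobMaster.isPresent()) {
            return fromJobMaster.get();
        }
        if (tolerateMissingResult) {
            // The "job not found" error is swallowed and the application counts as succeeded.
            return Status.SUCCEEDED;
        }
        throw new IllegalStateException("job not found");
    }

    /** Variant that consults the persisted result before tolerating a miss. */
    static Status withStoreLookup(
            Optional<Status> fromJobMaster,
            Optional<Status> fromStore,
            boolean tolerateMissingResult) {
        if (fromJobMaster.isPresent()) {
            return fromJobMaster.get();
        }
        if (fromStore.isPresent()) {
            return fromStore.get();
        }
        return currentBehaviour(Optional.empty(), tolerateMissingResult);
    }

    public static void main(String[] args) {
        Optional<Status> noJobMaster = Optional.empty();
        Optional<Status> stored = Optional.of(Status.FAILED);
        System.out.println(currentBehaviour(noJobMaster, true));
        System.out.println(withStoreLookup(noJobMaster, stored, true));
    }
}
```

With no job master and a stored FAILED result, the first call prints SUCCEEDED and the second prints FAILED, which is the gap this ticket is about.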

h2. Suggested Solution
- I propose to reimplement {{requestJobStatus}} and {{requestJobResult}} to fall back to the job result store if the job is not found on the job manager, as in:
{code}
    @Override
    public CompletableFuture<JobStatus> requestJobStatus(JobID jobId, Time timeout) {
        Optional<JobManagerRunner> maybeJob = getJobManagerRunner(jobId);
        LOG.info("Is Jobmanager present for job {}? {}", jobId, maybeJob.isPresent());
        CompletableFuture<JobStatus> future =
                maybeJob.map(job -> job.requestJobStatus(timeout))
                        .orElseGet(
                                () -> {
                                    // is it a completed job?
                                    final JobDetails jobDetails =
                                            executionGraphInfoStore.getAvailableJobDetails(jobId);
                                    if (jobDetails == null) {
                                        return FutureUtils.completedExceptionally(
                                                new FlinkJobNotFoundException(jobId));
                                    } else {
                                        return CompletableFuture.completedFuture(
                                                jobDetails.getStatus());
                                    }
                                });

        return FutureUtils.handleException(
                future,
                FlinkJobNotFoundException.class,
                (ignored) -> getJobStatusFromStore(jobId).join());
    }
{code}

{code}
    @Override
    public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time timeout) {
        if (!jobManagerRunnerRegistry.isRegistered(jobId)) {
            final ExecutionGraphInfo executionGraphInfo = executionGraphInfoStore.get(jobId);

            if (executionGraphInfo == null) {
                return getJobResultFromStore(jobId);
            } else {
                return CompletableFuture.completedFuture(
                        JobResult.createFrom(executionGraphInfo.getArchivedExecutionGraph()));
            }
        }

        final JobManagerRunner jobManagerRunner = jobManagerRunnerRegistry.get(jobId);
        return jobManagerRunner
                .getResultFuture()
                .thenApply(
                        jobManagerRunnerResult ->
                                JobResult.createFrom(
                                        jobManagerRunnerResult
                                                .getExecutionGraphInfo()
                                                .getArchivedExecutionGraph()));
    }
{code}
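
One thing to watch in the {{requestJobStatus}} snippet is the {{.join()}} inside the exception handler, which waits for the store lookup on whichever thread completes the future. As a possible alternative, the fallback can be composed without blocking. The sketch below uses only the JDK; {{FallbackSketch}}, {{withFallback}} and {{NotFound}} (standing in for {{FlinkJobNotFoundException}}) are hypothetical names, not Flink APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

/** Plain-JDK sketch; NotFound stands in for FlinkJobNotFoundException. */
final class FallbackSketch {

    static final class NotFound extends RuntimeException {
        NotFound(String jobId) {
            super("Job not found: " + jobId);
        }
    }

    /** Returns primary's value, or the fallback's value if primary failed with NotFound. */
    static <T> CompletableFuture<T> withFallback(
            CompletableFuture<T> primary, Supplier<CompletableFuture<T>> fallback) {
        return primary
                .<CompletableFuture<T>>handle(
                        (value, error) -> {
                            if (error == null) {
                                return CompletableFuture.completedFuture(value);
                            }
                            // Dependent stages wrap failures in CompletionException.
                            Throwable cause =
                                    error instanceof CompletionException
                                                    && error.getCause() != null
                                            ? error.getCause()
                                            : error;
                            if (cause instanceof NotFound) {
                                return fallback.get(); // asynchronous store lookup
                            }
                            // Any other failure is passed through unchanged.
                            CompletableFuture<T> failed = new CompletableFuture<>();
                            failed.completeExceptionally(cause);
                            return failed;
                        })
                .thenCompose(next -> next);
    }

    public static void main(String[] args) {
        CompletableFuture<String> missing = new CompletableFuture<>();
        missing.completeExceptionally(new NotFound("5d98a3e4"));
        String status =
                withFallback(missing, () -> CompletableFuture.completedFuture("FAILED")).join();
        System.out.println(status);
    }
}
```

Here the lookup against the store only starts once the primary future has failed with the not-found error, and other failures are propagated as they are.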

This will require exposing a new API in the job result store that returns the job results of clean jobs by ID:
{code}
    /**
     * Get the persisted {@link JobResult} instance for the given {@code JobID}.
     *
     * @param jobId Ident of the job we wish to retrieve the result for.
     * @return The {@code JobResult} instance for the given {@code JobID}.
     * @throws IOException if retrieving the result failed for IO reasons.
     * @throws NoSuchElementException if there is no {@code JobResult} instance for the given {@code
     *     JobID}.
     */
    CompletableFuture<JobResult> getJobResultAsync(JobID jobId);
{code}
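
Since the method returns a {{CompletableFuture}}, a missing entry would most naturally be reported through the future rather than thrown at the call site. A minimal in-memory sketch of that contract follows, assuming plain strings for the job ID and result; {{InMemoryResultStoreSketch}} and its {{put}} method are hypothetical and only illustrate the shape of the call.

```java
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory store; strings stand in for JobID and JobResult. */
final class InMemoryResultStoreSketch {

    private final Map<String, String> results = new ConcurrentHashMap<>();

    void put(String jobId, String result) {
        results.put(jobId, result);
    }

    /** Completes with the stored result, or exceptionally if nothing is stored. */
    CompletableFuture<String> getJobResultAsync(String jobId) {
        String result = results.get(jobId);
        if (result == null) {
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(
                    new NoSuchElementException("No result for job " + jobId));
            return failed;
        }
        return CompletableFuture.completedFuture(result);
    }
}
```

Callers can then decide how to treat the missing case when they handle the future.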









> Duplicate Job Submission returns Succeeded on single execution even if Global 
> state is FAILED
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-36010
>                 URL: https://issues.apache.org/jira/browse/FLINK-36010
>             Project: Flink
>          Issue Type: Bug
>          Components: Client / Job Submission
>    Affects Versions: 1.8.3, 1.17.2, 1.20.0, 1.19.1
>            Reporter: Ahmed Hamdy
>            Priority: Major
>             Fix For: 1.20.1
>
>
> h2. Description
> Running a job in single execution mode with HA enabled typically [short circuits duplicate execution of the job after it reaches a globally terminal state|https://github.com/apache/flink/blob/9457ae8c07b1a0dc036aa88735d36420009a7eb9/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java#L316].
> However, this returns {{ApplicationStatus.SUCCEEDED}} even if the initial global job status is FAILED. This breaks the [consistency of states|https://github.com/apache/flink/blob/9457ae8c07b1a0dc036aa88735d36420009a7eb9/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java#L48].
> h2. Reproducing steps
> 1- Create the Flink configmap and service account as in [https://github.com/vahmed-hamdy/flink-test-projects/tree/master/k8s/job-dedupe]
> 2- Submit the Flink job and TM deployments, similar to [https://github.com/vahmed-hamdy/flink-test-projects/blob/master/k8s/job-dedupe/FlinkApplicationJob.yml]
> 3- Delete the TaskManager pods to force job failover until the restart count is consumed. The job has now reached the terminal state {{FAILED}}.
> 4- The batch Job is now restarted due to job manager failover, but because of single execution mode the job manager doesn't resubmit the job.
> 5- The job is recorded as {{SUCCEEDED}} due to detecting a globally terminal state (FAILED).
> {code:java}
> Printing result to stdout. Use --output to specify output path.
> 2024-08-08 12:32:50,110 INFO  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Job 3d98a3e455408045defa90c6b0b03a5a is submitted.
> 2024-08-08 12:32:50,110 INFO  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Submitting Job with JobId=3d98a3e455408045defa90c6b0b03a5a.
> 2024-08-08 12:32:50,118 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Received JobGraph submission 'CarTopSpeedWindowingExample' (3d98a3e455408045defa90c6b0b03a5a).
> 2024-08-08 12:32:50,164 WARN  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Ignoring JobGraph submission 'CarTopSpeedWindowingExample' (3d98a3e455408045defa90c6b0b03a5a) because the job already reached a globally-terminal state (i.e. FAILED, CANCELED, FINISHED) in a previous execution.
> 2024-08-08 12:32:50,171 INFO  org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap [] - Application completed SUCCESSFULLY
> 2024-08-08 12:32:50,171 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting StandaloneApplicationClusterEntryPoint down with application status SUCCEEDED. Diagnostics null.
> {code}
> h3. Resource Highlights
> {code:java}
> flink-conf.yaml: |
>     job-result-store.delete-on-commit: false
>     high-availability.type: ZOOKEEPER
>     high-availability.jobmanager.port: 6125
>     high-availability.storageDir: s3a://flink-high-availability-dir
>     high-availability.zookeeper.quorum: my-release-zookeeper:2181
>     high-availability.zookeeper.path.root: /flink
>     jobmanager.execution.slot-allocation-timeout: 10000
> {code}
> {code:java}
> spec:
>       restartPolicy: OnFailure
>       containers:
>         - name: jobmanager
>           image: apache/flink:1.20.0-scala_2.12
>           env:
>           - name: POD_IP
>             valueFrom:
>               fieldRef:
>                 apiVersion: v1
>                 fieldPath: status.podIP
>           args: ["standalone-job", "--host", "$(POD_IP)", "--job-classname", "org.apache.flink.streaming.examples.windowing.TopSpeedWindowing", "--job-id", "3d98a3e455408045defa90c6b0b03a5a"]
> {code}
>  
> h2. Expected Outcome
> Instead of recording the application state as SUCCEEDED on resubmission, it should be recorded as FAILED, because the globally terminal state is FAILED.
> h3. Appendix: JobResultStore on HA
> {code:java}
> {"result":{"id":"3d98a3e455408045defa90c6b0b03a5a","application-status":"FAILED","accumulator-results":{},"net-runtime":55991,"failure-cause":{"class":"org.apache.flink.runtime.JobException","stack-trace":"org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2, backoffTimeMS=1).....{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
