[ https://issues.apache.org/jira/browse/FLINK-36010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17875237#comment-17875237 ]
Ahmed Hamdy edited comment on FLINK-36010 at 8/20/24 6:23 PM:
--------------------------------------------------------------

h2. Root Cause Analysis

1- {{ApplicationDispatcherBootstrap}} submits the job ([Point of execution|https://github.com/apache/flink/blob/36347e5731bd402a608026eb6dfad1c8ef07adbc/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java#L232]) to the JobManager's {{Dispatcher}} ([Point of execution|https://github.com/apache/flink/blob/36347e5731bd402a608026eb6dfad1c8ef07adbc/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L518C12-L518C29]).
Logs:
{code:java}
2024-08-20 15:04:39,705 INFO org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap [] - Running application entrypoint.
2024-08-20 15:02:58,284 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Job 5d98a3e455408045defa96c6b1b03a5a is submitted.
2024-08-20 15:02:58,284 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Submitting Job with JobId=5d98a3e455408045defa96c6b1b03a5a.
2024-08-20 15:02:58,289 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received JobGraph submission 'CarTopSpeedWindowingExample' (5d98a3e455408045defa96c6b1b03a5a).
2024-08-20 15:02:58,290 INFO org.apache.flink.runtime.dispatcher.Dispatcher [] - Received JobGraph submission 'CarTopSpeedWindowingExample' (5d98a3e455408045defa96c6b1b03a5a).
2024-08
{code}
2- The {{Dispatcher}} checks whether the job {{isInGloballyTerminalState}} ([Point of execution|https://github.com/apache/flink/blob/36347e5731bd402a608026eb6dfad1c8ef07adbc/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L523]), which in turn checks the {{jobResultStore}} ([Point of execution|https://github.com/apache/flink/blob/36347e5731bd402a608026eb6dfad1c8ef07adbc/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L583]).
3- The {{JobResultStore}} finds a result for the job and returns true. In single-execution mode the job is therefore not submitted to a JobMaster. Instead, the submission fails with a {{DuplicateJobSubmissionException}}:
{code:java}
2024-08-20 15:02:58,333 WARN org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Ignoring JobGraph submission 'CarTopSpeedWindowingExample' (5d98a3e455408045defa96c6b1b03a5a) because the job already reached a globally-terminal state (i.e. FAILED, CANCELED, FINISHED) in a previous execution.
{code}
4- {{ApplicationDispatcherBootstrap}} then catches the {{DuplicateJobSubmissionException}} ([Point of execution|https://github.com/apache/flink/blob/36347e5731bd402a608026eb6dfad1c8ef07adbc/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java#L323]) and marks the job as completed, with {{tolerateMissingResult}} set to true.
5- It then returns the completed job ID and tries to derive the application status ([Point of execution|https://github.com/apache/flink/blob/36347e5731bd402a608026eb6dfad1c8ef07adbc/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java#L264]). It does this by polling the job status with {{JobStatusPollingUtils}} ([here|https://github.com/apache/flink/blob/36347e5731bd402a608026eb6dfad1c8ef07adbc/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java#L371]), which delegates the call to the {{DispatcherGateway}} ([here|https://github.com/apache/flink/blob/36347e5731bd402a608026eb6dfad1c8ef07adbc/flink-clients/src/main/java/org/apache/flink/client/deployment/application/JobStatusPollingUtils.java#L56]).
6- For both the job status and the job result, the {{Dispatcher}} tries to find a JobMaster for the job and query it, without consulting the {{JobResultStore}} ([Point of execution|https://github.com/apache/flink/blob/36347e5731bd402a608026eb6dfad1c8ef07adbc/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L896]).
7- Since {{tolerateMissingResult}} is true, the {{FlinkJobNotFoundException}} is swallowed ([here|https://github.com/apache/flink/blob/36347e5731bd402a608026eb6dfad1c8ef07adbc/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java#L376]). The Dispatcher raises this exception because it cannot find the job on the JobManager. As a result, the application is treated as succeeded ([here|https://github.com/apache/flink/blob/36347e5731bd402a608026eb6dfad1c8ef07adbc/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java#L165]) and the job finishes successfully.

h2. Suggested Solution

I propose to reimplement {{requestJobStatus}} and {{requestJobResult}} so that they fall back to the {{JobResultStore}} when the job is not found on the JobManager, as in:
{code:java}
@Override
public CompletableFuture<JobStatus> requestJobStatus(JobID jobId, Time timeout) {
    Optional<JobManagerRunner> maybeJob = getJobManagerRunner(jobId);
    LOG.info("Is Jobmanager present for job {}? {}", jobId, maybeJob.isPresent());
    CompletableFuture<JobStatus> future =
            maybeJob.map(job -> job.requestJobStatus(timeout))
                    .orElseGet(
                            () -> {
                                // is it a completed job?
                                final JobDetails jobDetails =
                                        executionGraphInfoStore.getAvailableJobDetails(jobId);
                                if (jobDetails == null) {
                                    return FutureUtils.completedExceptionally(
                                            new FlinkJobNotFoundException(jobId));
                                } else {
                                    return CompletableFuture.completedFuture(jobDetails.getStatus());
                                }
                            });
    // New: if neither a running JobMaster nor the ExecutionGraphInfoStore knows the job
    // (e.g. after a JobManager failover), fall back to the JobResultStore.
    // getJobStatusFromStore(jobId) is a new helper (not shown); join() blocks until it completes.
    return FutureUtils.handleException(
            future,
            FlinkJobNotFoundException.class,
            (ignored) -> getJobStatusFromStore(jobId).join());
}
{code}
{code:java}
@Override
public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time timeout) {
    if (!jobManagerRunnerRegistry.isRegistered(jobId)) {
        final ExecutionGraphInfo executionGraphInfo = executionGraphInfoStore.get(jobId);
        if (executionGraphInfo == null) {
            // New: not running and not archived locally, so ask the JobResultStore.
            // getJobResultFromStore(jobId) is a new helper (not shown).
            return getJobResultFromStore(jobId);
        } else {
            return CompletableFuture.completedFuture(
                    JobResult.createFrom(executionGraphInfo.getArchivedExecutionGraph()));
        }
    }
    final JobManagerRunner jobManagerRunner = jobManagerRunnerRegistry.get(jobId);
    return jobManagerRunner
            .getResultFuture()
            .thenApply(
                    jobManagerRunnerResult ->
                            JobResult.createFrom(
                                    jobManagerRunnerResult
                                            .getExecutionGraphInfo()
                                            .getArchivedExecutionGraph()));
}
{code}
This requires exposing a new API in the {{JobResultStore}} that returns the results of clean jobs by job ID:
{code:java}
/**
 * Get the persisted {@link JobResult} instance for the given {@code JobID}.
 *
 * @param jobId Ident of the job we wish to retrieve the result for.
 * @return The {@code JobResult} instance for the given {@code JobID}.
 */
CompletableFuture<JobResult> getJobResultAsync(JobID jobId);
{code}

> Duplicate Job Submission returns Succeeded on single execution even if Global state is FAILED
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-36010
>                 URL: https://issues.apache.org/jira/browse/FLINK-36010
>             Project: Flink
>          Issue Type: Bug
>          Components: Client / Job Submission
>    Affects Versions: 1.8.3, 1.17.2, 1.20.0, 1.19.1
>            Reporter: Ahmed Hamdy
>            Priority: Major
>             Fix For: 1.20.1
>
>
> h2. Description
> Running a job on single execution mode with HA enabled typically [short circuits duplicate execution of job after it reaches globally terminal state|https://github.com/apache/flink/blob/9457ae8c07b1a0dc036aa88735d36420009a7eb9/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java#L316].
> However this returns {{ApplicationStatus.SUCCEEDED}} even if initial global job status is FAILED. This breaks the [consistency of states.|https://github.com/apache/flink/blob/9457ae8c07b1a0dc036aa88735d36420009a7eb9/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java#L48]
> h2. Reproducing steps
> 1- Create Flink configmap and service account as in [https://github.com/vahmed-hamdy/flink-test-projects/tree/master/k8s/job-dedupe]
> 2- Submit Flink Job and TM deployments similar to [https://github.com/vahmed-hamdy/flink-test-projects/blob/master/k8s/job-dedupe/FlinkApplicationJob.yml]
> 3- delete Taskmanager pods enforcing Job failover until restart count consumed, The Job now reached Terminal state {{FAILED}}.
> 4- Batch Job is now restarted due to job manager failover, restarted but due to single execution mode the job manager doesn't resubmit the job.
> 5- The job is recorded as {{SUCCEEDED}} due to detecting a globally terminal state (FAILED).
> {code:java}
> Printing result to stdout. Use --output to specify output path.
> 2024-08-08 12:32:50,110 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Job 3d98a3e455408045defa90c6b0b03a5a is submitted.
> 2024-08-08 12:32:50,110 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Submitting Job with JobId=3d98a3e455408045defa90c6b0b03a5a.
> 2024-08-08 12:32:50,118 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received JobGraph submission 'CarTopSpeedWindowingExample' (3d98a3e455408045defa90c6b0b03a5a).
> 2024-08-08 12:32:50,164 WARN org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Ignoring JobGraph submission 'CarTopSpeedWindowingExample' (3d98a3e455408045defa90c6b0b03a5a) because the job already reached a globally-terminal state (i.e. FAILED, CANCELED, FINISHED) in a previous execution.
> 2024-08-08 12:32:50,171 INFO org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap [] - Application completed SUCCESSFULLY
> 2024-08-08 12:32:50,171 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting StandaloneApplicationClusterEntryPoint down with application status SUCCEEDED. Diagnostics null.
> {code}
> h3. Resource Highlights
> {code:java}
> flink-conf.yaml: |
>   job-result-store.delete-on-commit: false
>   high-availability.type: ZOOKEEPER
>   high-availability.jobmanager.port: 6125
>   high-availability.storageDir: s3a://flink-high-availability-dir
>   high-availability.zookeeper.quorum: my-release-zookeeper:2181
>   high-availability.zookeeper.path.root: /flink
>   jobmanager.execution.slot-allocation-timeout: 10000
> {code}
> {code:java}
> spec:
>   restartPolicy: OnFailure
>   containers:
>     - name: jobmanager
>       image: apache/flink:1.20.0-scala_2.12
>       env:
>         - name: POD_IP
>           valueFrom:
>             fieldRef:
>               apiVersion: v1
>               fieldPath: status.podIP
>       args: ["standalone-job","--host", "$(POD_IP)", "--job-classname", "org.apache.flink.streaming.examples.windowing.TopSpeedWindowing", "--job-id", "3d98a3e455408045defa90c6b0b03a5a"]
> {code}
>
> h2. Expected Outcome
> Instead of recording the Application state to SUCCEEDED on resubmission it should be recorded as FAILED because the globally terminal state is FAILED
> h3. Appendix: JobResultStore on HA
> {code:java}
> {"result":{"id":"3d98a3e455408045defa90c6b0b03a5a","application-status":"FAILED","accumulator-results":{},"net-runtime":55991,"failure-cause":{"class":"org.apache.flink.runtime.JobException","stack-trace":"org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2, backoffTimeMS=1).....
> {code}
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
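The root-cause steps and the suggested {{JobResultStore}} fallback above can be checked with a small plain-Java model. This is a minimal sketch under stated assumptions. It is not Flink code, and {{SketchJobStatus}}, {{SketchAppStatus}}, {{SketchDispatcher}} and {{SketchBootstrap}} are hypothetical stand-ins. The JobManager failover is modeled as an empty map of live runners. The bootstrap's handling of {{tolerateMissingResult}} is simplified to an UNKNOWN placeholder that carries no failure cause and is therefore reported as SUCCEEDED. The job-status-to-application-status mapping is assumed to follow the {{ApplicationStatus}} consistency linked in the description.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical stand-in for Flink's JobStatus (only the states needed here).
enum SketchJobStatus { RUNNING, FINISHED, FAILED, CANCELED }

// Hypothetical stand-in for Flink's ApplicationStatus.
enum SketchAppStatus {
    SUCCEEDED, FAILED, CANCELED, UNKNOWN;

    // Assumed mapping: globally terminal job states map 1:1, anything else is UNKNOWN.
    static SketchAppStatus fromJobStatus(SketchJobStatus status) {
        switch (status) {
            case FINISHED:
                return SUCCEEDED;
            case FAILED:
                return FAILED;
            case CANCELED:
                return CANCELED;
            default:
                return UNKNOWN;
        }
    }
}

// Hypothetical model of the Dispatcher's status lookup after a JobManager failover.
class SketchDispatcher {
    // Jobs that still have a live JobManagerRunner; empty after the failover.
    final Map<String, SketchJobStatus> liveRunners = new HashMap<>();
    // Globally terminal results persisted in the HA JobResultStore.
    final Map<String, SketchJobStatus> jobResultStore = new HashMap<>();
    // true = proposed behaviour (fall back to the store), false = current behaviour.
    final boolean fallBackToResultStore;

    SketchDispatcher(boolean fallBackToResultStore) {
        this.fallBackToResultStore = fallBackToResultStore;
    }

    CompletableFuture<SketchJobStatus> requestJobStatus(String jobId) {
        SketchJobStatus live = liveRunners.get(jobId);
        if (live != null) {
            return CompletableFuture.completedFuture(live);
        }
        SketchJobStatus stored = jobResultStore.get(jobId);
        if (fallBackToResultStore && stored != null) {
            return CompletableFuture.completedFuture(stored);
        }
        // Models the FlinkJobNotFoundException case with a plain exception.
        return CompletableFuture.failedFuture(
                new IllegalStateException("Job not found: " + jobId));
    }
}

// Hypothetical model of how the bootstrap turns the lookup into a final status.
final class SketchBootstrap {
    private SketchBootstrap() {}

    static SketchAppStatus finalStatus(
            SketchDispatcher dispatcher, String jobId, boolean tolerateMissingResult) {
        SketchAppStatus status;
        try {
            status = SketchAppStatus.fromJobStatus(dispatcher.requestJobStatus(jobId).join());
        } catch (CompletionException e) {
            if (!tolerateMissingResult) {
                throw e;
            }
            // The "job not found" failure is swallowed and replaced by UNKNOWN.
            status = SketchAppStatus.UNKNOWN;
        }
        // Only FAILED/CANCELED count as unsuccessful; UNKNOWN is reported as SUCCEEDED.
        return (status == SketchAppStatus.FAILED || status == SketchAppStatus.CANCELED)
                ? status
                : SketchAppStatus.SUCCEEDED;
    }
}
{code}

With {{fallBackToResultStore}} disabled, the FAILED entry in the store is never consulted and the application ends up SUCCEEDED, which matches the logs in the description. With the fallback enabled, the lookup returns FAILED, which is the expected outcome.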