[ https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722691#comment-17722691 ]
Matthias Pohl edited comment on FLINK-32069 at 5/15/23 2:42 PM:
----------------------------------------------------------------

Thanks for documenting this issue in Jira, [~izeren]. Here are my findings so far:

I struggle to find a connection between the {{RunningJobsRegistry}} and the {{getJobStatus}} call of the client (which calls {{Dispatcher.requestJobStatus}} in the end).

[~izeren] is right in claiming that we slightly modified the code when removing the {{RunningJobsRegistry}} in [JobMasterServiceLeadershipRunner:385ff|https://github.com/apache/flink/commit/01b14fc4b9a9487a144f515bb7d4f6ad14cbe013#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL385-L395]. Previously, the job was marked as done through the {{RunningJobsRegistry}} before {{JobMasterServiceLeadershipRunner#resultFuture}} was completed. In the current code, we mark the job as completed through the {{JobResultStore}} after completing {{JobMasterServiceLeadershipRunner#resultFuture}}.

My issue is, though, that we're not relying on the {{RunningJobsRegistry}} in any way for the {{Dispatcher#requestJob}} call. The {{RunningJobsRegistry}} was only used for leader recovery in [JobMasterServiceLeadershipRunner.verifyJobSchedulingStatusAndCreateJobMasterServiceProcess:272ff|https://github.com/apache/flink/commit/01b14fc4b9a9487a144f515bb7d4f6ad14cbe013#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL272-L278] and when submitting a job through [Dispatcher#isInGloballyTerminalState:375ff|https://github.com/apache/flink/pull/18189/files#diff-a4b690fb2c4975d25b05eb4161617af0d704a85ff7b1cad19d3c817c12f1e29cL375] in {{Dispatcher#submitJob}}. If the registry had any influence on {{Dispatcher#requestJobStatus}}, I would have expected to find {{Dispatcher#requestJobStatus}} somewhere in the call hierarchy of {{RunningJobsRegistry#getJobSchedulingStatus}}.

I don't want to say that [~izeren]'s conclusion is wrong yet. It just doesn't match my findings in the code. It could also be that I'm missing a code path here. [~dmvk], do you have something to add?


> jobClient.getJobStatus() can return status RUNNING for finished insert operation
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-32069
>                 URL: https://issues.apache.org/jira/browse/FLINK-32069
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.16.1, 1.15.4
>            Reporter: Aleksandr Iushmanov
>            Priority: Major
>
> Using Zeppelin with a remote cluster, I came across a race condition that
> leads to failed expectations for SQL insert operations.
>
> Below is an example of Zeppelin code that fails because
> jobClient.getJobStatus() returns RUNNING even after the job has finished. I
> have verified that the same failure can happen if I use
> jobClient.getJobExecutionResult().get() (the job execution result is "Program
> execution finished", but the job status is not consistently FINISHED).
> {code:java}
> TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(operations);
> checkState(tableResult.getJobClient().isPresent());
> try {
>   tableResult.await();
>   JobClient jobClient = tableResult.getJobClient().get();
>   if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
>     context.out.write("Insertion successfully.\n");
>   } else {
>     throw new IOException("Job is failed, " + jobClient.getJobExecutionResult().get().toString());
>   }
> } catch (InterruptedException e) {
>   throw new IOException("Flink job is interrupted", e);
> } catch (ExecutionException e) {
>   throw new IOException("Flink job is failed", e);
> }
> {code}
> Zeppelin code:
> [https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384]
>
> I suspect that the job status is returned based on runningJobsRegistry and since 1.15
> this registry is not updated with the FINISHED status prior to completion of
> the job result future, see this change in
> {{JobMasterServiceLeadershipRunner.java}}:
> [https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL387]
>
> It looks like a race condition that is hard to reproduce on a lightweight
> setup. I reproduced it by running a Zeppelin notebook against a remote Flink
> cluster and triggering a SQL insert operation. If I find a smaller setup that
> reproduces it on a small local cluster with a lightweight client, I will
> update this ticket. I am open to suggestions on how to fix this.
>
> For Zeppelin I have a separate ticket, because Flink 1.15 is not going to be
> fixed. If I understand it correctly, though, this issue should be common to
> all versions starting with 1.15, so it makes sense to address it starting
> with 1.16. https://issues.apache.org/jira/browse/ZEPPELIN-5909
>
> [~mapohl], thank you for your assistance in Slack. I have created this ticket
> to back our conversation. Could you please add your thoughts on this failure
> mode?
>
> One possible solution would be an additional check for the presence of a
> JobResult in the result store before returning the job status (if there is a
> result, the job shouldn't be reported as running, based on this documentation:
> [https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/TableResult.html#await--])

--
This message was sent by Atlassian Jira (v8.20.10#820010)
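
The possible solution mentioned in the description (consult the result store before reporting a status) could look roughly like the sketch below. This is not Flink code: {{StatusSketch}}, its {{Status}} enum, {{resolveStatus}}, and the two suppliers are hypothetical stand-ins, and the real {{Dispatcher}} and {{JobResultStore}} APIs look different. It only illustrates the rule "if a result is already recorded, do not report RUNNING".

```java
import java.util.Optional;
import java.util.function.Supplier;

public class StatusSketch {

    /** Hypothetical, simplified job states; not Flink's JobStatus enum. */
    enum Status { RUNNING, FINISHED, FAILED, CANCELED }

    /**
     * Prefers a terminal status recorded in the (hypothetical) result store.
     * Falls back to the status reported by the runner only if no result exists.
     */
    static Status resolveStatus(Supplier<Optional<Status>> resultStoreLookup,
                                Supplier<Status> runnerStatus) {
        return resultStoreLookup.get().orElseGet(runnerStatus);
    }

    public static void main(String[] args) {
        // Race window: a result is already recorded, the runner still says RUNNING.
        System.out.println(
                resolveStatus(() -> Optional.of(Status.FINISHED), () -> Status.RUNNING));

        // No result recorded yet: the runner's status is passed through unchanged.
        System.out.println(
                resolveStatus(Optional::empty, () -> Status.RUNNING));
    }
}
```

Such a check would only help if the result is recorded before the result future completes. According to the comment above, the current code marks the job as completed in the {{JobResultStore}} after completing {{JobMasterServiceLeadershipRunner#resultFuture}}, so the ordering would have to be looked at as well.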