[ https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722998#comment-17722998 ]
Matthias Pohl edited comment on FLINK-32069 at 5/16/23 12:39 PM:
-----------------------------------------------------------------

Ok, I was too lazy to switch to the 1.15 branch because I assumed that the code didn't change that much (and I wanted to avoid running into compilation issues again locally due to switching branches). But it looks like the code did change. I'm going to do the code analysis once more for {{release-1.15.4}} (assuming that that's the version you're using).

{{jobClient.getJobStatus}}:
* Client side:
** *{{ClusterClientJobClientAdapter.getJobStatus}}* >>> *{{RestClusterClient.getJobStatus}}* >>> *{{RestClusterClient.requestJobStatus}}* >>> *{{RestClusterClient.getJobDetails}}* >>> *{{JobDetailsHeaders}} {{GET /jobs/:jobId}}*
* Server side:
** {{JobDetailsHandler.handleRequest}} is called, which relies on the {{ExecutionGraphCache}} instead of accessing the {{Dispatcher}} directly (as is done in the {{master}} code with {{JobStatusHandler}})

For {{jobClient.getJobExecutionResult()}}, I find the following (same as in {{master}}):
* Client side:
** *{{ClusterClientJobClientAdapter.getJobExecutionResult}}* >>> *{{RestClusterClient.requestJobResult}}* >>> *{{RestClusterClient.requestJobResultInternal}}* >>> *{{JobExecutionResultHeaders}} {{GET /jobs/:jobId/execution-result}}*
* Server side:
** *{{JobExecutionResultHandler.handleRequest}}* >>> *{{Dispatcher.requestJobStatus}}*

I'm missing the path, though, where we use {{requestMultipleJobDetails}}. Do I still have the wrong version of Flink?


> jobClient.getJobStatus() can return status RUNNING for finished insert
> operation
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-32069
>                 URL: https://issues.apache.org/jira/browse/FLINK-32069
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.16.1, 1.15.4
>            Reporter: Aleksandr Iushmanov
>            Priority: Major
>
> Using Zeppelin with a remote cluster, I came across a race condition
> leading to failed expectations for SQL insert operations.
>
> Below is an example of Zeppelin code that is failing because
> jobClient.getJobStatus() returns RUNNING even after the job has finished.
> I have verified that the same failure can happen if I use
> jobClient.getJobExecutionResult().get() (Job execution result is: "Program
> execution finished" but job status is not consistently finished)
> {code:java}
> TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(operations);
> checkState(tableResult.getJobClient().isPresent());
> try {
>   tableResult.await();
>   JobClient jobClient = tableResult.getJobClient().get();
>   // Race: getJobStatus() can still report RUNNING here although await() has returned.
>   if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
>     context.out.write("Insertion successfully.\n");
>   } else {
>     throw new IOException("Job is failed, " + jobClient.getJobExecutionResult().get().toString());
>   }
> } catch (InterruptedException e) {
>   throw new IOException("Flink job is interrupted", e);
> } catch (ExecutionException e) {
>   throw new IOException("Flink job is failed", e);
> }
> {code}
> ZeppelinCode:
> [https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384]
> I suspect that the job status is returned based on runningJobsRegistry, and
> since 1.15 this registry is not updated with FINISHED status prior to job
> result future completion; see this change: {{JobMasterServiceLeadershipRunner.java}}
> [https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL387]
>
>
> It looks like a race condition that is hard to reproduce on a lightweight
> setup. I reproduced it by running a Zeppelin notebook against a remote Flink
> cluster and triggering a SQL insert operation. If I find a smaller setup that
> reproduces it on a small local cluster with a lightweight client, I will
> update this ticket. I am open to suggestions on how to fix this.
>
> For Zeppelin I have a separate ticket because Flink 1.15 is not going to be
> fixed, but this issue, if I understand it correctly, should be common to all
> versions starting with 1.15; therefore it makes sense to address this
> starting with 1.16. https://issues.apache.org/jira/browse/ZEPPELIN-5909
>
> [~mapohl], thank you for your assistance on Slack. I have created this ticket
> to back our conversation; could you please add your thoughts on this failure
> mode?
>
> One possible solution would be to add a check for the presence of a
> JobResult in the result store before returning the job status (if there is a
> result, the job shouldn't be reported as running, based on this documentation:
> [https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/TableResult.html#await--])



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
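
The check proposed at the end of the issue description (consult the result store before reporting a job status) could be sketched roughly as follows. This is a minimal, self-contained model and not Flink code: the class {{ResultAwareJobStatus}}, its two maps, the method names, and the reduced {{Status}} enum are all hypothetical stand-ins for the possibly stale status source and the result store mentioned in the ticket.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the proposed check; none of these names are Flink APIs.
class ResultAwareJobStatus {

    // Simplified stand-in for Flink's JobStatus (only the values needed here).
    enum Status { RUNNING, FINISHED, FAILED }

    // Stand-in for a status source that may lag behind (e.g. a cache or registry).
    private final Map<String, Status> cachedStatus = new ConcurrentHashMap<>();

    // Stand-in for the result store: holds the terminal status of completed jobs.
    private final Map<String, Status> resultStore = new ConcurrentHashMap<>();

    void updateCachedStatus(String jobId, Status status) {
        cachedStatus.put(jobId, status);
    }

    void storeResult(String jobId, Status terminalStatus) {
        resultStore.put(jobId, terminalStatus);
    }

    // If a result exists, report its terminal status; otherwise fall back
    // to the (possibly stale) cached status.
    Optional<Status> requestJobStatus(String jobId) {
        Status terminal = resultStore.get(jobId);
        if (terminal != null) {
            return Optional.of(terminal);
        }
        return Optional.ofNullable(cachedStatus.get(jobId));
    }

    public static void main(String[] args) {
        ResultAwareJobStatus jobs = new ResultAwareJobStatus();

        jobs.updateCachedStatus("job-1", Status.RUNNING);
        System.out.println(jobs.requestJobStatus("job-1").get()); // prints RUNNING

        // The result is stored, but the cached status has not been refreshed yet.
        jobs.storeResult("job-1", Status.FINISHED);
        System.out.println(jobs.requestJobStatus("job-1").get()); // prints FINISHED
    }
}
```

In this model the stale RUNNING entry is never reported once a result is present, which is the behaviour the reporter asks for. Where such a check would belong in Flink itself (handler, cache, or {{Dispatcher}}) is left open by the ticket.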