[ https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722985#comment-17722985 ]
Matthias Pohl edited comment on FLINK-32069 at 5/16/23 12:37 PM:
-----------------------------------------------------------------

For the client side, I'm not 100% sure whether your call hierarchy is correct. Based on the (master) code, I can find the following call hierarchies for {{jobClient.getJobStatus()}}:
* Client side:
** *{{ClusterClientJobClientAdapter.getJobStatus}}* >>> *{{RestClusterClient.getJobStatus}}* >>> *{{RestClusterClient.requestJobStatus}}* >>> *{{GET /jobs/:jobId/status}}*
* Server side:
** *{{JobStatusHandler.handleRequest}}* >>> *{{Dispatcher.requestJobStatus}}*

For the {{jobClient.getJobExecutionResult()}} call, I find the following call hierarchy:
* Client side:
** *{{ClusterClientJobClientAdapter.getJobExecutionResult}}* >>> *{{RestClusterClient.requestJobResult}}* >>> *{{RestClusterClient.requestJobResultInternal}}* >>> *{{GET /jobs/:jobId/execution-result}}*
* Server side:
** *{{JobExecutionResultHandler.handleRequest}}* >>> *{{Dispatcher.requestJobStatus}}*

Both call hierarchies end up in {{Dispatcher.requestJobStatus}}, which looks up the {{JobManagerRunner}} and uses the {{ExecutionGraphInfoStore}} as a fallback.

Can you provide debug logs for this case? Additionally, could you share what input (i.e. {{operations}} and {{tbenv}}) you use in your runs?
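As a minimal, hypothetical probe of both client-side paths, something like the following could be run against the affected session cluster while the race occurs; the {{localhost:8081}} REST endpoint, the {{"probe"}} cluster id, and the {{JobStatusProbe}} class name are illustrative assumptions, not taken from the ticket:

{code:java}
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobmaster.JobResult;

// Hypothetical probe: queries the two REST endpoints that back
// jobClient.getJobStatus() and jobClient.getJobExecutionResult().
public class JobStatusProbe {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set(RestOptions.ADDRESS, "localhost"); // assumption: REST address of the cluster
        conf.set(RestOptions.PORT, 8081);           // assumption: default REST port

        JobID jobId = JobID.fromHexString(args[0]); // job id of the affected insert job

        try (RestClusterClient<String> client = new RestClusterClient<>(conf, "probe")) {
            // Same path as ClusterClientJobClientAdapter.getJobStatus: GET /jobs/:jobId/status
            JobStatus status = client.getJobStatus(jobId).get();

            // Same path as getJobExecutionResult: GET /jobs/:jobId/execution-result
            JobResult result = client.requestJobResult(jobId).get();

            System.out.println("status=" + status
                    + ", applicationStatus=" + result.getApplicationStatus());
        }
    }
}
{code}

If the race is hit, the output would show a completed execution result together with a non-terminal job status.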
> jobClient.getJobStatus() can return status RUNNING for finished insert operation
> ---------------------------------------------------------------------------------
>
> Key: FLINK-32069
> URL: https://issues.apache.org/jira/browse/FLINK-32069
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.16.1, 1.15.4
> Reporter: Aleksandr Iushmanov
> Priority: Major
>
> Using Zeppelin with a remote cluster, I came across a race condition issue leading to failed expectations for SQL insert operations.
>
> Below is an example of Zeppelin code that fails because jobClient.getJobStatus() returns RUNNING even after the job has finished. I have verified that the same failure can happen if I use jobClient.getJobExecutionResult().get() (the job execution result is "Program execution finished", but the job status is not consistently FINISHED).
> {code:java}
> TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(operations);
> checkState(tableResult.getJobClient().isPresent());
> try {
>     tableResult.await();
>     JobClient jobClient = tableResult.getJobClient().get();
>     if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
>         context.out.write("Insertion successfully.\n");
>     } else {
>         throw new IOException("Job is failed, " + jobClient.getJobExecutionResult().get().toString());
>     }
> } catch (InterruptedException e) {
>     throw new IOException("Flink job is interrupted", e);
> } catch (ExecutionException e) {
>     throw new IOException("Flink job is failed", e);
> } {code}
> Zeppelin code: [https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384]
>
> I suspect that the job status is returned based on the runningJobsRegistry, and since 1.15 this registry is not updated with the FINISHED status prior to the completion of the job result future; see this change in {{JobMasterServiceLeadershipRunner.java}}: [https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL387]
>
> It looks like a race condition that is hard to reproduce on a lightweight setup. I was reproducing it by running a Zeppelin notebook against a remote Flink cluster and triggering a SQL insert operation. If I find a smaller setup that reproduces it on a small local cluster with a lightweight client, I will update this ticket. I am open to suggestions on how to fix this.
>
> For Zeppelin I have a separate ticket (https://issues.apache.org/jira/browse/ZEPPELIN-5909), because Flink 1.15 is not going to be fixed; but if I understand it correctly, this issue should be common to all versions starting with 1.15, so it makes sense to address it starting with 1.16.
>
> [~mapohl], thank you for the assistance in Slack. I have created this ticket to back our conversation; could you please add your thoughts on this failure mode?
>
> One possible solution would be an additional check for the presence of a JobResult in the result store before returning the job status (if there is a result, the job shouldn't be reported as running, based on this documentation: [https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/TableResult.html#await--]).
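To make the proposed check concrete, here is a minimal sketch of the invariant it would enforce; the {{JobStatusSanitizer}} helper and its mapping from {{ApplicationStatus}} to {{JobStatus}} are assumptions made for this illustration, not the actual {{Dispatcher}} change:

{code:java}
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.jobmaster.JobResult;

// Hypothetical helper (not existing Flink code): once a JobResult is available for a job,
// the status reported to clients must be a globally terminal one, never RUNNING.
public final class JobStatusSanitizer {

    private JobStatusSanitizer() {}

    /**
     * Combines the status reported by the runner with an optionally available JobResult
     * (may be null), preferring the result when both are present.
     */
    public static JobStatus effectiveStatus(JobStatus reportedStatus, JobResult availableResult) {
        if (availableResult == null || reportedStatus.isGloballyTerminalState()) {
            return reportedStatus;
        }
        // A JobResult only exists for jobs that reached a globally terminal state, so a
        // non-terminal answer such as RUNNING is stale; derive the status from the result.
        switch (availableResult.getApplicationStatus()) {
            case SUCCEEDED:
                return JobStatus.FINISHED;
            case CANCELED:
                return JobStatus.CANCELED;
            default:
                // FAILED and UNKNOWN are both mapped to FAILED in this sketch.
                return JobStatus.FAILED;
        }
    }
}
{code}

With such a rule in place, a job whose JobResult is already available could never be observed as RUNNING, which matches the TableResult#await() contract linked above.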