[ 
https://issues.apache.org/jira/browse/FLINK-15669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17093025#comment-17093025
 ] 

Yu Li commented on FLINK-15669:
-------------------------------

Since the fix itself is still good and only the test was reverted, I think it 
makes sense to keep the fix version of this JIRA at 1.10.1. Maybe we could 
also close this one and open another JIRA to harden the test. What do you 
think? [~aljoscha] [~godfreyhe] Thanks.

> SQL client can't cancel flink job
> ---------------------------------
>
>                 Key: FLINK-15669
>                 URL: https://issues.apache.org/jira/browse/FLINK-15669
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Client
>    Affects Versions: 1.10.0
>            Reporter: godfrey he
>            Assignee: godfrey he
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.10.1, 1.11.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> In the SQL client, the CLI client cancels a query through the {{void 
> cancelQuery(String sessionId, String resultId)}} method in {{Executor}}. 
> However, the {{resultId}} is a random UUID, not the job ID, so the CLI client 
> can't cancel a running job.
> Related code in {{LocalExecutor}}:
> {code:java}
> private <C> ResultDescriptor executeQueryInternal(String sessionId, ExecutionContext<C> context, String query) {
>        ......
>       // store the result with a unique id
>       final String resultId = UUID.randomUUID().toString();
>       resultStore.storeResult(resultId, result);
>       ......
>       // create execution
>       final ProgramDeployer deployer = new ProgramDeployer(
>               configuration, jobName, pipeline);
>       // start result retrieval
>       result.startRetrieval(deployer);
>       return new ResultDescriptor(
>                       resultId,
>                       removeTimeAttributes(table.getSchema()),
>                       result.isMaterialized());
> }
> private <T> void cancelQueryInternal(ExecutionContext<T> context, String resultId) {
>       ......
>       // stop Flink job
>       try (final ClusterDescriptor<T> clusterDescriptor = context.createClusterDescriptor()) {
>               ClusterClient<T> clusterClient = null;
>               try {
>                       // retrieve existing cluster
>                       clusterClient = clusterDescriptor.retrieve(context.getClusterId()).getClusterClient();
>                       try {
>                               // ======== cancel job through resultId =======
>                               clusterClient.cancel(new JobID(StringUtils.hexStringToByte(resultId))).get();
>                       } catch (Throwable t) {
>                               // the job might have finished earlier
>                       }
>               } catch (Exception e) {
>                       throw new SqlExecutionException("Could not retrieve or create a cluster.", e);
>               } finally {
>                       try {
>                               if (clusterClient != null) {
>                                       clusterClient.close();
>                               }
>                       } catch (Exception e) {
>                               // ignore
>                       }
>               }
>       } catch (SqlExecutionException e) {
>               throw e;
>       } catch (Exception e) {
>               throw new SqlExecutionException("Could not locate a cluster.", e);
>       }
> }
> {code}
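
To illustrate the mismatch described above, here is a minimal, self-contained sketch that uses only the JDK. It is not the actual fix and does not use any Flink class: {{ResultJobRegistry}}, {{register}}, {{jobIdFor}} and {{isJobIdHex}} are hypothetical names, and job IDs are modelled as plain 32-character hex strings (the textual form of a 16-byte ID). It shows that a random UUID string is not a valid hex job ID, and one possible way to remember which job belongs to which result ID.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of Flink: remembers which job a result id belongs to.
class ResultJobRegistry {
    // result id (random UUID string) -> job id (assumed here to be 32 hex characters)
    private final Map<String, String> jobIdByResultId = new ConcurrentHashMap<>();

    /** Creates a fresh result id for the given job id and stores the pair. */
    String register(String jobIdHex) {
        if (!isJobIdHex(jobIdHex)) {
            throw new IllegalArgumentException("not a 32-character hex id: " + jobIdHex);
        }
        String resultId = UUID.randomUUID().toString();
        jobIdByResultId.put(resultId, jobIdHex);
        return resultId;
    }

    /** Looks up the job id to cancel for a result id, if one was registered. */
    Optional<String> jobIdFor(String resultId) {
        return Optional.ofNullable(jobIdByResultId.get(resultId));
    }

    /** True if the string consists of exactly 32 hex digits. */
    static boolean isJobIdHex(String s) {
        return s != null && s.matches("[0-9a-fA-F]{32}");
    }

    public static void main(String[] args) {
        ResultJobRegistry registry = new ResultJobRegistry();
        String jobId = "0123456789abcdef0123456789abcdef"; // made-up job id
        String resultId = registry.register(jobId);

        // The UUID string contains hyphens, so it cannot be parsed as a hex job id.
        System.out.println("resultId is a hex job id: " + isJobIdHex(resultId));
        // Cancelling should go through the remembered job id instead.
        System.out.println("job id found for result: " + registry.jobIdFor(resultId).isPresent());
    }
}
```

In this sketch, a cancel path would call {{jobIdFor(resultId)}} and pass the returned ID to the cluster client, rather than converting the result ID itself. Whether the real fix stores the mapping this way is not stated in this thread.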



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
