[ 
https://issues.apache.org/jira/browse/FLINK-8344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16332724#comment-16332724
 ] 

ASF GitHub Bot commented on FLINK-8344:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5312#discussion_r162699002
  
    --- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
    @@ -376,4 +430,99 @@ public GetClusterStatusResponse getClusterStatus() {
        public int getMaxSlots() {
                return 0;
        }
    +
    +   
//-------------------------------------------------------------------------
    +   // RestClient Helper
    +   
//-------------------------------------------------------------------------
    +
    +   private <M extends MessageHeaders<EmptyRequestBody, P, U>, U extends 
MessageParameters, P extends ResponseBody> CompletableFuture<P>
    +                   sendRequest(M messageHeaders, U messageParameters) 
throws IOException, LeaderNotAvailableException {
    +           return sendRequest(messageHeaders, messageParameters, 
EmptyRequestBody.getInstance());
    +   }
    +
    +   private <M extends MessageHeaders<R, P, EmptyMessageParameters>, R 
extends RequestBody, P extends ResponseBody> CompletableFuture<P>
    +                   sendRequest(M messageHeaders, R request) throws 
IOException, LeaderNotAvailableException {
    +           return sendRequest(messageHeaders, 
EmptyMessageParameters.getInstance(), request);
    +   }
    +
    +   private <M extends MessageHeaders<EmptyRequestBody, P, 
EmptyMessageParameters>, P extends ResponseBody> CompletableFuture<P>
    +                   sendRequest(M messageHeaders) throws IOException, 
LeaderNotAvailableException {
    +           return sendRequest(messageHeaders, 
EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance());
    +   }
    +
    +   private <M extends MessageHeaders<R, P, U>, U extends 
MessageParameters, R extends RequestBody, P extends ResponseBody> 
CompletableFuture<P>
    +                   sendRequest(M messageHeaders, U messageParameters, R 
request) throws IOException, LeaderNotAvailableException {
    +           final URL restServerBaseUrl = 
restServerLeaderHolder.getLeaderAddress();
    +           return restClient.sendRequest(restServerBaseUrl.getHost(), 
restServerBaseUrl.getPort(), messageHeaders, messageParameters, request);
    +   }
    +
    +   private <M extends MessageHeaders<R, P, U>, U extends 
MessageParameters, R extends RequestBody, P extends ResponseBody> 
CompletableFuture<P>
    +                   sendRetryableRequest(M messageHeaders, U 
messageParameters, R request, Predicate<Throwable> retryPredicate) {
    +           return retry(() -> {
    +                   final URL restServerBaseUrl = 
restServerLeaderHolder.getLeaderAddress();
    --- End diff --
    
    This is a blocking operation which can take potentially quite long. I think 
it would be better to return a future (by using `LeaderRetriever` for example) 
and register a `thenCompose` which executes the `sendRequest` call.


> Add support for HA to RestClusterClient
> ---------------------------------------
>
>                 Key: FLINK-8344
>                 URL: https://issues.apache.org/jira/browse/FLINK-8344
>             Project: Flink
>          Issue Type: Improvement
>          Components: Client
>    Affects Versions: 1.5.0
>            Reporter: Till Rohrmann
>            Assignee: Gary Yao
>            Priority: Major
>              Labels: flip-6
>             Fix For: 1.5.0
>
>
> The {{RestClusterClient}} must be able to deal with changing JobMasters in 
> case of HA. We have to add functionality to reconnect to a newly elected 
> leader in case of HA.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to