Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4645#discussion_r139403702
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
    @@ -159,24 +155,17 @@ public void shutdown(Time timeout) {
        }
     
        private <P extends ResponseBody> CompletableFuture<P> 
submitRequest(String targetAddress, int targetPort, FullHttpRequest 
httpRequest, Class<P> responseClass) {
    -           return CompletableFuture.supplyAsync(() -> 
bootstrap.connect(targetAddress, targetPort), executor)
    -                   .thenApply((channel) -> {
    -                           try {
    -                                   return channel.sync();
    -                           } catch (InterruptedException e) {
    -                                   throw new FlinkRuntimeException(e);
    -                           }
    -                   })
    -                   .thenApply((ChannelFuture::channel))
    -                   .thenCompose(channel -> {
    -                           ClientHandler handler = 
channel.pipeline().get(ClientHandler.class);
    -                           CompletableFuture<JsonResponse> future = 
handler.getJsonFuture();
    -                           channel.writeAndFlush(httpRequest);
    -                           return future;
    -                   }).thenComposeAsync(
    -                           (JsonResponse rawResponse) -> 
parseResponse(rawResponse, responseClass),
    -                           executor
    -                   );
    +           ChannelFuture connect = bootstrap.connect(targetAddress, 
targetPort);
    +           Channel channel;
    --- End diff --
    
    That's true, generally I would also prefer making the entire method async. 
For now I would still like to make this change though, and happily revisit this 
later on when we have some more robust mechanism in place to deal with heavy 
loads.


---

Reply via email to