[ 
https://issues.apache.org/jira/browse/FLINK-7582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16169931#comment-16169931
 ] 

ASF GitHub Bot commented on FLINK-7582:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4645#discussion_r139403702
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
    @@ -159,24 +155,17 @@ public void shutdown(Time timeout) {
        }
     
        private <P extends ResponseBody> CompletableFuture<P> submitRequest(String targetAddress, int targetPort, FullHttpRequest httpRequest, Class<P> responseClass) {
    -           return CompletableFuture.supplyAsync(() -> bootstrap.connect(targetAddress, targetPort), executor)
    -                   .thenApply((channel) -> {
    -                           try {
    -                                   return channel.sync();
    -                           } catch (InterruptedException e) {
    -                                   throw new FlinkRuntimeException(e);
    -                           }
    -                   })
    -                   .thenApply((ChannelFuture::channel))
    -                   .thenCompose(channel -> {
    -                           ClientHandler handler = channel.pipeline().get(ClientHandler.class);
    -                           CompletableFuture<JsonResponse> future = handler.getJsonFuture();
    -                           channel.writeAndFlush(httpRequest);
    -                           return future;
    -                   }).thenComposeAsync(
    -                           (JsonResponse rawResponse) -> parseResponse(rawResponse, responseClass),
    -                           executor
    -                   );
    +           ChannelFuture connect = bootstrap.connect(targetAddress, targetPort);
    +           Channel channel;
    --- End diff --
    
    That's true; in general I would also prefer making the entire method async.
For now, though, I would still like to make this change, and I will happily
revisit it later once we have a more robust mechanism in place for dealing
with heavy loads.
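
For illustration only, here is a minimal sketch of what a fully asynchronous submit path could look like. It uses only the JDK; `ConnectAttempt` is a hypothetical stand-in for a callback-style connect future and is not Netty's `ChannelFuture`, and the `String` channel and request are placeholders. It is not the code from the pull request.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class AsyncConnectSketch {

    /** Hypothetical stand-in for a callback-style connect future (NOT Netty's ChannelFuture). */
    static final class ConnectAttempt {
        private Consumer<String> listener;
        private String channel;

        /** Registers a callback; invokes it immediately if the connect already succeeded. */
        synchronized void onConnected(Consumer<String> callback) {
            if (channel != null) {
                callback.accept(channel);
            } else {
                listener = callback;
            }
        }

        /** Called by the (simulated) I/O thread once the connection is established. */
        synchronized void succeed(String connectedChannel) {
            channel = connectedChannel;
            if (listener != null) {
                listener.accept(connectedChannel);
            }
        }
    }

    /** Bridges the callback into a CompletableFuture, so no thread blocks while connecting. */
    static CompletableFuture<String> submitRequest(ConnectAttempt connect, String request) {
        CompletableFuture<String> channelFuture = new CompletableFuture<>();
        connect.onConnected(channelFuture::complete);
        // Placeholder for "write the request to the channel and return its response future".
        return channelFuture.thenApply(channel -> channel + " <- " + request);
    }

    public static void main(String[] args) {
        ConnectAttempt attempt = new ConnectAttempt();
        CompletableFuture<String> result = submitRequest(attempt, "GET /jobs");
        System.out.println("done before connect: " + result.isDone());
        attempt.succeed("channel-1");
        System.out.println("result: " + result.join());
    }
}
```

In this sketch the caller gets a future back immediately, and the continuation runs on whichever thread reports the successful connect.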


> Rest client may buffer responses indefinitely under heavy load
> --------------------------------------------------------------
>
>                 Key: FLINK-7582
>                 URL: https://issues.apache.org/jira/browse/FLINK-7582
>             Project: Flink
>          Issue Type: Bug
>          Components: REST
>    Affects Versions: 1.4.0
>            Reporter: Chesnay Schepler
>            Assignee: Chesnay Schepler
>             Fix For: 1.4.0
>
>
> The RestClient uses an executor for sending requests and parsing responses. 
> Under heavy load, i.e. lots of requests being sent, the executor may be used 
> exclusively for sending requests. The responses that are received by the 
> netty threads are thus never parsed and are buffered in memory, until either 
> requests stop coming in or all memory is used up.
> We should let the netty receiver thread do the parsing as well.
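
The starvation described above can be reproduced in miniature with plain JDK classes. The following is a sketch under stated assumptions, not the RestClient code: a single-threaded executor stands in for the shared executor, a blocked task stands in for the pending sends, a plain thread named `receiver` stands in for a netty thread, and `parse` is a hypothetical placeholder for response parsing.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class ExecutorStarvationSketch {

    /** Hypothetical placeholder for parsing a raw JSON response. */
    static String parse(String raw) {
        return "parsed:" + raw;
    }

    /** Returns true if the response gets parsed while the shared executor is still busy sending. */
    static boolean parsedWhileExecutorBusy(boolean parseOnReceiverThread) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch sendsMayFinish = new CountDownLatch(1);
        try {
            // Simulated heavy load: the executor's only thread is stuck in a "send".
            executor.execute(() -> {
                try {
                    sendsMayFinish.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });

            CompletableFuture<String> rawResponse = new CompletableFuture<>();
            CompletableFuture<String> parsed;
            if (parseOnReceiverThread) {
                // Parsing runs on whichever thread completes rawResponse.
                parsed = rawResponse.thenApply(ExecutorStarvationSketch::parse);
            } else {
                // Parsing is queued on the shared executor, behind the blocked send.
                parsed = rawResponse.thenApplyAsync(ExecutorStarvationSketch::parse, executor);
            }

            // The simulated receiver thread delivers the raw response.
            Thread receiver = new Thread(() -> rawResponse.complete("{}"), "receiver");
            receiver.start();
            receiver.join();

            try {
                parsed.get(200, TimeUnit.MILLISECONDS);
                return true;
            } catch (TimeoutException e) {
                return false; // the response is still waiting, unparsed, in memory
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            sendsMayFinish.countDown();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("parse on receiver thread: " + parsedWhileExecutorBusy(true));
        System.out.println("parse on shared executor: " + parsedWhileExecutorBusy(false));
    }
}
```

Under these assumptions the first call should report `true` and the second `false`: with the executor saturated, only the variant that parses on the receiving thread makes progress.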



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
