Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4645#discussion_r139403702

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
    @@ -159,24 +155,17 @@ public void shutdown(Time timeout) {
         }

         private <P extends ResponseBody> CompletableFuture<P> submitRequest(String targetAddress, int targetPort, FullHttpRequest httpRequest, Class<P> responseClass) {
    -        return CompletableFuture.supplyAsync(() -> bootstrap.connect(targetAddress, targetPort), executor)
    -            .thenApply((channel) -> {
    -                try {
    -                    return channel.sync();
    -                } catch (InterruptedException e) {
    -                    throw new FlinkRuntimeException(e);
    -                }
    -            })
    -            .thenApply((ChannelFuture::channel))
    -            .thenCompose(channel -> {
    -                ClientHandler handler = channel.pipeline().get(ClientHandler.class);
    -                CompletableFuture<JsonResponse> future = handler.getJsonFuture();
    -                channel.writeAndFlush(httpRequest);
    -                return future;
    -            }).thenComposeAsync(
    -                (JsonResponse rawResponse) -> parseResponse(rawResponse, responseClass),
    -                executor
    -            );
    +        ChannelFuture connect = bootstrap.connect(targetAddress, targetPort);
    +        Channel channel;
    --- End diff --

    That's true, generally I would also prefer making the entire method async. For now I would still like to make this change though, and happily revisit this later on when we have some more robust mechanism in place to deal with heavy loads.
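For reference, a fully asynchronous variant of the kind mentioned above could look roughly like the sketch below. It is not the code from this pull request and does not use Netty: `FakeChannel`, `connect`, and the string-based request and response are hypothetical stand-ins, chosen so the example runs on the JDK alone. The idea is that the connect step completes a `CompletableFuture` from a callback instead of blocking on `sync()`. With Netty, the analogous step would presumably register a listener via `ChannelFuture.addListener`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

class AsyncConnectSketch {

    // Daemon thread standing in for an I/O event loop (assumption, not Flink's executor).
    static final ExecutorService IO_THREAD = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "fake-io");
        t.setDaemon(true);
        return t;
    });

    /** Hypothetical stand-in for a connected channel; not Netty's Channel. */
    static final class FakeChannel {
        final String address;
        final int port;

        FakeChannel(String address, int port) {
            this.address = address;
            this.port = port;
        }

        // Pretends to send the request and returns a future for the response.
        CompletableFuture<String> writeAndFlush(String request) {
            return CompletableFuture.completedFuture(
                "response-to:" + request + "@" + address + ":" + port);
        }
    }

    /** Hypothetical callback-style connect: the listener is invoked on the I/O thread. */
    static void connect(String address, int port, BiConsumer<FakeChannel, Throwable> listener) {
        IO_THREAD.execute(() -> {
            if (port <= 0) {
                listener.accept(null, new IllegalArgumentException("invalid port: " + port));
            } else {
                listener.accept(new FakeChannel(address, port), null);
            }
        });
    }

    /** Never blocks the caller: the connect result is bridged into a CompletableFuture. */
    static CompletableFuture<String> submitRequest(String address, int port, String request) {
        CompletableFuture<FakeChannel> channelFuture = new CompletableFuture<>();
        connect(address, port, (channel, failure) -> {
            if (failure != null) {
                channelFuture.completeExceptionally(failure);
            } else {
                channelFuture.complete(channel);
            }
        });
        // Runs only once the connection attempt has finished successfully.
        return channelFuture.thenCompose(channel -> channel.writeAndFlush(request));
    }

    public static void main(String[] args) throws Exception {
        String response = submitRequest("localhost", 8081, "ping").get(5, TimeUnit.SECONDS);
        System.out.println(response); // prints "response-to:ping@localhost:8081"
    }
}
```

In this shape a failed connection surfaces as an exceptionally completed future rather than an exception thrown on the calling thread, which is the main behavioural difference from the blocking version.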
---