Github user tillrohrmann commented on a diff in the pull request: --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/ --- @@ -197,6 +208,88 @@ public void shutdown(Time timeout) { executor); } + public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest( + String targetAddress, + int targetPort, + M messageHeaders, + U messageParameters, + R request, + Collection<Path> jars, + Collection<Path> userArtifacts) throws IOException { + Preconditions.checkNotNull(targetAddress); + Preconditions.checkArgument(0 <= targetPort && targetPort < 65536, "The target port " + targetPort + " is not in the range (0, 65536]."); + Preconditions.checkNotNull(messageHeaders); + Preconditions.checkNotNull(request); + Preconditions.checkNotNull(messageParameters); + Preconditions.checkState(messageParameters.isResolved(), "Message parameters were not resolved."); + + String targetUrl = MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(), messageParameters); + + LOG.debug("Sending request of class {} to {}:{}{}", request.getClass(), targetAddress, targetPort, targetUrl); + // serialize payload + StringWriter sw = new StringWriter(); + objectMapper.writeValue(sw, request); + ByteBuf payload = Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET)); + + // do not load file into memory, this can have weird side-effects and break functionality + HttpDataFactory factory = new DefaultHttpDataFactory(true); + + HttpRequest httpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, messageHeaders.getHttpMethod().getNettyHttpMethod(), messageHeaders.getTargetRestEndpointURL()); + httpRequest.headers() + .set(HttpHeaders.Names.HOST, targetAddress + ':' + targetPort) + .set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE); + + // takes care of splitting the request into multiple parts + HttpPostRequestEncoder bodyRequestEncoder; + try { + 
bodyRequestEncoder = new HttpPostRequestEncoder(factory, httpRequest, true); + + Attribute requestAttribute = new MemoryAttribute(FileUploadHandler.HTTP_ATTRIBUTE_REQUEST); + requestAttribute.setContent(payload); + bodyRequestEncoder.addBodyHttpData(requestAttribute); + + addPathsToEncoder(jars, FileUploadHandler.HTTP_ATTRIBUTE_JARS, RestConstants.JAR_CONTENT_TYPE, bodyRequestEncoder); + addPathsToEncoder(userArtifacts, FileUploadHandler.HTTP_ATTRIBUTE_ARTIFACTS, RestConstants.BINARY_CONTENT_TYPE, bodyRequestEncoder); + + bodyRequestEncoder.finalizeRequest(); + } catch (HttpPostRequestEncoder.ErrorDataEncoderException e) { + return org.apache.flink.runtime.concurrent.FutureUtils.completedExceptionally(e); + } + + return createChannelFuture(targetAddress, targetPort) + .thenComposeAsync( + channel -> { + ClientHandler handler = channel.pipeline().get(ClientHandler.class); + CompletableFuture<JsonResponse> future = handler.getJsonFuture(); + + channel.writeAndFlush(httpRequest); + // if this is false the jars/artifacts are so small that they were already included in the initial request + if (bodyRequestEncoder.isChunked()) { + channel.writeAndFlush(bodyRequestEncoder); + } + + // release data and remove temporary files if they were created + bodyRequestEncoder.cleanFiles(); + + return future; + }, + executor) + .thenComposeAsync( + (JsonResponse rawResponse) -> parseResponse(rawResponse, objectMapper.constructType(messageHeaders.getResponseClass())), + executor); --- End diff -- It is not necessary to specify `executor` again. Calling `thenCompose` is enough.