Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6147#discussion_r195098933 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java --- @@ -197,6 +208,88 @@ public void shutdown(Time timeout) { executor); } + public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest( + String targetAddress, + int targetPort, + M messageHeaders, + U messageParameters, + R request, + Collection<Path> jars, + Collection<Path> userArtifacts) throws IOException { + Preconditions.checkNotNull(targetAddress); + Preconditions.checkArgument(0 <= targetPort && targetPort < 65536, "The target port " + targetPort + " is not in the range (0, 65536]."); + Preconditions.checkNotNull(messageHeaders); + Preconditions.checkNotNull(request); + Preconditions.checkNotNull(messageParameters); + Preconditions.checkState(messageParameters.isResolved(), "Message parameters were not resolved."); + + String targetUrl = MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(), messageParameters); + + LOG.debug("Sending request of class {} to {}:{}{}", request.getClass(), targetAddress, targetPort, targetUrl); + // serialize payload + StringWriter sw = new StringWriter(); + objectMapper.writeValue(sw, request); + ByteBuf payload = Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET)); + + // do not load file into memory, this can have weird side-effects and break functionality + HttpDataFactory factory = new DefaultHttpDataFactory(true); + + HttpRequest httpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, messageHeaders.getHttpMethod().getNettyHttpMethod(), messageHeaders.getTargetRestEndpointURL()); + httpRequest.headers() + .set(HttpHeaders.Names.HOST, targetAddress + ':' + targetPort) + .set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE); + + // takes care of splitting the request into multiple parts + HttpPostRequestEncoder bodyRequestEncoder; + try { + bodyRequestEncoder = new HttpPostRequestEncoder(factory, httpRequest, true); + + Attribute requestAttribute = new MemoryAttribute(FileUploadHandler.HTTP_ATTRIBUTE_REQUEST); + requestAttribute.setContent(payload); + bodyRequestEncoder.addBodyHttpData(requestAttribute); + + addPathsToEncoder(jars, FileUploadHandler.HTTP_ATTRIBUTE_JARS, RestConstants.JAR_CONTENT_TYPE, bodyRequestEncoder); + addPathsToEncoder(userArtifacts, FileUploadHandler.HTTP_ATTRIBUTE_ARTIFACTS, RestConstants.BINARY_CONTENT_TYPE, bodyRequestEncoder); + + bodyRequestEncoder.finalizeRequest(); --- End diff -- I think we can _always_ send the request that the encoder returns.
---