zentol closed pull request #6650: [1.6][FLINK-10115][rest] Ignore content-length limit for FileUploads URL: https://github.com/apache/flink/pull/6650
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java index 0d8605abc99..3d1ec9d0066 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java @@ -145,6 +145,7 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe hre); } + log.trace("Starting request processing."); CompletableFuture<Void> requestProcessingFuture = respondToRequest( ctx, httpRequest, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java index d6287507697..7c46af04b55 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java @@ -21,6 +21,7 @@ import org.apache.flink.runtime.rest.handler.FileUploads; import org.apache.flink.runtime.rest.handler.util.HandlerUtils; import org.apache.flink.runtime.rest.messages.ErrorResponseBody; +import org.apache.flink.runtime.rest.util.RestConstants; import org.apache.flink.util.FileUtils; import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; @@ -29,6 +30,7 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline; import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpContent; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders; import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObject; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest; @@ -94,6 +96,7 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms LOG.trace("Received request. URL:{} Method:{}", httpRequest.getUri(), httpRequest.getMethod()); if (httpRequest.getMethod().equals(HttpMethod.POST)) { if (HttpPostRequestDecoder.isMultipart(httpRequest)) { + LOG.trace("Initializing multipart file upload."); checkState(currentHttpPostRequestDecoder == null); checkState(currentHttpRequest == null); checkState(currentUploadDir == null); @@ -107,6 +110,7 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms ctx.fireChannelRead(ReferenceCountUtil.retain(msg)); } } else if (msg instanceof HttpContent && currentHttpPostRequestDecoder != null) { + LOG.trace("Received http content."); // make sure that we still have a upload dir in case that it got deleted in the meanwhile RestServerEndpoint.createUploadDir(uploadDir, LOG); @@ -121,9 +125,11 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms final Path dest = currentUploadDir.resolve(fileUpload.getFilename()); fileUpload.renameTo(dest.toFile()); + LOG.trace("Upload of file {} complete.", fileUpload.getFilename()); } else if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) { final Attribute request = (Attribute) data; // this could also be implemented by using the first found Attribute as the payload + LOG.trace("Upload of attribute {} complete.", request.getName()); if (data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) { currentJsonPayload = request.get(); } else { @@ -134,12 +140,18 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms } if (httpContent instanceof LastHttpContent) { + LOG.trace("Finalizing multipart file upload."); 
ctx.channel().attr(UPLOADED_FILES).set(new FileUploads(currentUploadDir)); - ctx.fireChannelRead(currentHttpRequest); if (currentJsonPayload != null) { + currentHttpRequest.headers().set(HttpHeaders.Names.CONTENT_LENGTH, currentJsonPayload.length); + currentHttpRequest.headers().set(HttpHeaders.Names.CONTENT_TYPE, RestConstants.REST_CONTENT_TYPE); + ctx.fireChannelRead(currentHttpRequest); ctx.fireChannelRead(httpContent.replace(Unpooled.wrappedBuffer(currentJsonPayload))); } else { - ctx.fireChannelRead(ReferenceCountUtil.retain(httpContent)); + currentHttpRequest.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 0); + currentHttpRequest.headers().remove(HttpHeaders.Names.CONTENT_TYPE); + ctx.fireChannelRead(currentHttpRequest); + ctx.fireChannelRead(LastHttpContent.EMPTY_LAST_CONTENT); } reset(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java index 0153d5dd31a..c350393371a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java @@ -102,6 +102,8 @@ public void before() throws Exception { Configuration config = new Configuration(); config.setInteger(RestOptions.PORT, 0); config.setString(RestOptions.ADDRESS, "localhost"); + // set this to a lower value on purpose to test that files larger than the content limit are still accepted + config.setInteger(RestOptions.SERVER_MAX_CONTENT_LENGTH, 1024 * 1024); configuredUploadDir = temporaryFolder.newFolder().toPath(); config.setString(WebOptions.UPLOAD_DIR, configuredUploadDir.toString()); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services