zentol closed pull request #6595: [FLINK-10115][rest] Ignore content-length 
limit for file uploads 
URL: https://github.com/apache/flink/pull/6595
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
index 0d8605abc99..3d1ec9d0066 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
@@ -145,6 +145,7 @@ protected void respondAsLeader(ChannelHandlerContext ctx, 
RoutedRequest routedRe
                                        hre);
                        }
 
+                       log.trace("Starting request processing.");
                        CompletableFuture<Void> requestProcessingFuture = 
respondToRequest(
                                ctx,
                                httpRequest,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
index d6287507697..7c46af04b55 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
@@ -21,6 +21,7 @@
 import org.apache.flink.runtime.rest.handler.FileUploads;
 import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
 import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.rest.util.RestConstants;
 import org.apache.flink.util.FileUtils;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
@@ -29,6 +30,7 @@
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
 import 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpContent;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObject;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
@@ -94,6 +96,7 @@ protected void channelRead0(final ChannelHandlerContext ctx, 
final HttpObject ms
                                LOG.trace("Received request. URL:{} Method:{}", 
httpRequest.getUri(), httpRequest.getMethod());
                                if 
(httpRequest.getMethod().equals(HttpMethod.POST)) {
                                        if 
(HttpPostRequestDecoder.isMultipart(httpRequest)) {
+                                               LOG.trace("Initializing 
multipart file upload.");
                                                
checkState(currentHttpPostRequestDecoder == null);
                                                checkState(currentHttpRequest 
== null);
                                                checkState(currentUploadDir == 
null);
@@ -107,6 +110,7 @@ protected void channelRead0(final ChannelHandlerContext 
ctx, final HttpObject ms
                                        
ctx.fireChannelRead(ReferenceCountUtil.retain(msg));
                                }
                        } else if (msg instanceof HttpContent && 
currentHttpPostRequestDecoder != null) {
+                               LOG.trace("Received http content.");
                                // make sure that we still have a upload dir in 
case that it got deleted in the meanwhile
                                RestServerEndpoint.createUploadDir(uploadDir, 
LOG);
 
@@ -121,9 +125,11 @@ protected void channelRead0(final ChannelHandlerContext 
ctx, final HttpObject ms
 
                                                final Path dest = 
currentUploadDir.resolve(fileUpload.getFilename());
                                                
fileUpload.renameTo(dest.toFile());
+                                               LOG.trace("Upload of file {} 
complete.", fileUpload.getFilename());
                                        } else if (data.getHttpDataType() == 
InterfaceHttpData.HttpDataType.Attribute) {
                                                final Attribute request = 
(Attribute) data;
                                                // this could also be 
implemented by using the first found Attribute as the payload
+                                               LOG.trace("Upload of attribute 
{} complete.", request.getName());
                                                if 
(data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
                                                        currentJsonPayload = 
request.get();
                                                } else {
@@ -134,12 +140,18 @@ protected void channelRead0(final ChannelHandlerContext 
ctx, final HttpObject ms
                                }
 
                                if (httpContent instanceof LastHttpContent) {
+                                       LOG.trace("Finalizing multipart file 
upload.");
                                        
ctx.channel().attr(UPLOADED_FILES).set(new FileUploads(currentUploadDir));
-                                       ctx.fireChannelRead(currentHttpRequest);
                                        if (currentJsonPayload != null) {
+                                               
currentHttpRequest.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 
currentJsonPayload.length);
+                                               
currentHttpRequest.headers().set(HttpHeaders.Names.CONTENT_TYPE, 
RestConstants.REST_CONTENT_TYPE);
+                                               
ctx.fireChannelRead(currentHttpRequest);
                                                
ctx.fireChannelRead(httpContent.replace(Unpooled.wrappedBuffer(currentJsonPayload)));
                                        } else {
-                                               
ctx.fireChannelRead(ReferenceCountUtil.retain(httpContent));
+                                               
currentHttpRequest.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 0);
+                                               
currentHttpRequest.headers().remove(HttpHeaders.Names.CONTENT_TYPE);
+                                               
ctx.fireChannelRead(currentHttpRequest);
+                                               
ctx.fireChannelRead(LastHttpContent.EMPTY_LAST_CONTENT);
                                        }
                                        reset();
                                }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
index 0153d5dd31a..c350393371a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
@@ -102,6 +102,8 @@ public void before() throws Exception {
                Configuration config = new Configuration();
                config.setInteger(RestOptions.PORT, 0);
                config.setString(RestOptions.ADDRESS, "localhost");
+               // set this to a lower value on purpose to test that files 
larger than the content limit are still accepted
+               config.setInteger(RestOptions.SERVER_MAX_CONTENT_LENGTH, 1024 * 
1024);
                configuredUploadDir = temporaryFolder.newFolder().toPath();
                config.setString(WebOptions.UPLOAD_DIR, 
configuredUploadDir.toString());
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to