zentol closed pull request #6651: [1.5][FLINK-10115][rest] Ignore 
content-length limit for FileUploads 
URL: https://github.com/apache/flink/pull/6651
 
 
   

This PR was merged from a forked repository. Because GitHub hides the
original diff of a fork-based pull request once it has been merged, the
diff is reproduced below for the sake of provenance:

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
index e8d93845107..0a0d833626e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
@@ -141,6 +141,7 @@ protected void respondAsLeader(ChannelHandlerContext ctx, 
Routed routed, T gatew
                                        hre);
                        }
 
+                       log.trace("Starting request processing.");
                        CompletableFuture<Void> requestProcessingFuture = 
respondToRequest(
                                ctx,
                                httpRequest,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
index c58e86ed1b9..2fdaafaeee2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
@@ -21,6 +21,7 @@
 import org.apache.flink.runtime.rest.handler.FileUploads;
 import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
 import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.rest.util.RestConstants;
 import org.apache.flink.util.FileUtils;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
@@ -30,6 +31,7 @@
 import 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultLastHttpContent;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpContent;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObject;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
@@ -97,6 +99,7 @@ protected void channelRead0(final ChannelHandlerContext ctx, 
final HttpObject ms
                                LOG.trace("Received request. URL:{} Method:{}", 
httpRequest.getUri(), httpRequest.getMethod());
                                if 
(httpRequest.getMethod().equals(HttpMethod.POST)) {
                                        if 
(HttpPostRequestDecoder.isMultipart(httpRequest)) {
+                                               LOG.trace("Initializing 
multipart file upload.");
                                                
checkState(currentHttpPostRequestDecoder == null);
                                                checkState(currentHttpRequest 
== null);
                                                checkState(currentUploadDir == 
null);
@@ -110,6 +113,7 @@ protected void channelRead0(final ChannelHandlerContext 
ctx, final HttpObject ms
                                        
ctx.fireChannelRead(ReferenceCountUtil.retain(msg));
                                }
                        } else if (msg instanceof HttpContent && 
currentHttpPostRequestDecoder != null) {
+                               LOG.trace("Received http content.");
                                // make sure that we still have a upload dir in 
case that it got deleted in the meanwhile
                                RestServerEndpoint.createUploadDir(uploadDir, 
LOG);
 
@@ -124,9 +128,11 @@ protected void channelRead0(final ChannelHandlerContext 
ctx, final HttpObject ms
 
                                                final Path dest = 
currentUploadDir.resolve(fileUpload.getFilename());
                                                
fileUpload.renameTo(dest.toFile());
+                                               LOG.trace("Upload of file {} 
complete.", fileUpload.getFilename());
                                        } else if (data.getHttpDataType() == 
InterfaceHttpData.HttpDataType.Attribute) {
                                                final Attribute request = 
(Attribute) data;
                                                // this could also be 
implemented by using the first found Attribute as the payload
+                                               LOG.trace("Upload of attribute 
{} complete.", request.getName());
                                                if 
(data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
                                                        currentJsonPayload = 
request.get();
                                                } else {
@@ -137,17 +143,23 @@ protected void channelRead0(final ChannelHandlerContext 
ctx, final HttpObject ms
                                }
 
                                if (httpContent instanceof LastHttpContent) {
+                                       LOG.trace("Finalizing multipart file 
upload.");
                                        
ctx.channel().attr(UPLOADED_FILES).set(new FileUploads(currentUploadDir));
-                                       ctx.fireChannelRead(currentHttpRequest);
                                        if (currentJsonPayload != null) {
                                                // the following lines behave 
similar to httpContent#replace in netty 4.1
                                                // the only difference is that 
the validateHeaders flag isn't preserved
                                                // this shouldn't be a problem 
since we only copy existing headers
                                                DefaultLastHttpContent 
newContent = new 
DefaultLastHttpContent(Unpooled.wrappedBuffer(currentJsonPayload), false);
                                                
newContent.trailingHeaders().set(((LastHttpContent) 
httpContent).trailingHeaders());
+                                               
currentHttpRequest.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 
currentJsonPayload.length);
+                                               
currentHttpRequest.headers().set(HttpHeaders.Names.CONTENT_TYPE, 
RestConstants.REST_CONTENT_TYPE);
+                                               
ctx.fireChannelRead(currentHttpRequest);
                                                ctx.fireChannelRead(newContent);
                                        } else {
-                                               
ctx.fireChannelRead(ReferenceCountUtil.retain(httpContent));
+                                               
currentHttpRequest.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 0);
+                                               
currentHttpRequest.headers().remove(HttpHeaders.Names.CONTENT_TYPE);
+                                               
ctx.fireChannelRead(currentHttpRequest);
+                                               
ctx.fireChannelRead(LastHttpContent.EMPTY_LAST_CONTENT);
                                        }
                                        reset();
                                }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
index 0153d5dd31a..c350393371a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
@@ -102,6 +102,8 @@ public void before() throws Exception {
                Configuration config = new Configuration();
                config.setInteger(RestOptions.PORT, 0);
                config.setString(RestOptions.ADDRESS, "localhost");
+               // set this to a lower value on purpose to test that files 
larger than the content limit are still accepted
+               config.setInteger(RestOptions.SERVER_MAX_CONTENT_LENGTH, 1024 * 
1024);
                configuredUploadDir = temporaryFolder.newFolder().toPath();
                config.setString(WebOptions.UPLOAD_DIR, 
configuredUploadDir.toString());
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to