GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6147#discussion_r195101162
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
    @@ -91,19 +111,49 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms
     
                        while (httpContent != LastHttpContent.EMPTY_LAST_CONTENT && currentHttpPostRequestDecoder.hasNext()) {
                                final InterfaceHttpData data = currentHttpPostRequestDecoder.next();
    -                           if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.FileUpload) {
    -                                   final DiskFileUpload fileUpload = (DiskFileUpload) data;
    -                                   checkState(fileUpload.isCompleted());
    -
    -                                   final Path dest = uploadDir.resolve(Paths.get(UUID.randomUUID() +
    -                                           "_" + fileUpload.getFilename()));
    -                                   fileUpload.renameTo(dest.toFile());
    -                                   ctx.channel().attr(UPLOADED_FILE).set(dest);
    +                           if (currentHttpRequest.getUri().equals(JobSubmitHeaders.getInstance().getTargetRestEndpointURL())) {
    --- End diff --
    
    I think the `FileUploadHandler` should not know about the
    `JobSubmitHandler`. Instead, it should only be responsible for receiving
    uploaded files, storing them in a temp directory, and then making them
    accessible to a downstream handler (e.g. through an `Attribute` in the
    `AttributeMap`). To defer deserialization of the JSON part of the payload,
    we could create a new `HttpRequest` that contains exactly the data sent as
    a `MemoryAttribute` (the branch that matches
    `InterfaceHttpData.HttpDataType.Attribute`).
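
A minimal sketch of the separation suggested above, using only the JDK so it runs without Netty on the classpath. Everything here is an assumption made for illustration: the plain `Map` stands in for the channel's `AttributeMap`, and the names `UploadSketch`, `UPLOADED_FILES`, `JSON_PAYLOAD`, `handleUpload`, and `handleDownstream` are hypothetical and do not come from the pull request. The point is only that the upload step never inspects the request URI or the target handler.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/** JDK-only sketch; the Map is a stand-in for Netty's per-channel AttributeMap. */
public class UploadSketch {

    // Hypothetical attribute keys shared by the upload step and downstream steps.
    public static final String UPLOADED_FILES = "UPLOADED_FILES";
    public static final String JSON_PAYLOAD = "JSON_PAYLOAD";

    /** Generic upload step: stores files and exposes them, with no endpoint-specific logic. */
    public static void handleUpload(
            Map<String, Object> attributes,
            Path uploadDir,
            Map<String, byte[]> fileParts,
            byte[] jsonPart) throws IOException {
        List<Path> stored = new ArrayList<>();
        for (Map.Entry<String, byte[]> part : fileParts.entrySet()) {
            // Prefix with a random UUID so equal file names do not collide.
            Path dest = uploadDir.resolve(UUID.randomUUID() + "_" + part.getKey());
            Files.write(dest, part.getValue());
            stored.add(dest);
        }
        attributes.put(UPLOADED_FILES, stored);
        // The JSON part is passed on as raw bytes; it is not deserialized here.
        attributes.put(JSON_PAYLOAD, jsonPart);
    }

    /** Downstream step (e.g. job submission): consumes what the upload step left behind. */
    @SuppressWarnings("unchecked")
    public static String handleDownstream(Map<String, Object> attributes) {
        List<Path> files = (List<Path>) attributes.get(UPLOADED_FILES);
        String json = new String((byte[]) attributes.get(JSON_PAYLOAD), StandardCharsets.UTF_8);
        return files.size() + " file(s), payload=" + json;
    }

    public static void main(String[] args) throws IOException {
        Path uploadDir = Files.createTempDirectory("upload-sketch");
        Map<String, Object> attributes = new HashMap<>();
        Map<String, byte[]> fileParts = new HashMap<>();
        fileParts.put("job.jar", new byte[] {1, 2, 3});

        byte[] json = "{\"name\":\"example\"}".getBytes(StandardCharsets.UTF_8);
        handleUpload(attributes, uploadDir, fileParts, json);
        System.out.println(handleDownstream(attributes));
    }
}
```

In the real handler the same hand-off would presumably go through `ctx.channel().attr(...)`, as the removed lines in the diff already did for `UPLOADED_FILE`, with the JSON bytes forwarded in a new request rather than stored in a map.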

