Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6147#discussion_r195101162 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java --- @@ -91,19 +111,49 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms while (httpContent != LastHttpContent.EMPTY_LAST_CONTENT && currentHttpPostRequestDecoder.hasNext()) { final InterfaceHttpData data = currentHttpPostRequestDecoder.next(); - if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.FileUpload) { - final DiskFileUpload fileUpload = (DiskFileUpload) data; - checkState(fileUpload.isCompleted()); - - final Path dest = uploadDir.resolve(Paths.get(UUID.randomUUID() + - "_" + fileUpload.getFilename())); - fileUpload.renameTo(dest.toFile()); - ctx.channel().attr(UPLOADED_FILE).set(dest); + if (currentHttpRequest.getUri().equals(JobSubmitHeaders.getInstance().getTargetRestEndpointURL())) { --- End diff -- I think the `FileUploadHandler` should not know about the `JobSubmitHandler`. Instead it should only be responsible for receiving uploaded files, storing them in a temp directory and then making them accessible to a downstream handler (e.g. through an `Attribute` in the `AttributeMap`). In order to defer the deserialization of the Json part of the payload, we could create a new `HttpRequest` which contains exactly the data sent as a `MemoryAttribute` (the branch which matches `InterfaceHttpData.HttpDataType.Attribute`).
---