Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6147#discussion_r195099673 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java --- @@ -91,19 +111,49 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms while (httpContent != LastHttpContent.EMPTY_LAST_CONTENT && currentHttpPostRequestDecoder.hasNext()) { final InterfaceHttpData data = currentHttpPostRequestDecoder.next(); - if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.FileUpload) { - final DiskFileUpload fileUpload = (DiskFileUpload) data; - checkState(fileUpload.isCompleted()); - - final Path dest = uploadDir.resolve(Paths.get(UUID.randomUUID() + - "_" + fileUpload.getFilename())); - fileUpload.renameTo(dest.toFile()); - ctx.channel().attr(UPLOADED_FILE).set(dest); + if (currentHttpRequest.getUri().equals(JobSubmitHeaders.getInstance().getTargetRestEndpointURL())) { + if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.FileUpload) { + final DiskFileUpload fileUpload = (DiskFileUpload) data; + checkState(fileUpload.isCompleted()); + LOG.trace("Received job-submit file upload. attribute:{} fileName:{}.", fileUpload.getName(), fileUpload.getFilename()); + + Path dest; + if (data.getName().startsWith(HTTP_ATTRIBUTE_JARS)) { + dest = currentJobSubmitRequestBuffer.getJarDir().resolve(fileUpload.getFilename()); + fileUpload.renameTo(dest.toFile()); + currentJobSubmitRequestBuffer.addJar(fileUpload.getFile().toPath()); + } else if (data.getName().startsWith(HTTP_ATTRIBUTE_ARTIFACTS)) { + dest = currentJobSubmitRequestBuffer.getArtifactDir().resolve(fileUpload.getFilename()); + fileUpload.renameTo(dest.toFile()); + currentJobSubmitRequestBuffer.addUserArtifact(fileUpload.getFile().toPath()); + } else { + LOG.warn("Received unexpected FileUpload that will be ignored. attribute:{} fileName:{}.", data.getName(), fileUpload.getFilename()); + fileUpload.delete(); + } + } else if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) { + final Attribute request = (Attribute) data; + final byte[] requestJson = request.get(); + JobSubmitRequestBody jobSubmitRequestBody = RestMapperUtils.getStrictObjectMapper().readValue(requestJson, JobSubmitHeaders.getInstance().getRequestClass()); + currentJobSubmitRequestBuffer.setJobGraph(jobSubmitRequestBody.serializedJobGraph); + } --- End diff -- I think we are mixing here a lot of handler specific knowledge into this handler and thereby creating a very strong coupling between multiple components. Moreover, this handler seems to deserialize json which is rather the responsibility of the `AbstractHandler`.
---