Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196740018 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java --- @@ -70,51 +84,103 @@ public FileUploadHandler(final Path uploadDir) { @Override protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject msg) throws Exception { - if (msg instanceof HttpRequest) { - final HttpRequest httpRequest = (HttpRequest) msg; - if (httpRequest.getMethod().equals(HttpMethod.POST)) { - if (HttpPostRequestDecoder.isMultipart(httpRequest)) { - currentHttpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest); - currentHttpRequest = httpRequest; + try { + if (msg instanceof HttpRequest) { + final HttpRequest httpRequest = (HttpRequest) msg; + LOG.trace("Received request. URL:{} Method:{}", httpRequest.getUri(), httpRequest.getMethod()); + if (httpRequest.getMethod().equals(HttpMethod.POST)) { + if (HttpPostRequestDecoder.isMultipart(httpRequest)) { + currentHttpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest); + currentHttpRequest = httpRequest; + currentUploadDir = Files.createDirectory(uploadDir.resolve(UUID.randomUUID().toString())); + } else { + ctx.fireChannelRead(msg); + } } else { ctx.fireChannelRead(msg); } + } else if (msg instanceof HttpContent && currentHttpPostRequestDecoder != null) { + // make sure that we still have a upload dir in case that it got deleted in the meanwhile + RestServerEndpoint.createUploadDir(uploadDir, LOG); + + final HttpContent httpContent = (HttpContent) msg; + currentHttpPostRequestDecoder.offer(httpContent); + + while (currentHttpPostRequestDecoder.hasNext()) { + final InterfaceHttpData data = currentHttpPostRequestDecoder.next(); + if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.FileUpload) { + final DiskFileUpload fileUpload = (DiskFileUpload) data; + checkState(fileUpload.isCompleted()); + + final Path dest = currentUploadDir.resolve(fileUpload.getFilename()); + fileUpload.renameTo(dest.toFile()); + } else if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) { + final Attribute request = (Attribute) data; + // this could also be implemented by using the first found Attribute as the payload + if (data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) { + currentJsonPayload = request.get(); + } else { + LOG.warn("Received unknown attribute {}.", data.getName()); + HandlerUtils.sendErrorResponse( + ctx, + currentHttpRequest, + new ErrorResponseBody("Received unknown attribute " + data.getName() + '.'), + HttpResponseStatus.BAD_REQUEST, + Collections.emptyMap() + ); + deleteUploadedFiles(); + reset(); + return; + } + } + } + + if (httpContent instanceof LastHttpContent) { + ctx.channel().attr(UPLOADED_FILES).set(new FileUploads(Collections.singleton(currentUploadDir))); + ctx.fireChannelRead(currentHttpRequest); + if (currentJsonPayload != null) { + ctx.fireChannelRead(httpContent.replace(Unpooled.wrappedBuffer(currentJsonPayload))); + } else { + ctx.fireChannelRead(httpContent); + } + reset(); + } } else { ctx.fireChannelRead(msg); } - } else if (msg instanceof HttpContent && currentHttpPostRequestDecoder != null) { - // make sure that we still have a upload dir in case that it got deleted in the meanwhile - RestServerEndpoint.createUploadDir(uploadDir, LOG); - - final HttpContent httpContent = (HttpContent) msg; - currentHttpPostRequestDecoder.offer(httpContent); - - while (currentHttpPostRequestDecoder.hasNext()) { - final InterfaceHttpData data = currentHttpPostRequestDecoder.next(); - if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.FileUpload) { - final DiskFileUpload fileUpload = (DiskFileUpload) data; - checkState(fileUpload.isCompleted()); - - final Path dest = uploadDir.resolve(Paths.get(UUID.randomUUID() + - "_" + fileUpload.getFilename())); - fileUpload.renameTo(dest.toFile()); - ctx.channel().attr(UPLOADED_FILE).set(dest); - } - } + } catch (Exception e) { + LOG.warn("Internal server error. File upload failed.", e); + HandlerUtils.sendErrorResponse( + ctx, + currentHttpRequest, + new ErrorResponseBody("File upload failed."), + HttpResponseStatus.INTERNAL_SERVER_ERROR, + Collections.emptyMap() + ); + deleteUploadedFiles(); + reset(); + } + } - if (httpContent instanceof LastHttpContent) { - ctx.fireChannelRead(currentHttpRequest); - ctx.fireChannelRead(httpContent); - reset(); + private void deleteUploadedFiles() { + if (currentUploadDir != null) { + try (FileUploads uploads = new FileUploads(Collections.singleton(currentUploadDir))) { --- End diff -- Why do we create a `FileUploads` instance instead of simply calling `FileUtils.deleteDirectory(currentUploadDir)`?
---