[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518058#comment-16518058 ]
ASF GitHub Bot commented on FLINK-9599: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196741255 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java --- @@ -0,0 +1,483 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.WebOptions; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.util.RestMapperUtils; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.TestingRestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import okhttp3.MediaType; +import okhttp3.MultipartBody; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import javax.annotation.Nonnull; + +import java.io.File; +import java.io.IOException; +import java.io.StringWriter; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CompletableFuture; + +import static java.util.Objects.requireNonNull; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +/** + * Tests for the {@link FileUploadHandler}. Ensures that multipart http messages containing files and/or json are properly + * handled. + */ +public class FileUploadHandlerTest extends TestLogger { + + private static final ObjectMapper OBJECT_MAPPER = RestMapperUtils.getStrictObjectMapper(); + private static final Random RANDOM = new Random(); + + @ClassRule + public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + private static RestServerEndpoint serverEndpoint; + private static String serverAddress; + + private static MultipartMixedHandler mixedHandler; + private static MultipartJsonHandler jsonHandler; + private static MultipartFileHandler fileHandler; + private static File file1; + private static File file2; + + @BeforeClass + public static void setup() throws Exception { + Configuration config = new Configuration(); + config.setInteger(RestOptions.PORT, 0); + config.setString(RestOptions.ADDRESS, "localhost"); + config.setString(WebOptions.UPLOAD_DIR, TEMPORARY_FOLDER.newFolder().getCanonicalPath()); + + RestServerEndpointConfiguration serverConfig = RestServerEndpointConfiguration.fromConfiguration(config); + + final String restAddress = "http://localhost:1234"; + RestfulGateway mockRestfulGateway = TestingRestfulGateway.newBuilder() + .setRestAddress(restAddress) + .build(); + + final GatewayRetriever<RestfulGateway> mockGatewayRetriever = () -> + CompletableFuture.completedFuture(mockRestfulGateway); + + file1 = TEMPORARY_FOLDER.newFile(); + Files.write(file1.toPath(), "hello".getBytes(ConfigConstants.DEFAULT_CHARSET)); + file2 = TEMPORARY_FOLDER.newFile(); + Files.write(file2.toPath(), "world".getBytes(ConfigConstants.DEFAULT_CHARSET)); + + mixedHandler = new MultipartMixedHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever); + jsonHandler = new MultipartJsonHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever); + fileHandler = new MultipartFileHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever); + + final List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = Arrays.asList( + Tuple2.of(mixedHandler.getMessageHeaders(), mixedHandler), + Tuple2.of(jsonHandler.getMessageHeaders(), jsonHandler), + Tuple2.of(fileHandler.getMessageHeaders(), fileHandler)); + + serverEndpoint = new TestRestServerEndpoint(serverConfig, handlers); + + serverEndpoint.start(); + serverAddress = serverEndpoint.getRestBaseUrl(); + } + + @AfterClass + public static void teardown() throws Exception { + if (serverEndpoint != null) { + serverEndpoint.close(); + serverEndpoint = null; + } + } + + private static Request buildFileRequest(String headerUrl) { + MultipartBody.Builder builder = new MultipartBody.Builder(); + builder = addFilePart(builder); + return finalizeRequest(builder, headerUrl); + } + + private static Request buildJsonRequest(String headerUrl, int index) throws IOException { + MultipartBody.Builder builder = new MultipartBody.Builder(); + builder = addJsonPart(builder, index); + return finalizeRequest(builder, headerUrl); + } + + private static Request buildMixedRequest(String headerUrl, int index) throws IOException { + MultipartBody.Builder builder = new MultipartBody.Builder(); + builder = addJsonPart(builder, index); + builder = addFilePart(builder); + return finalizeRequest(builder, headerUrl); + } + + private static Request finalizeRequest(MultipartBody.Builder builder, String headerUrl) { + MultipartBody multipartBody = builder + .setType(MultipartBody.FORM) + .build(); + + return new Request.Builder() + .url(serverAddress + headerUrl) + .post(multipartBody) + .build(); + } + + private static MultipartBody.Builder addFilePart(MultipartBody.Builder builder) { + okhttp3.RequestBody filePayload1 = okhttp3.RequestBody.create(MediaType.parse("application/octet-stream"), file1); + okhttp3.RequestBody filePayload2 = okhttp3.RequestBody.create(MediaType.parse("application/octet-stream"), file2); + + return builder.addFormDataPart("file1", file1.getName(), filePayload1) + .addFormDataPart("file2", file2.getName(), filePayload2); + } + + private static MultipartBody.Builder addJsonPart(MultipartBody.Builder builder, int index) throws IOException { + TestRequestBody jsonRequestBody = new TestRequestBody(index); + + StringWriter sw = new StringWriter(); + OBJECT_MAPPER.writeValue(sw, jsonRequestBody); + + String jsonPayload = sw.toString(); + + return builder.addFormDataPart(org.apache.flink.runtime.rest.FileUploadHandler.HTTP_ATTRIBUTE_REQUEST, jsonPayload); + } + + @Test + public void testMixedMultipart() throws Exception { + OkHttpClient client = new OkHttpClient(); + + Request jsonRequest = buildJsonRequest(mixedHandler.getMessageHeaders().getTargetRestEndpointURL(), RANDOM.nextInt()); + try (Response response = client.newCall(jsonRequest).execute()) { + // explicitly rejected by the test handler implementation + assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), response.code()); + } + + Request fileRequest = buildFileRequest(mixedHandler.getMessageHeaders().getTargetRestEndpointURL()); + try (Response response = client.newCall(fileRequest).execute()) { + // expected JSON payload is missing + assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code()); + } + + int mixedId = RANDOM.nextInt(); + Request mixedRequest = buildMixedRequest(mixedHandler.getMessageHeaders().getTargetRestEndpointURL(), mixedId); + try (Response response = client.newCall(mixedRequest).execute()) { + assertEquals(mixedHandler.getMessageHeaders().getResponseStatusCode().code(), response.code()); + assertEquals(mixedId, mixedHandler.lastReceivedRequest.index); + } + } + + @Test + public void testJsonMultipart() throws Exception { + OkHttpClient client = new OkHttpClient(); + + int jsonId = RANDOM.nextInt(); + Request jsonRequest = buildJsonRequest(jsonHandler.getMessageHeaders().getTargetRestEndpointURL(), jsonId); + try (Response response = client.newCall(jsonRequest).execute()) { + assertEquals(jsonHandler.getMessageHeaders().getResponseStatusCode().code(), response.code()); + assertEquals(jsonId, jsonHandler.lastReceivedRequest.index); + } + + Request fileRequest = buildFileRequest(jsonHandler.getMessageHeaders().getTargetRestEndpointURL()); + try (Response response = client.newCall(fileRequest).execute()) { + // either because JSON payload is missing or FileUploads are outright forbidden + assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code()); + } + + Request mixedRequest = buildMixedRequest(jsonHandler.getMessageHeaders().getTargetRestEndpointURL(), RANDOM.nextInt()); + try (Response response = client.newCall(mixedRequest).execute()) { + // FileUploads are outright forbidden + assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code()); + } + } + + @Test + public void testFileMultipart() throws Exception { + OkHttpClient client = new OkHttpClient(); + + Request jsonRequest = buildJsonRequest(fileHandler.getMessageHeaders().getTargetRestEndpointURL(), RANDOM.nextInt()); + try (Response response = client.newCall(jsonRequest).execute()) { + // JSON payload did not match expected format + assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code()); + } + + Request fileRequest = buildFileRequest(fileHandler.getMessageHeaders().getTargetRestEndpointURL()); + try (Response response = client.newCall(fileRequest).execute()) { + assertEquals(fileHandler.getMessageHeaders().getResponseStatusCode().code(), response.code()); + } + + Request mixedRequest = buildMixedRequest(fileHandler.getMessageHeaders().getTargetRestEndpointURL(), RANDOM.nextInt()); + try (Response response = client.newCall(mixedRequest).execute()) { + // JSON payload did not match expected format + assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code()); + } + } --- End diff -- We should also add a test which verifies that the upload directory gets cleaned up after the request has been processed. > Implement generic mechanism to receive files via rest > ----------------------------------------------------- > > Key: FLINK-9599 > URL: https://issues.apache.org/jira/browse/FLINK-9599 > Project: Flink > Issue Type: New Feature > Components: REST > Reporter: Chesnay Schepler > Assignee: Chesnay Schepler > Priority: Major > Fix For: 1.6.0 > > > As a prerequisite for a cleaner implementation of FLINK-9280 we should > * extend the RestClient to allow the upload of Files > * extend FileUploadHandler to accept mixed multi-part requests (json + files) > * generalize mechanism for accessing uploaded files in {{AbstractHandler}} > Uploaded files can be forwarded to subsequent handlers as an attribute, > similar to the existing special case for the {{JarUploadHandler}}. The JSON > body can be forwarded by replacing the incoming http requests with a simple > {{DefaultFullHttpRequest}}. > Uploaded files will be retrievable through the {{HandlerRequest}}. > I'm not certain if/how we can document that a handler accepts files. -- This message was sent by Atlassian JIRA (v7.6.3#76005)