[ https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136772#comment-16136772 ]
ASF GitHub Bot commented on FLINK-7040: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4569#discussion_r134474895 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java --- @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.messages.ErrorResponseBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.util.RestMapperUtils; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream; +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpResponse; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed; + +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; +import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; +import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + +/** + * Super class for netty-based handlers that work with {@link RequestBody}s and {@link ResponseBody}s. + * + * <p>Subclasses must be thread-safe. + * + * @param <R> type of incoming requests + * @param <P> type of outgoing responses + */ +@ChannelHandler.Sharable +public abstract class AbstractRestHandler<R extends RequestBody, P extends ResponseBody> extends SimpleChannelInboundHandler<Routed> { + protected final Logger log = LoggerFactory.getLogger(getClass()); + + private static final ObjectMapper mapper = RestMapperUtils.getStrictObjectMapper(); + + private final MessageHeaders<R, P, ?> messageHeaders; + + protected AbstractRestHandler(MessageHeaders<R, P, ?> messageHeaders) { + this.messageHeaders = messageHeaders; + } + + public MessageHeaders<R, P, ?> getMessageHeaders() { + return messageHeaders; + } + + @Override + protected void channelRead0(final ChannelHandlerContext ctx, Routed routed) throws Exception { + log.debug("Received request."); + final HttpRequest httpRequest = routed.request(); + + try { + if (!(httpRequest instanceof FullHttpRequest)) { + log.error("Implementation error: Received a request that wasn't a FullHttpResponse."); --- End diff -- Alright. Please add a comment explaining this. > Flip-6 client-cluster communication > ----------------------------------- > > Key: FLINK-7040 > URL: https://issues.apache.org/jira/browse/FLINK-7040 > Project: Flink > Issue Type: New Feature > Components: Cluster Management, Mesos > Reporter: Till Rohrmann > Assignee: Chesnay Schepler > Priority: Critical > Labels: flip-6 > > With the new Flip-6 architecture, the client will communicate with the > cluster in a RESTful manner. > The cluster shall support the following REST calls: > * List jobs (GET): Get list of all running jobs on the cluster > * Submit job (POST): Submit a job to the cluster (only supported in session > mode) > * Lookup job leader (GET): Gets the JM leader for the given job > * Get job status (GET): Get the status of an executed job (and maybe the > JobExecutionResult) > * Cancel job (PUT): Cancel the given job > * Stop job (PUT): Stops the given job > * Take savepoint (POST): Take savepoint for given job (How to return the > savepoint under which the savepoint was stored? Maybe always having to > specify a path) > * Get KV state (GET): Gets the KV state for the given job and key (Queryable > state) > * Poll/subscribe to notifications for job (GET, WebSocket): Polls new > notifications from the execution of the given job/Opens WebSocket to receive > notifications > The first four REST calls will be served by the REST endpoint running in the > application master/cluster entrypoint. The other calls will be served by a > REST endpoint running along side to the JobManager. > Detailed information about different implementations and their pros and cons > can be found in this document: > https://docs.google.com/document/d/1eIX6FS9stwraRdSUgRSuLXC1sL7NAmxtuqIXe_jSi-k/edit?usp=sharing > The implementation will most likely be Netty based. -- This message was sent by Atlassian JIRA (v6.4.14#64029)