[ https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136890#comment-16136890 ]
ASF GitHub Bot commented on FLINK-7040: --------------------------------------- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4569#discussion_r134505729 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java --- @@ -155,38 +129,66 @@ public void shutdown() { .set(HttpHeaders.Names.HOST, configuredTargetAddress + ":" + configuredTargetPort) .set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE); - synchronized (this) { - // This ensures strict sequential processing of requests. - // If we send new requests immediately we can no longer make assumptions about the order in which responses - // arrive, due to which the handler cannot know which future he should complete (not to mention what response - // type to read). - CompletableFuture<P> nextFuture = lastFuture - .handleAsync((f, e) -> submitRequest(httpRequest, messageHeaders), directExecutor) - .thenCompose((future) -> future); - - lastFuture = nextFuture; - return nextFuture; - } + return submitRequest(httpRequest, messageHeaders); } private <M extends MessageHeaders<R, P, U>, U extends ParameterMapper, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> submitRequest(FullHttpRequest httpRequest, M messageHeaders) { - CompletableFuture<P> responseFuture = handler.expectResponse(messageHeaders.getResponseClass()); - - try { - // write request - Channel channel = bootstrap.connect(configuredTargetAddress, configuredTargetPort).sync().channel(); - channel.writeAndFlush(httpRequest); - channel.closeFuture(); - } catch (InterruptedException e) { - return FutureUtils.completedExceptionally(e); + synchronized (lock) { + CompletableFuture<P> responseFuture = ClientHandler.addHandlerForResponse(bootstrap, sslEngine, messageHeaders.getResponseClass()); + + try { + // write request + Channel channel = bootstrap.connect(configuredTargetAddress, configuredTargetPort).sync().channel(); + channel.writeAndFlush(httpRequest); + channel.closeFuture(); + } catch (InterruptedException e) { + return FutureUtils.completedExceptionally(e); + } + return responseFuture; } - return responseFuture; } - @ChannelHandler.Sharable - private static class ClientHandler extends SimpleChannelInboundHandler<Object> { + private static class RestChannelInitializer extends ChannelInitializer<SocketChannel> { - private volatile ExpectedResponse<? extends ResponseBody> expectedResponse = null; + private final SSLEngine sslEngine; + private final ClientHandler handler; + + public RestChannelInitializer(SSLEngine sslEngine, ClientHandler handler) { + this.sslEngine = sslEngine; + this.handler = handler; + } + + @Override + protected void initChannel(SocketChannel ch) throws Exception { + // SSL should be the first handler in the pipeline + if (sslEngine != null) { + ch.pipeline().addLast("ssl", new SslHandler(sslEngine)); + } + + ch.pipeline() + .addLast(new HttpClientCodec()) + .addLast(new HttpObjectAggregator(1024 * 1024)) + .addLast(handler) + .addLast(new PipelineErrorHandler(LOG)); + } + } + + private static class ClientHandler<P extends ResponseBody> extends SimpleChannelInboundHandler<Object> { + + private final ExpectedResponse<P> expectedResponse; + + private ClientHandler(ExpectedResponse<P> expectedResponse) { + this.expectedResponse = expectedResponse; + } + + static <P extends ResponseBody> CompletableFuture<P> addHandlerForResponse(Bootstrap bootStrap, SSLEngine sslEngine, Class<P> expectedResponse) { + CompletableFuture<P> responseFuture = new CompletableFuture<>(); + + ClientHandler handler = new ClientHandler<>(new ExpectedResponse<>(expectedResponse, responseFuture)); + bootStrap.handler(new RestChannelInitializer(sslEngine, handler)); + + return responseFuture; + } @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { --- End diff -- That's a good question. I don't the answers. > Flip-6 client-cluster communication > ----------------------------------- > > Key: FLINK-7040 > URL: https://issues.apache.org/jira/browse/FLINK-7040 > Project: Flink > Issue Type: New Feature > Components: Cluster Management, Mesos > Reporter: Till Rohrmann > Assignee: Chesnay Schepler > Priority: Critical > Labels: flip-6 > > With the new Flip-6 architecture, the client will communicate with the > cluster in a RESTful manner. > The cluster shall support the following REST calls: > * List jobs (GET): Get list of all running jobs on the cluster > * Submit job (POST): Submit a job to the cluster (only supported in session > mode) > * Lookup job leader (GET): Gets the JM leader for the given job > * Get job status (GET): Get the status of an executed job (and maybe the > JobExecutionResult) > * Cancel job (PUT): Cancel the given job > * Stop job (PUT): Stops the given job > * Take savepoint (POST): Take savepoint for given job (How to return the > savepoint under which the savepoint was stored? Maybe always having to > specify a path) > * Get KV state (GET): Gets the KV state for the given job and key (Queryable > state) > * Poll/subscribe to notifications for job (GET, WebSocket): Polls new > notifications from the execution of the given job/Opens WebSocket to receive > notifications > The first four REST calls will be served by the REST endpoint running in the > application master/cluster entrypoint. The other calls will be served by a > REST endpoint running along side to the JobManager. > Detailed information about different implementations and their pros and cons > can be found in this document: > https://docs.google.com/document/d/1eIX6FS9stwraRdSUgRSuLXC1sL7NAmxtuqIXe_jSi-k/edit?usp=sharing > The implementation will most likely be Netty based. -- This message was sent by Atlassian JIRA (v6.4.14#64029)