[ https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136751#comment-16136751 ]

ASF GitHub Bot commented on FLINK-7040:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4569#discussion_r134468988

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java ---
    @@ -155,38 +129,66 @@ public void shutdown() {
                 .set(HttpHeaders.Names.HOST, configuredTargetAddress + ":" + configuredTargetPort)
                 .set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);

    -        synchronized (this) {
    -            // This ensures strict sequential processing of requests.
    -            // If we send new requests immediately we can no longer make assumptions about the order in which responses
    -            // arrive, due to which the handler cannot know which future he should complete (not to mention what response
    -            // type to read).
    -            CompletableFuture<P> nextFuture = lastFuture
    -                .handleAsync((f, e) -> submitRequest(httpRequest, messageHeaders), directExecutor)
    -                .thenCompose((future) -> future);
    -
    -            lastFuture = nextFuture;
    -            return nextFuture;
    -        }
    +        return submitRequest(httpRequest, messageHeaders);
         }

         private <M extends MessageHeaders<R, P, U>, U extends ParameterMapper, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> submitRequest(FullHttpRequest httpRequest, M messageHeaders) {
    -        CompletableFuture<P> responseFuture = handler.expectResponse(messageHeaders.getResponseClass());
    -
    -        try {
    -            // write request
    -            Channel channel = bootstrap.connect(configuredTargetAddress, configuredTargetPort).sync().channel();
    -            channel.writeAndFlush(httpRequest);
    -            channel.closeFuture();
    -        } catch (InterruptedException e) {
    -            return FutureUtils.completedExceptionally(e);
    +        synchronized (lock) {
    +            CompletableFuture<P> responseFuture = ClientHandler.addHandlerForResponse(bootstrap, sslEngine, messageHeaders.getResponseClass());
    --- End diff --

    Can't we do it by having a fixed channel initializer which always creates a new `ClientHandler`? Then we can retrieve the actual `ClientHandler` via `channel.pipeline().get(ClientHandler.class)` and, through it, gain access to the future.


> Flip-6 client-cluster communication
> -----------------------------------
>
>                 Key: FLINK-7040
>                 URL: https://issues.apache.org/jira/browse/FLINK-7040
>             Project: Flink
>          Issue Type: New Feature
>          Components: Cluster Management, Mesos
>            Reporter: Till Rohrmann
>            Assignee: Chesnay Schepler
>            Priority: Critical
>              Labels: flip-6
>
> With the new Flip-6 architecture, the client will communicate with the
> cluster in a RESTful manner.
> The cluster shall support the following REST calls:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session
> mode)
> * Lookup job leader (GET): Gets the JM leader for the given job
> * Get job status (GET): Get the status of an executed job (and maybe the
> JobExecutionResult)
> * Cancel job (PUT): Cancel the given job
> * Stop job (PUT): Stops the given job
> * Take savepoint (POST): Take savepoint for given job (How to return the
> path under which the savepoint was stored? Maybe always having to
> specify a path)
> * Get KV state (GET): Gets the KV state for the given job and key (Queryable
> state)
> * Poll/subscribe to notifications for job (GET, WebSocket): Polls new
> notifications from the execution of the given job/Opens WebSocket to receive
> notifications
> The first four REST calls will be served by the REST endpoint running in the
> application master/cluster entrypoint. The other calls will be served by a
> REST endpoint running alongside the JobManager.
> Detailed information about different implementations and their pros and cons
> can be found in this document:
> https://docs.google.com/document/d/1eIX6FS9stwraRdSUgRSuLXC1sL7NAmxtuqIXe_jSi-k/edit?usp=sharing
> The implementation will most likely be Netty based.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
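
For readers following the review comment above, here is a rough, JDK-only sketch of the pattern being suggested: a fixed initializer installs a fresh `ClientHandler` on every new channel, and the caller later looks that handler up by type to get at its response future. This is not the code from the pull request. `MiniPipeline`, `MiniChannel` and `MiniBootstrap` are hypothetical stand-ins for Netty's pipeline, channel and bootstrap so that the example runs without Netty on the classpath, and the `ClientHandler` here is a simplified assumption that carries a `String` response rather than a typed `ResponseBody`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Simplified stand-in for the handler discussed in the review: one instance per
// channel, owning the future for that channel's single response.
final class ClientHandler {
    private final CompletableFuture<String> responseFuture = new CompletableFuture<>();

    CompletableFuture<String> getResponseFuture() {
        return responseFuture;
    }

    // In a real Netty handler this would be triggered from the read callback.
    void onResponse(String body) {
        responseFuture.complete(body);
    }

    void onFailure(Throwable cause) {
        responseFuture.completeExceptionally(cause);
    }
}

// Hypothetical stand-in for a channel pipeline: handlers are looked up by class.
final class MiniPipeline {
    private final Map<Class<?>, Object> handlers = new LinkedHashMap<>();

    void addLast(Object handler) {
        handlers.put(handler.getClass(), handler);
    }

    <T> T get(Class<T> type) {
        // Returns null when no handler of that type was installed.
        return type.cast(handlers.get(type));
    }
}

// Hypothetical stand-in for a channel: it only exposes its pipeline.
final class MiniChannel {
    private final MiniPipeline pipeline = new MiniPipeline();

    MiniPipeline pipeline() {
        return pipeline;
    }
}

// Hypothetical stand-in for a bootstrap with a fixed initializer: every new
// channel gets its own ClientHandler, independent of the request being sent.
final class MiniBootstrap {
    MiniChannel connect() {
        MiniChannel channel = new MiniChannel();
        channel.pipeline().addLast(new ClientHandler());
        return channel;
    }
}

final class PerChannelHandlerSketch {
    // Opens a channel and returns the future owned by that channel's handler.
    static CompletableFuture<String> futureOf(MiniChannel channel) {
        return channel.pipeline().get(ClientHandler.class).getResponseFuture();
    }

    public static void main(String[] args) {
        MiniBootstrap bootstrap = new MiniBootstrap();
        MiniChannel first = bootstrap.connect();
        MiniChannel second = bootstrap.connect();

        // Responses may arrive in any order; each completes only its own future.
        second.pipeline().get(ClientHandler.class).onResponse("second response");
        first.pipeline().get(ClientHandler.class).onResponse("first response");

        System.out.println(futureOf(first).join());
        System.out.println(futureOf(second).join());
    }
}
```

Because each channel carries its own handler and future, two concurrent requests cannot complete each other's futures, which is the ordering problem the removed `synchronized (this)` block was working around.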