[ 
https://issues.apache.org/jira/browse/FLINK-10415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16631611#comment-16631611
 ] 

ASF GitHub Bot commented on FLINK-10415:
----------------------------------------

tillrohrmann closed pull request #6763: [FLINK-10415] Fail response future if connection closes in RestClient
URL: https://github.com/apache/flink/pull/6763
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/docs/_includes/generated/rest_configuration.html b/docs/_includes/generated/rest_configuration.html
index 1de41654eb7..25da9cfb067 100644
--- a/docs/_includes/generated/rest_configuration.html
+++ b/docs/_includes/generated/rest_configuration.html
@@ -32,6 +32,11 @@
             <td style="word-wrap: break-word;">15000</td>
             <td>The maximum time in ms for the client to establish a TCP connection.</td>
         </tr>
+        <tr>
+            <td><h5>rest.idleness-timeout</h5></td>
+            <td style="word-wrap: break-word;">300000</td>
+            <td>The maximum time in ms for a connection to stay idle before failing.</td>
+        </tr>
         <tr>
             <td><h5>rest.port</h5></td>
             <td style="word-wrap: break-word;">8081</td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
index 3c2415828d6..c834483d7d0 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
@@ -93,6 +93,14 @@
                        .defaultValue(15_000L)
                        .withDescription("The maximum time in ms for the client to establish a TCP connection.");
 
+       /**
+        * The maximum time in ms for a connection to stay idle before failing.
+        */
+       public static final ConfigOption<Long> IDLENESS_TIMEOUT =
+               key("rest.idleness-timeout")
+                       .defaultValue(5L * 60L * 1_000L) // 5 minutes
+                       .withDescription("The maximum time in ms for a connection to stay idle before failing.");
+
        /**
         * The maximum content length that the server will handle.
         */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionClosedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionClosedException.java
new file mode 100644
index 00000000000..339a5493fc3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionClosedException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+/**
+ * Exception which is thrown if the {@link RestClient} detects that a connection
+ * was closed.
+ */
+public class ConnectionClosedException extends ConnectionException {
+       private static final long serialVersionUID = 3802002501688542472L;
+
+       public ConnectionClosedException(String message) {
+               super(message);
+       }
+
+       public ConnectionClosedException(String message, Throwable cause) {
+               super(message, cause);
+       }
+
+       public ConnectionClosedException(Throwable cause) {
+               super(cause);
+       }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionException.java
new file mode 100644
index 00000000000..d92c643a0fc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import java.io.IOException;
+
+/**
+ * Base class for all connection related exception thrown by the
+ * {@link RestClient}.
+ */
+public class ConnectionException extends IOException {
+
+       private static final long serialVersionUID = -8483133957344173698L;
+
+       public ConnectionException(String message) {
+               super(message);
+       }
+
+       public ConnectionException(String message, Throwable cause) {
+               super(message, cause);
+       }
+
+       public ConnectionException(Throwable cause) {
+               super(cause);
+       }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java
new file mode 100644
index 00000000000..96c335df53a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+/**
+ * Exception which is thrown by the {@link RestClient} if a connection
+ * becomes idle.
+ */
+public class ConnectionIdleException extends ConnectionException {
+
+       private static final long serialVersionUID = 5103778538635217293L;
+
+       public ConnectionIdleException(String message) {
+               super(message);
+       }
+
+       public ConnectionIdleException(String message, Throwable cause) {
+               super(message, cause);
+       }
+
+       public ConnectionIdleException(Throwable cause) {
+               super(cause);
+       }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index a8557496b08..d4c5e880bc7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -33,7 +33,8 @@
 import org.apache.flink.runtime.rest.util.RestMapperUtils;
 import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
-import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
@@ -71,6 +72,8 @@
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.MemoryAttribute;
 import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
 import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.timeout.IdleStateEvent;
+import org.apache.flink.shaded.netty4.io.netty.handler.timeout.IdleStateHandler;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -86,6 +89,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE;
@@ -93,7 +97,7 @@
 /**
  * This client is the counter-part to the {@link RestServerEndpoint}.
  */
-public class RestClient {
+public class RestClient implements AutoCloseableAsync {
        private static final Logger LOG = LoggerFactory.getLogger(RestClient.class);
 
        private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
@@ -103,24 +107,35 @@
 
        private final Bootstrap bootstrap;
 
+       private final CompletableFuture<Void> terminationFuture;
+
+       private final AtomicBoolean isRunning = new AtomicBoolean(true);
+
        public RestClient(RestClientConfiguration configuration, Executor executor) {
                Preconditions.checkNotNull(configuration);
                this.executor = Preconditions.checkNotNull(executor);
+               this.terminationFuture = new CompletableFuture<>();
 
                final SSLEngineFactory sslEngineFactory = configuration.getSslEngineFactory();
                ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) {
-                               // SSL should be the first handler in the pipeline
-                               if (sslEngineFactory != null) {
-                                       socketChannel.pipeline().addLast("ssl", new SslHandler(sslEngineFactory.createSSLEngine()));
-                               }
+                               try {
+                                       // SSL should be the first handler in the pipeline
+                                       if (sslEngineFactory != null) {
+                                               socketChannel.pipeline().addLast("ssl", new SslHandler(sslEngineFactory.createSSLEngine()));
+                                       }
 
-                               socketChannel.pipeline()
-                                       .addLast(new HttpClientCodec())
-                                       .addLast(new HttpObjectAggregator(configuration.getMaxContentLength()))
-                                       .addLast(new ChunkedWriteHandler()) // required for multipart-requests
-                                       .addLast(new ClientHandler());
+                                       socketChannel.pipeline()
+                                               .addLast(new HttpClientCodec())
+                                               .addLast(new HttpObjectAggregator(configuration.getMaxContentLength()))
+                                               .addLast(new ChunkedWriteHandler()) // required for multipart-requests
+                                               .addLast(new IdleStateHandler(configuration.getIdlenessTimeout(), configuration.getIdlenessTimeout(), configuration.getIdlenessTimeout(), TimeUnit.MILLISECONDS))
+                                               .addLast(new ClientHandler());
+                               } catch (Throwable t) {
+                                       t.printStackTrace();
+                                       ExceptionUtils.rethrow(t);
+                               }
                        }
                };
                NioEventLoopGroup group = new NioEventLoopGroup(1, new ExecutorThreadFactory("flink-rest-client-netty"));
@@ -135,30 +150,42 @@ protected void initChannel(SocketChannel socketChannel) {
                LOG.info("Rest client endpoint started.");
        }
 
+       @Override
+       public CompletableFuture<Void> closeAsync() {
+               return shutdownInternally(Time.seconds(10L));
+       }
+
        public void shutdown(Time timeout) {
-               LOG.info("Shutting down rest endpoint.");
-               CompletableFuture<?> groupFuture = new CompletableFuture<>();
-               if (bootstrap != null) {
-                       if (bootstrap.group() != null) {
-                               bootstrap.group().shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
-                                       .addListener(finished -> {
-                                               if (finished.isSuccess()) {
-                                                       groupFuture.complete(null);
-                                               } else {
-                                                       groupFuture.completeExceptionally(finished.cause());
-                                               }
-                                       });
-                       }
-               }
+               final CompletableFuture<Void> shutDownFuture = shutdownInternally(timeout);
 
                try {
-                       groupFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+                       shutDownFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
                        LOG.info("Rest endpoint shutdown complete.");
                } catch (Exception e) {
                        LOG.warn("Rest endpoint shutdown failed.", e);
                }
        }
 
+       private CompletableFuture<Void> shutdownInternally(Time timeout) {
+               if (isRunning.compareAndSet(true, false)) {
+                       LOG.info("Shutting down rest endpoint.");
+
+                       if (bootstrap != null) {
+                               if (bootstrap.group() != null) {
+                                       bootstrap.group().shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
+                                               .addListener(finished -> {
+                                                       if (finished.isSuccess()) {
+                                                               terminationFuture.complete(null);
+                                                       } else {
+                                                               terminationFuture.completeExceptionally(finished.cause());
+                                                       }
+                                               });
+                               }
+                       }
+               }
+               return terminationFuture;
+       }
+
        public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(
                        String targetAddress,
                        int targetPort,
@@ -311,12 +338,26 @@ private static Request createRequest(String targetAddress, String targetUrl, Htt
                        .thenComposeAsync(
                                channel -> {
                                        ClientHandler handler = channel.pipeline().get(ClientHandler.class);
-                                       CompletableFuture<JsonResponse> future = handler.getJsonFuture();
+
+                                       CompletableFuture<JsonResponse> future;
+                                       boolean success = false;
+
                                        try {
-                                               httpRequest.writeTo(channel);
+                                               if (handler == null) {
+                                                       throw new IOException("Netty pipeline was not properly initialized.");
+                                               } else {
+                                                       httpRequest.writeTo(channel);
+                                                       future = handler.getJsonFuture();
+                                                       success = true;
+                                               }
                                        } catch (IOException e) {
-                                               return FutureUtils.completedExceptionally(new FlinkException("Could not write request.", e));
+                                               future = FutureUtils.completedExceptionally(new ConnectionException("Could not write request.", e));
+                                       } finally {
+                                               if (!success) {
+                                                       channel.close();
+                                               }
                                        }
+
                                        return future;
                                },
                                executor)
@@ -428,6 +469,22 @@ protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
                        ctx.close();
                }
 
+               @Override
+               public void channelInactive(ChannelHandlerContext ctx) {
+                       jsonFuture.completeExceptionally(new ConnectionClosedException("Channel became inactive."));
+                       ctx.close();
+               }
+
+               @Override
+               public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+                       if (evt instanceof IdleStateEvent) {
+                               jsonFuture.completeExceptionally(new ConnectionIdleException("Channel became idle."));
+                               ctx.close();
+                       } else {
+                               super.userEventTriggered(ctx, evt);
+                       }
+               }
+
                @Override
                public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
                        if (cause instanceof TooLongFrameException) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
index e09f357ea74..b70b1f81ab3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
@@ -40,15 +40,19 @@
 
        private final long connectionTimeout;
 
+       private final long idlenessTimeout;
+
        private final int maxContentLength;
 
        private RestClientConfiguration(
                        @Nullable final SSLEngineFactory sslEngineFactory,
                        final long connectionTimeout,
+                       final long idlenessTimeout,
                        final int maxContentLength) {
                checkArgument(maxContentLength > 0, "maxContentLength must be positive, was: %d", maxContentLength);
                this.sslEngineFactory = sslEngineFactory;
                this.connectionTimeout = connectionTimeout;
+               this.idlenessTimeout = idlenessTimeout;
                this.maxContentLength = maxContentLength;
        }
 
@@ -63,12 +67,19 @@ public SSLEngineFactory getSslEngineFactory() {
        }
 
        /**
-        * @see RestOptions#CONNECTION_TIMEOUT
+        * {@see RestOptions#CONNECTION_TIMEOUT}.
         */
        public long getConnectionTimeout() {
                return connectionTimeout;
        }
 
+       /**
+        * {@see RestOptions#IDLENESS_TIMEOUT}.
+        */
+       public long getIdlenessTimeout() {
+               return idlenessTimeout;
+       }
+
        /**
         * Returns the max content length that the REST client endpoint could handle.
         *
@@ -102,8 +113,10 @@ public static RestClientConfiguration fromConfiguration(Configuration config) th
 
                final long connectionTimeout = config.getLong(RestOptions.CONNECTION_TIMEOUT);
 
+               final long idlenessTimeout = config.getLong(RestOptions.IDLENESS_TIMEOUT);
+
                int maxContentLength = config.getInteger(RestOptions.CLIENT_MAX_CONTENT_LENGTH);
 
-               return new RestClientConfiguration(sslEngineFactory, connectionTimeout, maxContentLength);
+               return new RestClientConfiguration(sslEngineFactory, connectionTimeout, idlenessTimeout, maxContentLength);
        }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
index 22cd6f62063..958cca14dd9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
@@ -26,8 +26,10 @@
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.CheckedSupplier;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
@@ -35,10 +37,14 @@
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.instanceOf;
@@ -51,12 +57,13 @@
 
        private static final String unroutableIp = "10.255.255.1";
 
+       private static final long TIMEOUT = 10L;
+
        @Test
        public void testConnectionTimeout() throws Exception {
                final Configuration config = new Configuration();
                config.setLong(RestOptions.CONNECTION_TIMEOUT, 1);
-               final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(config), Executors.directExecutor());
-               try {
+               try (final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(config), Executors.directExecutor())) {
                        restClient.sendRequest(
                                unroutableIp,
                                80,
@@ -73,9 +80,7 @@ public void testConnectionTimeout() throws Exception {
 
        @Test
        public void testInvalidVersionRejection() throws Exception {
-               final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), Executors.directExecutor());
-
-               try {
+               try (final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), Executors.directExecutor())) {
                        CompletableFuture<EmptyResponseBody> invalidVersionResponse = restClient.sendRequest(
                                unroutableIp,
                                80,
@@ -89,7 +94,104 @@ public void testInvalidVersionRejection() throws Exception {
                } catch (IllegalArgumentException e) {
                        // expected
                }
+       }
+
+       /**
+        * Tests that we fail the operation if the remote connection closes.
+        */
+       @Test
+       public void testConnectionClosedHandling() throws Exception {
+               final Configuration config = new Configuration();
+               config.setLong(RestOptions.IDLENESS_TIMEOUT, 5000L);
+               try (final ServerSocket serverSocket = new ServerSocket(0);
+                       final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(config), TestingUtils.defaultExecutor())) {
+
+                       final String targetAddress = "localhost";
+                       final int targetPort = serverSocket.getLocalPort();
 
+                       // start server
+                       final CompletableFuture<Socket> socketCompletableFuture = CompletableFuture.supplyAsync(CheckedSupplier.unchecked(serverSocket::accept));
+
+                       final CompletableFuture<EmptyResponseBody> responseFuture = restClient.sendRequest(
+                               targetAddress,
+                               targetPort,
+                               new TestMessageHeaders(),
+                               EmptyMessageParameters.getInstance(),
+                               EmptyRequestBody.getInstance(),
+                               Collections.emptyList());
+
+                       Socket connectionSocket = null;
+
+                       try {
+                               connectionSocket = socketCompletableFuture.get(TIMEOUT, TimeUnit.SECONDS);
+                       } catch (TimeoutException ignored) {
+                               // could not establish a server connection --> see that the response failed
+                               socketCompletableFuture.cancel(true);
+                       }
+
+                       if (connectionSocket != null) {
+                               // close connection
+                               connectionSocket.close();
+                       }
+
+                       try {
+                               responseFuture.get();
+                       } catch (ExecutionException ee) {
+                               if (!ExceptionUtils.findThrowable(ee, IOException.class).isPresent()) {
+                                       throw ee;
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Tests that we fail the operation if the client closes.
+        */
+       @Test
+       public void testRestClientClosedHandling() throws Exception {
+               final Configuration config = new Configuration();
+               config.setLong(RestOptions.IDLENESS_TIMEOUT, 5000L);
+
+               Socket connectionSocket = null;
+
+               try (final ServerSocket serverSocket = new ServerSocket(0);
+                       final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(config), TestingUtils.defaultExecutor())) {
+
+                       final String targetAddress = "localhost";
+                       final int targetPort = serverSocket.getLocalPort();
+
+                       // start server
+                       final CompletableFuture<Socket> socketCompletableFuture = CompletableFuture.supplyAsync(CheckedSupplier.unchecked(serverSocket::accept));
+
+                       final CompletableFuture<EmptyResponseBody> responseFuture = restClient.sendRequest(
+                               targetAddress,
+                               targetPort,
+                               new TestMessageHeaders(),
+                               EmptyMessageParameters.getInstance(),
+                               EmptyRequestBody.getInstance(),
+                               Collections.emptyList());
+
+                       try {
+                               connectionSocket = socketCompletableFuture.get(TIMEOUT, TimeUnit.SECONDS);
+                       } catch (TimeoutException ignored) {
+                               // could not establish a server connection --> see that the response failed
+                               socketCompletableFuture.cancel(true);
+                       }
+
+                       restClient.close();
+
+                       try {
+                               responseFuture.get();
+                       } catch (ExecutionException ee) {
+                               if (!ExceptionUtils.findThrowable(ee, IOException.class).isPresent()) {
+                                       throw ee;
+                               }
+                       }
+               } finally {
+                       if (connectionSocket != null) {
+                               connectionSocket.close();
+                       }
+               }
        }
 
        private static class TestMessageHeaders implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
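
The shutdownInternally change in the RestClient diff makes shutdown
idempotent: a compare-and-set on isRunning lets only the first caller
trigger the shutdown, and every caller gets the same terminationFuture.
A minimal standalone sketch of that pattern follows. The class
IdempotentShutdownSketch and its releaseResources method are
hypothetical and stand in for the Netty event loop group shutdown; this
is an illustration under those assumptions, not the Flink code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration of a compare-and-set guarded shutdown. */
final class IdempotentShutdownSketch {

    private final AtomicBoolean isRunning = new AtomicBoolean(true);

    // Single future shared by all callers of closeAsync().
    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

    // Counts how often the (assumed) resource release actually ran.
    private final AtomicInteger releaseCount = new AtomicInteger();

    CompletableFuture<Void> closeAsync() {
        // Only the caller that flips true -> false performs the shutdown.
        if (isRunning.compareAndSet(true, false)) {
            try {
                releaseResources();
                terminationFuture.complete(null);
            } catch (Throwable t) {
                terminationFuture.completeExceptionally(t);
            }
        }
        return terminationFuture;
    }

    // Placeholder for releasing real resources such as an event loop group.
    private void releaseResources() {
        releaseCount.incrementAndGet();
    }

    int getReleaseCount() {
        return releaseCount.get();
    }

    public static void main(String[] args) {
        IdempotentShutdownSketch client = new IdempotentShutdownSketch();
        CompletableFuture<Void> first = client.closeAsync();
        CompletableFuture<Void> second = client.closeAsync();
        System.out.println("same future: " + (first == second));
        System.out.println("release count: " + client.getReleaseCount());
    }
}
```

Sharing one future means a second close() issued by try-with-resources,
as in the updated tests, waits on the shutdown already in progress
instead of starting another one.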


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> RestClient does not react to lost connection
> --------------------------------------------
>
>                 Key: FLINK-10415
>                 URL: https://issues.apache.org/jira/browse/FLINK-10415
>             Project: Flink
>          Issue Type: Bug
>          Components: REST
>    Affects Versions: 1.6.1, 1.7.0, 1.5.4
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> While working on FLINK-10403, I noticed that Flink's {{RestClient}} does not 
> seem to react to lost connections in time. When sending a request to the 
> current leader it happened that the leader was killed just after establishing 
> the connection. Then the {{RestClient}} did not fail the connection and was 
> stuck in writing a request or retrieving a response from the lost leader. I'm 
> wondering whether we should introduce a {{ReadTimeoutHandler}} and 
> {{WriteTimeoutHandler}} to handle these problems.
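
The symptom described in the issue is a response future that never
completes once the peer is gone. A minimal sketch of the remedy, using
only java.util.concurrent, is to fail the pending future as soon as the
transport reports the loss. PendingResponseSketch and its method names
are hypothetical stand-ins for a per-connection handler and are not
Flink's ClientHandler.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Hypothetical per-connection handler that owns one pending response. */
final class PendingResponseSketch {

    // Future handed to the caller when the request is sent.
    private final CompletableFuture<String> responseFuture = new CompletableFuture<>();

    CompletableFuture<String> getResponseFuture() {
        return responseFuture;
    }

    // Called when a complete response arrives.
    void onResponse(String body) {
        responseFuture.complete(body);
    }

    // Called when the transport reports that the connection is gone.
    // This has no effect if the future was already completed.
    void onConnectionClosed() {
        responseFuture.completeExceptionally(
            new IOException("Connection closed before a response arrived."));
    }

    /** Returns the failure message a caller would see, or null on success. */
    static String failureMessageOf(CompletableFuture<String> future) {
        try {
            future.join();
            return null;
        } catch (CompletionException e) {
            return e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        PendingResponseSketch lost = new PendingResponseSketch();
        lost.onConnectionClosed();
        System.out.println(failureMessageOf(lost.getResponseFuture()));

        PendingResponseSketch answered = new PendingResponseSketch();
        answered.onResponse("{}");
        answered.onConnectionClosed(); // too late to change the outcome
        System.out.println(failureMessageOf(answered.getResponseFuture()));
    }
}
```

Because completeExceptionally is a no-op on an already completed
future, calling the close hook after a successful response does not
turn that success into a failure.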



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
