patricklucas commented on code in PR #22987:
URL: https://github.com/apache/flink/pull/22987#discussion_r1264995814


##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java:
##########
@@ -501,6 +501,22 @@ private <P extends ResponseBody> CompletableFuture<P> submitRequest(
                     }
                 });
 
+        // [FLINK-32583] If connectFuture failed instantly but channelFuture is unresolved, it may
+        // mean the executor service Netty is using has shut down, in which case the above listener
+        // to complete channelFuture will never run
+        if (connectFuture.isDone() && !connectFuture.isSuccess() && !channelFuture.isDone()) {

Review Comment:
   @XComp After manually trying out various orderings of the threaded
operations, I've concluded that as long as `addListener` is called before the
client is closed, the listener will run. This means the situation can only
arise in the minuscule window between the non-blocking call to
`bootstrap.connect()` and the call to `connectFuture.addListener()`.
   
   So again, I think the vast majority of cases where this issue could occur
are covered by the simple `isRunning` check from the diff in my previous
comment. Full coverage would require tracking the futures and resolving them on
client shutdown, which carries a higher chance of introducing another bug and
would also be difficult to test.
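   
   For illustration, here is a minimal, self-contained sketch of that kind of
guard. It is not the diff from the previous comment: `GuardedClient` and the
`connect` supplier are hypothetical stand-ins for `RestClient` and the
asynchronous `bootstrap.connect()` call, and only the fail-fast logic is
modelled.
   
   ```java
   import java.io.IOException;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.atomic.AtomicBoolean;
   import java.util.function.Supplier;

   /** Hypothetical stand-in for RestClient; only the guard logic is modelled. */
   class GuardedClient {
       private final AtomicBoolean isRunning = new AtomicBoolean(true);

       /** `connect` is a placeholder for the asynchronous connect call. */
       <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> connect) {
           if (!isRunning.get()) {
               // Fail fast instead of registering a listener that may never run.
               CompletableFuture<T> failed = new CompletableFuture<>();
               failed.completeExceptionally(new IOException("RestClient is closed"));
               return failed;
           }
           // A close() racing in at this point is the small window the guard
           // alone cannot cover.
           return connect.get();
       }

       void close() {
           isRunning.set(false);
       }
   }
   ```
   
   The check is cheap and easy to reason about, but it only narrows the race
rather than eliminating it.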
   
   Including the future tracking would look something like this:
   
   ```diff
   diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
   index 2055a2227cf..c986f7c6292 100644
   --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
   +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
   @@ -96,8 +96,10 @@ import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.Comparator;
   +import java.util.IdentityHashMap;
    import java.util.Iterator;
    import java.util.List;
   +import java.util.Map;
    import java.util.Optional;
    import java.util.ServiceLoader;
    import java.util.concurrent.CompletableFuture;
   @@ -127,6 +129,9 @@ public class RestClient implements AutoCloseableAsync {
    
        private final String urlPrefix;
    
   +    private final Map<CompletableFuture<Channel>, Void> responseChannelFutures =
   +            Collections.synchronizedMap(new IdentityHashMap<>());
   +
        @VisibleForTesting List<OutboundChannelHandlerFactory> outboundChannelHandlerFactories;
    
        public RestClient(Configuration configuration, Executor executor)
   @@ -252,6 +257,8 @@ public class RestClient implements AutoCloseableAsync {
                                .shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
                                .addListener(
                                        finished -> {
   +                                        notifyResponseFuturesOfShutdown();
   +
                                            if (finished.isSuccess()) {
                                                terminationFuture.complete(null);
                                            } else {
   @@ -265,6 +272,16 @@ public class RestClient implements AutoCloseableAsync {
            return terminationFuture;
        }
    
   +    private void notifyResponseFuturesOfShutdown() {
   +        responseChannelFutures
   +                .keySet()
   +                .forEach(
   +                        future ->
   +                                future.completeExceptionally(
   +                                        new IOException("RestClient is closed")));
   +        responseChannelFutures.clear();
   +    }
   +
        public <
                        M extends MessageHeaders<EmptyRequestBody, P, EmptyMessageParameters>,
                        P extends ResponseBody>
   @@ -488,18 +505,25 @@ public class RestClient implements AutoCloseableAsync {
    
        private <P extends ResponseBody> CompletableFuture<P> submitRequest(
                String targetAddress, int targetPort, Request httpRequest, JavaType responseType) {
   -        final ChannelFuture connectFuture = bootstrap.connect(targetAddress, targetPort);
   -
            final CompletableFuture<Channel> channelFuture = new CompletableFuture<>();
   +        responseChannelFutures.put(channelFuture, null);
    
   -        connectFuture.addListener(
   -                (ChannelFuture future) -> {
   -                    if (future.isSuccess()) {
   -                        channelFuture.complete(future.channel());
   -                    } else {
   -                        channelFuture.completeExceptionally(future.cause());
   -                    }
   -                });
   +        if (isRunning.get()) {
   +            final ChannelFuture connectFuture = bootstrap.connect(targetAddress, targetPort);
   +
   +            connectFuture.addListener(
   +                    (ChannelFuture future) -> {
   +                        responseChannelFutures.remove(channelFuture);
   +
   +                        if (future.isSuccess()) {
   +                            channelFuture.complete(future.channel());
   +                        } else {
   +                            channelFuture.completeExceptionally(future.cause());
   +                        }
   +                    });
   +        } else {
   +            channelFuture.completeExceptionally(new IOException("RestClient is closed"));
   +        }
    
            return channelFuture
                    .thenComposeAsync(
   ```
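   
   The tracking idea can also be sketched in isolation from Netty. The class
below is a hypothetical helper, not part of Flink or of the diff above, and it
makes two assumptions that differ from the diff: it uses an identity-based set
via `Collections.newSetFromMap` rather than a `Map<..., Void>`, and it guards
both the set and a `closed` flag with one lock so that registration and
shutdown cannot interleave.
   
   ```java
   import java.io.IOException;
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.IdentityHashMap;
   import java.util.List;
   import java.util.Set;
   import java.util.concurrent.CompletableFuture;

   /** Hypothetical helper that fails all unresolved futures on shutdown. */
   class PendingFutureTracker<T> {
       // Identity-based set; every access happens while holding the lock on "this".
       private final Set<CompletableFuture<T>> pending =
               Collections.newSetFromMap(new IdentityHashMap<>());
       private boolean closed;

       /** Creates a future that is either tracked or, after close(), already failed. */
       synchronized CompletableFuture<T> register() {
           CompletableFuture<T> future = new CompletableFuture<>();
           if (closed) {
               future.completeExceptionally(new IOException("RestClient is closed"));
           } else {
               pending.add(future);
               // Stop tracking the future as soon as it resolves either way.
               future.whenComplete((value, error) -> remove(future));
           }
           return future;
       }

       private synchronized void remove(CompletableFuture<T> future) {
           pending.remove(future);
       }

       /** Marks the tracker closed and fails every future still pending. */
       void close() {
           List<CompletableFuture<T>> toFail;
           synchronized (this) {
               closed = true;
               toFail = new ArrayList<>(pending);
               pending.clear();
           }
           // Complete outside the lock so dependent callbacks never run under it.
           toFail.forEach(
                   f -> f.completeExceptionally(new IOException("RestClient is closed")));
       }

       synchronized int pendingCount() {
           return pending.size();
       }
   }
   ```
   
   Because `register()` and `close()` share a lock, a future is either failed
immediately or guaranteed to be seen by `close()`, which is the property the
plain `isRunning` check cannot provide on its own.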



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

