patricklucas commented on code in PR #22987: URL: https://github.com/apache/flink/pull/22987#discussion_r1263539507
########## flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java: ########## @@ -501,6 +501,22 @@ private <P extends ResponseBody> CompletableFuture<P> submitRequest( } }); + // [FLINK-32583] If connectFuture failed instantly but channelFuture is unresolved, it may + // mean the executor service Netty is using has shut down, in which case the above listener + // to complete channelFuture will never run + if (connectFuture.isDone() && !connectFuture.isSuccess() && !channelFuture.isDone()) { Review Comment: The diff for RestClient if we just have it preemptively check `isRunning` (the test can remain the same): ```diff diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java index 2055a2227cf..e9f57c2f4ce 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java @@ -488,18 +488,22 @@ public class RestClient implements AutoCloseableAsync { private <P extends ResponseBody> CompletableFuture<P> submitRequest( String targetAddress, int targetPort, Request httpRequest, JavaType responseType) { - final ChannelFuture connectFuture = bootstrap.connect(targetAddress, targetPort); - final CompletableFuture<Channel> channelFuture = new CompletableFuture<>(); - connectFuture.addListener( - (ChannelFuture future) -> { - if (future.isSuccess()) { - channelFuture.complete(future.channel()); - } else { - channelFuture.completeExceptionally(future.cause()); - } - }); + if (isRunning.get()) { + final ChannelFuture connectFuture = bootstrap.connect(targetAddress, targetPort); + + connectFuture.addListener( + (ChannelFuture future) -> { + if (future.isSuccess()) { + channelFuture.complete(future.channel()); + } else { + channelFuture.completeExceptionally(future.cause()); + } + }); + } else { + channelFuture.completeExceptionally(new 
IOException("RestClient is closed")); + } return channelFuture .thenComposeAsync( ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org