XComp commented on code in PR #22987:
URL: https://github.com/apache/flink/pull/22987#discussion_r1271887156


##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java:
##########
@@ -131,6 +133,11 @@ public class RestClient implements AutoCloseableAsync {
 
     private final String urlPrefix;
 
+    // Used to track unresolved request futures in case they need to be resolved when the client is
+    // closed
+    private final Map<CompletableFuture<Channel>, Void> responseChannelFutures =
+            Collections.synchronizedMap(new IdentityHashMap<>());

Review Comment:
   ```suggestion
       private final Collection<CompletableFuture<Channel>> responseChannelFutures =
               ConcurrentHashMap.newKeySet();
   ```
   nit: What about using `ConcurrentHashMap.newKeySet()` here? That simplifies the change a little bit.
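For illustration, a minimal JDK-only sketch of why the suggestion is a drop-in replacement: `CompletableFuture` does not override `equals`/`hashCode`, so a set from `ConcurrentHashMap.newKeySet()` already compares futures by identity (like the `IdentityHashMap` it replaces) and is thread-safe without a `synchronizedMap` wrapper. `FutureTracker` and its methods are hypothetical names, not `RestClient` code.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class FutureTracker {
    // Thread-safe set view backed by a ConcurrentHashMap. CompletableFuture inherits
    // Object's identity-based equals/hashCode, so every future is its own entry.
    private final Set<CompletableFuture<String>> tracked = ConcurrentHashMap.newKeySet();

    CompletableFuture<String> track() {
        CompletableFuture<String> future = new CompletableFuture<>();
        tracked.add(future);
        // Untrack once the future completes, normally or exceptionally.
        future.whenComplete((result, error) -> tracked.remove(future));
        return future;
    }

    int size() {
        return tracked.size();
    }

    boolean isTracked(CompletableFuture<String> future) {
        return tracked.contains(future);
    }

    public static void main(String[] args) {
        FutureTracker tracker = new FutureTracker();
        CompletableFuture<String> first = tracker.track();
        CompletableFuture<String> second = tracker.track();
        first.complete("done");
        System.out.println(tracker.size() + " " + tracker.isTracked(second));
    }
}
```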



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java:
##########
@@ -514,18 +533,25 @@ private static Request createRequest(
 
     private <P extends ResponseBody> CompletableFuture<P> submitRequest(
             String targetAddress, int targetPort, Request httpRequest, JavaType responseType) {
-        final ChannelFuture connectFuture = bootstrap.connect(targetAddress, targetPort);
-
         final CompletableFuture<Channel> channelFuture = new CompletableFuture<>();
+        responseChannelFutures.put(channelFuture, null);
 
-        connectFuture.addListener(
-                (ChannelFuture future) -> {
-                    if (future.isSuccess()) {
-                        channelFuture.complete(future.channel());
-                    } else {
-                        channelFuture.completeExceptionally(future.cause());
-                    }
-                });
+        if (isRunning.get()) {

Review Comment:
   ```suggestion
           if (!isRunning.get()) {
               return FutureUtils.completedExceptionally(new IOException("RestClient is closed"));
           }
   ```
   Inverting the condition here and moving this if block to the top of the method makes the diff smaller. I feel it also makes the code more readable because of the reduced indentation.
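A JDK-only sketch of the guard-clause shape, assuming an `AtomicBoolean isRunning` flag as in the PR. It uses `CompletableFuture.failedFuture` (Java 9+) in place of Flink's `FutureUtils.completedExceptionally`, and the `connect` supplier is a hypothetical stand-in for `bootstrap.connect(...)`; the class name is made up for this example.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

class GuardClauseSketch {
    private final AtomicBoolean isRunning = new AtomicBoolean(true);

    <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> connect) {
        // Fail fast before any work is scheduled; the happy path below stays un-indented.
        if (!isRunning.get()) {
            return CompletableFuture.failedFuture(new IOException("RestClient is closed"));
        }
        return connect.get();
    }

    void close() {
        isRunning.set(false);
    }

    public static void main(String[] args) {
        GuardClauseSketch client = new GuardClauseSketch();
        System.out.println(client.submit(() -> CompletableFuture.completedFuture("ok")).join());
        client.close();
        CompletableFuture<String> rejected =
                client.submit(() -> CompletableFuture.completedFuture("ok"));
        System.out.println(rejected.isCompletedExceptionally());
    }
}
```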



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java:
##########
@@ -289,6 +298,16 @@ private CompletableFuture<Void> shutdownInternally(Time timeout) {
         return terminationFuture;
     }
 
+    private void notifyResponseFuturesOfShutdown() {
+        responseChannelFutures
+                .keySet()
+                .forEach(
+                        future ->
+                                future.completeExceptionally(
+                                        new IOException("RestClient is closed")));

Review Comment:
   ```suggestion
                                           new IOException("RestClient was closed while processing the request.")));
   ```
   Proposal, to distinguish this message from the other `IOException` error message.
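For reference, a JDK-only sketch of how the proposed message would surface from the shutdown notification, assuming the futures are kept in a `ConcurrentHashMap.newKeySet()` as suggested in the first comment. The class and its `pending` field are hypothetical. Because `completeExceptionally` has no effect on a future that already completed, only requests that are still in flight see this message.

```java
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class ShutdownNotificationSketch {
    final Set<CompletableFuture<Void>> pending = ConcurrentHashMap.newKeySet();

    void notifyResponseFuturesOfShutdown() {
        // completeExceptionally is a no-op for futures that already completed,
        // so racing with a connect listener that completes the future is harmless.
        pending.forEach(
                future ->
                        future.completeExceptionally(
                                new IOException(
                                        "RestClient was closed while processing the request.")));
    }

    public static void main(String[] args) {
        ShutdownNotificationSketch client = new ShutdownNotificationSketch();
        CompletableFuture<Void> inFlight = new CompletableFuture<>();
        client.pending.add(inFlight);
        client.notifyResponseFuturesOfShutdown();
        inFlight.exceptionally(
                error -> {
                    System.out.println(error.getMessage());
                    return null;
                });
    }
}
```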



##########
flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java:
##########
@@ -207,6 +210,40 @@ public void testRestClientClosedHandling() throws Exception {
         }
     }
 
+    /**
+     * Tests that the futures returned by {@link RestClient} fail immediately if the client is
+     * already closed.
+     *
+     * <p>See FLINK-32583
+     */
+    @Test
+    public void testCloseClientBeforeRequest() throws Exception {

Review Comment:
   I looked into Netty's multi-threading a bit more and found `SelectStrategy` to be a viable tool for controlling the state of the request. I came up with the following test:
   ```java
   @Test
   public void testCloseClientWhileProcessingRequest() throws Exception {
       final OneShotLatch connectTriggered = new OneShotLatch();
       final OneShotLatch closeTriggered = new OneShotLatch();
       final SelectStrategy fallbackSelectStrategy =
               DefaultSelectStrategyFactory.INSTANCE.newSelectStrategy();
       final SelectStrategy selectStrategy =
               (selectSupplier, hasTasks) -> {
                   connectTriggered.trigger();
                   closeTriggered.awaitQuietly();

                   return fallbackSelectStrategy.calculateStrategy(selectSupplier, hasTasks);
               };
       // Note that the executor passed to the RestClient constructor is not the same as the
       // executor used by Netty
       try (final RestClient restClient =
               new RestClient(
                       new Configuration(), Executors.directExecutor(), () -> selectStrategy)) {
           final CompletableFuture<?> requestFuture =
                   restClient.sendRequest(
                           unroutableIp,
                           80,
                           new TestMessageHeaders(),
                           EmptyMessageParameters.getInstance(),
                           EmptyRequestBody.getInstance());

           connectTriggered.await();

           final CompletableFuture<Void> closeFuture = restClient.closeAsync();

           closeTriggered.trigger();

           // close should complete successfully
           closeFuture.get();

           final Throwable cause =
                   assertThrows(
                                   ExecutionException.class,
                                   () -> requestFuture.get(0, TimeUnit.SECONDS))
                           .getCause();
           assertThat(cause, instanceOf(IllegalStateException.class));
       }
   }
   ```
   It would require extending `RestClient` with a package-private constructor:
   ```java
       // ...
       public RestClient(Configuration configuration, Executor executor)
               throws ConfigurationException {
           this(configuration, executor, DefaultSelectStrategyFactory.INSTANCE);
       }
   
       @VisibleForTesting
       RestClient(
               Configuration configuration,
               Executor executor,
               SelectStrategyFactory selectStrategyFactory)
       //...
       NioEventLoopGroup group =
                   new NioEventLoopGroup(
                           1,
                           new ExecutorThreadFactory("flink-rest-client-netty"),
                           SelectorProvider.provider(),
                           selectStrategyFactory);
       // ...
   ```
   WDYT? That should test the `isRunning.get()` code path.
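The same latch-gating idea can be sketched with plain JDK types, as an analogue rather than the Netty setup above: a single-thread executor stands in for the event loop, `CountDownLatch` replaces `OneShotLatch`, and the parked task plays the role of the blocking `SelectStrategy`. Every name here is hypothetical. The point it illustrates is that `close()` runs while the request is provably in flight, so the outcome no longer depends on thread scheduling.

```java
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class LatchInterleavingSketch {
    // Single thread standing in for Netty's event loop.
    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor();
    private final Set<CompletableFuture<String>> pending = ConcurrentHashMap.newKeySet();
    private final CountDownLatch connectTriggered = new CountDownLatch(1);
    private final CountDownLatch closeTriggered = new CountDownLatch(1);

    CompletableFuture<String> sendRequest() {
        CompletableFuture<String> result = new CompletableFuture<>();
        pending.add(result);
        eventLoop.execute(
                () -> {
                    connectTriggered.countDown(); // signal: the request reached the event loop
                    try {
                        closeTriggered.await(); // park until the test has called close()
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    pending.remove(result);
                    result.complete("connected"); // no-op if close() already failed the future
                });
        return result;
    }

    void close() {
        pending.forEach(
                f ->
                        f.completeExceptionally(
                                new IOException(
                                        "RestClient was closed while processing the request.")));
        eventLoop.shutdown(); // the parked task still runs to completion once released
    }

    // Returns the cause of the request failure, or null if the request succeeded.
    static Throwable runScenario() {
        LatchInterleavingSketch client = new LatchInterleavingSketch();
        try {
            CompletableFuture<String> request = client.sendRequest();
            client.connectTriggered.await(); // the request is now in flight
            client.close(); // fail every tracked future while the loop is parked
            client.closeTriggered.countDown(); // release the loop
            request.get(0, TimeUnit.SECONDS);
            return null;
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (InterruptedException | TimeoutException e) {
            return e;
        } finally {
            client.closeTriggered.countDown(); // never leave the worker thread parked
            client.eventLoop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runScenario());
    }
}
```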



##########
flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java:
##########
@@ -207,6 +210,40 @@ public void testRestClientClosedHandling() throws Exception {
         }
     }
 
+    /**
+     * Tests that the futures returned by {@link RestClient} fail immediately if the client is
+     * already closed.
+     *
+     * <p>See FLINK-32583
+     */
+    @Test
+    public void testCloseClientBeforeRequest() throws Exception {
+        // Note that the executor passed to the RestClient constructor is not the same as the
+        // executor used by Netty
+        try (final RestClient restClient =
+                new RestClient(new Configuration(), Executors.directExecutor())) {
+            // Intentionally close the client (and thus also the executor used by Netty)
+            restClient.close();
+
+            CompletableFuture<?> future =
+                    restClient.sendRequest(
+                            unroutableIp,
+                            80,
+                            new TestMessageHeaders(),
+                            EmptyMessageParameters.getInstance(),
+                            EmptyRequestBody.getInstance());
+
+            // Call get() on the future with a timeout of 0 so we can test that the exception thrown

Review Comment:
   ```suggestion
               // Call get() on the future with a timeout of 0s so we can test that the exception thrown
   ```
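A JDK-only sketch of why a `0s` timeout works for this assertion: `CompletableFuture.get(0, TimeUnit.SECONDS)` does not wait, so a future that has already failed throws an `ExecutionException` wrapping the original cause, whereas a future that is still pending throws a `TimeoutException`. The check therefore only passes if the future failed synchronously. The `outcomeOf` helper is hypothetical.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ZeroTimeoutSketch {
    // Describes what get(0, SECONDS) does for the given future, without blocking.
    static String outcomeOf(CompletableFuture<?> future) {
        try {
            future.get(0, TimeUnit.SECONDS);
            return "completed normally";
        } catch (ExecutionException e) {
            return "ExecutionException caused by " + e.getCause().getClass().getSimpleName();
        } catch (TimeoutException e) {
            return "TimeoutException";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IOException("RestClient is closed"));
        System.out.println(outcomeOf(failed));
        System.out.println(outcomeOf(new CompletableFuture<Void>()));
    }
}
```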



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java:
##########
@@ -514,18 +533,25 @@ private static Request createRequest(
 
     private <P extends ResponseBody> CompletableFuture<P> submitRequest(
             String targetAddress, int targetPort, Request httpRequest, JavaType responseType) {
-        final ChannelFuture connectFuture = bootstrap.connect(targetAddress, targetPort);
-
         final CompletableFuture<Channel> channelFuture = new CompletableFuture<>();
+        responseChannelFutures.put(channelFuture, null);
 
-        connectFuture.addListener(
-                (ChannelFuture future) -> {
-                    if (future.isSuccess()) {
-                        channelFuture.complete(future.channel());
-                    } else {
-                        channelFuture.completeExceptionally(future.cause());
-                    }
-                });
+        if (isRunning.get()) {
+            final ChannelFuture connectFuture = bootstrap.connect(targetAddress, targetPort);
+
+            connectFuture.addListener(
+                    (ChannelFuture future) -> {
+                        responseChannelFutures.remove(channelFuture);
+
+                        if (future.isSuccess()) {
+                            channelFuture.complete(future.channel());
+                        } else {
+                            channelFuture.completeExceptionally(future.cause());
+                        }
+                    });
+        } else {
+            channelFuture.completeExceptionally(new IOException("RestClient is closed"));

Review Comment:
   ```suggestion
               channelFuture.completeExceptionally(new IOException("RestClient was closed while submitting the request."));
   ```
   Should we give more context to differentiate the two newly introduced `IOException`s?
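A JDK-only sketch of this hunk's flow with the two messages told apart as proposed: the future is tracked before connecting, untracked inside the completion listener, and failed with a submit-time message if the client is already closed. Netty's `ChannelFuture` listener is replaced by a hypothetical `BiConsumer` callback, so none of these names are Flink or Netty API. Unlike the hunk, the sketch also untracks the future in the closed branch.

```java
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

class CallbackBridgeSketch {
    final AtomicBoolean isRunning = new AtomicBoolean(true);
    final Set<CompletableFuture<String>> pending = ConcurrentHashMap.newKeySet();

    // `connect` is a hypothetical async operation that reports (channel, error) to a listener.
    CompletableFuture<String> submit(Consumer<BiConsumer<String, Throwable>> connect) {
        CompletableFuture<String> channelFuture = new CompletableFuture<>();
        pending.add(channelFuture);
        if (isRunning.get()) {
            connect.accept(
                    (channel, error) -> {
                        // Untrack first so a later shutdown does not touch a finished request.
                        pending.remove(channelFuture);
                        if (error == null) {
                            channelFuture.complete(channel);
                        } else {
                            channelFuture.completeExceptionally(error);
                        }
                    });
        } else {
            // The future fails immediately, so there is nothing left to track.
            pending.remove(channelFuture);
            channelFuture.completeExceptionally(
                    new IOException("RestClient was closed while submitting the request."));
        }
        return channelFuture;
    }

    public static void main(String[] args) {
        CallbackBridgeSketch client = new CallbackBridgeSketch();
        System.out.println(client.submit(listener -> listener.accept("channel-1", null)).join());
        client.isRunning.set(false);
        client.submit(listener -> listener.accept("channel-2", null))
                .exceptionally(
                        error -> {
                            System.out.println(error.getMessage());
                            return null;
                        });
    }
}
```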



##########
flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java:
##########
@@ -207,6 +210,40 @@ public void testRestClientClosedHandling() throws Exception {
         }
     }
 
+    /**
+     * Tests that the futures returned by {@link RestClient} fail immediately if the client is
+     * already closed.
+     *
+     * <p>See FLINK-32583
+     */
+    @Test
+    public void testCloseClientBeforeRequest() throws Exception {

Review Comment:
   By the way, in that case the future fails with an `IllegalStateException`. I'm wondering whether we should switch to that exception within the `RestClient` as well, for consistency. Alternatively, we could wrap the `IllegalStateException` in an `IOException`, because I think `IOException` still seems to be the best fit. :thinking:
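If the wrapping option is chosen, it could look roughly like this JDK-only sketch: a hypothetical `normalize` helper turns an `IllegalStateException` failure into an `IOException` that keeps the original exception as its cause, so callers only need to handle one exception type. The helper name and the message are assumptions.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ExceptionWrappingSketch {
    // Hypothetical helper: re-exposes IllegalStateException failures as IOException,
    // keeping the original exception as the cause. Other outcomes pass through unchanged.
    static <T> CompletableFuture<T> normalize(CompletableFuture<T> source) {
        CompletableFuture<T> result = new CompletableFuture<>();
        source.whenComplete(
                (value, error) -> {
                    if (error == null) {
                        result.complete(value);
                        return;
                    }
                    // Failures propagated through dependent stages arrive wrapped in CompletionException.
                    Throwable cause =
                            error instanceof CompletionException && error.getCause() != null
                                    ? error.getCause()
                                    : error;
                    if (cause instanceof IllegalStateException) {
                        result.completeExceptionally(new IOException("RestClient is closed", cause));
                    } else {
                        result.completeExceptionally(cause);
                    }
                });
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<String> failing = new CompletableFuture<>();
        failing.completeExceptionally(new IllegalStateException("client already closed"));
        normalize(failing)
                .exceptionally(
                        error -> {
                            System.out.println(error + " caused by " + error.getCause());
                            return null;
                        });
    }
}
```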


