XComp commented on code in PR #22987:
URL: https://github.com/apache/flink/pull/22987#discussion_r1272076683


##########
flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java:
##########
@@ -207,6 +210,40 @@ public void testRestClientClosedHandling() throws 
Exception {
         }
     }
 
+    /**
+     * Tests that the futures returned by {@link RestClient} fail immediately 
if the client is
+     * already closed.
+     *
+     * <p>See FLINK-32583
+     */
+    @Test
+    public void testCloseClientBeforeRequest() throws Exception {

Review Comment:
   I looked into the multi-threading of netty a bit more and found the 
`SelectStrategy` being a viable tool to control the state of the request. I 
came up with the following test:
   ```
   @Test
       public void testCloseClientWhileProcessingRequest() throws Exception {
           final OneShotLatch connectTriggered = new OneShotLatch();
           final OneShotLatch closeTriggered = new OneShotLatch();
           final SelectStrategy fallbackSelectStrategy =
                   DefaultSelectStrategyFactory.INSTANCE.newSelectStrategy();
           final SelectStrategy selectStrategy =
                   (selectSupplier, hasTasks) -> {
                       connectTriggered.trigger();
                       closeTriggered.awaitQuietly();
   
                       return 
fallbackSelectStrategy.calculateStrategy(selectSupplier, hasTasks);
                   };
           // Note that the executor passed to the RestClient constructor is 
not the same as the
           // executor used by Netty
           try (final RestClient restClient =
                   new RestClient(
                           new Configuration(), Executors.directExecutor(), () 
-> selectStrategy)) {
               final CompletableFuture<?> requestFuture =
                       restClient.sendRequest(
                               unroutableIp,
                               80,
                               new TestMessageHeaders(),
                               EmptyMessageParameters.getInstance(),
                               EmptyRequestBody.getInstance());
   
               connectTriggered.await();
   
               final CompletableFuture<Void> closeFuture = 
restClient.closeAsync();
   
               closeTriggered.trigger();
   
               // close should complete successfully
               closeFuture.get();
   
               final Throwable cause =
                       assertThrows(
                                       ExecutionException.class,
                                       () -> requestFuture.get(0, 
TimeUnit.SECONDS))
                               .getCause();
               assertThat(cause, instanceOf(IllegalStateException.class));
           }
       }
   ```
   It would require an extension of the `RestClient` adding a package-private 
constructor:
   ```
       // ...
       public RestClient(Configuration configuration, Executor executor)
               throws ConfigurationException {
           this(configuration, executor, DefaultSelectStrategyFactory.INSTANCE);
       }
   
       @VisibleForTesting
       RestClient(
               Configuration configuration,
               Executor executor,
               SelectStrategyFactory selectStrategyFactory)
       //...
       NioEventLoopGroup group =
                   new NioEventLoopGroup(
                           1,
                           new ExecutorThreadFactory("flink-rest-client-netty"),
                           SelectorProvider.provider(),
                           selectStrategyFactory);
       // ...
   ```
   WDYT? That should test the `isRunning.get()` code path.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java:
##########
@@ -207,6 +210,40 @@ public void testRestClientClosedHandling() throws 
Exception {
         }
     }
 
+    /**
+     * Tests that the futures returned by {@link RestClient} fail immediately 
if the client is
+     * already closed.
+     *
+     * <p>See FLINK-32583
+     */
+    @Test
+    public void testCloseClientBeforeRequest() throws Exception {

Review Comment:
   I looked into the multi-threading of netty a bit more and found the 
`SelectStrategy` being a viable tool to control the state of the request. I 
came up with the following test:
   ```
       @Test
       public void testCloseClientWhileProcessingRequest() throws Exception {
           final OneShotLatch connectTriggered = new OneShotLatch();
           final OneShotLatch closeTriggered = new OneShotLatch();
           final SelectStrategy fallbackSelectStrategy =
                   DefaultSelectStrategyFactory.INSTANCE.newSelectStrategy();
           final SelectStrategy selectStrategy =
                   (selectSupplier, hasTasks) -> {
                       connectTriggered.trigger();
                       closeTriggered.awaitQuietly();
   
                       return 
fallbackSelectStrategy.calculateStrategy(selectSupplier, hasTasks);
                   };
           // Note that the executor passed to the RestClient constructor is 
not the same as the
           // executor used by Netty
           try (final RestClient restClient =
                   new RestClient(
                           new Configuration(), Executors.directExecutor(), () 
-> selectStrategy)) {
               final CompletableFuture<?> requestFuture =
                       restClient.sendRequest(
                               unroutableIp,
                               80,
                               new TestMessageHeaders(),
                               EmptyMessageParameters.getInstance(),
                               EmptyRequestBody.getInstance());
   
               connectTriggered.await();
   
               final CompletableFuture<Void> closeFuture = 
restClient.closeAsync();
   
               closeTriggered.trigger();
   
               // close should complete successfully
               closeFuture.get();
   
               final Throwable cause =
                       assertThrows(
                                       ExecutionException.class,
                                       () -> requestFuture.get(0, 
TimeUnit.SECONDS))
                               .getCause();
               assertThat(cause, instanceOf(IllegalStateException.class));
           }
       }
   ```
   It would require an extension of the `RestClient` adding a package-private 
constructor:
   ```
       // ...
       public RestClient(Configuration configuration, Executor executor)
               throws ConfigurationException {
           this(configuration, executor, DefaultSelectStrategyFactory.INSTANCE);
       }
   
       @VisibleForTesting
       RestClient(
               Configuration configuration,
               Executor executor,
               SelectStrategyFactory selectStrategyFactory)
       //...
       NioEventLoopGroup group =
                   new NioEventLoopGroup(
                           1,
                           new ExecutorThreadFactory("flink-rest-client-netty"),
                           SelectorProvider.provider(),
                           selectStrategyFactory);
       // ...
   ```
   WDYT? That should test the `isRunning.get()` code path.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to