patricklucas commented on code in PR #22987:
URL: https://github.com/apache/flink/pull/22987#discussion_r1275249702


##########
flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java:
##########
@@ -207,6 +218,120 @@ public void testRestClientClosedHandling() throws 
Exception {
         }
     }
 
+    /**
+     * Tests that the futures returned by {@link RestClient} fail immediately 
if the client is
+     * already closed.
+     *
+     * <p>See FLINK-32583
+     */
+    @Test
+    public void testCloseClientBeforeRequest() throws Exception {
+        try (final RestClient restClient =
+                new RestClient(new Configuration(), 
Executors.directExecutor())) {
+            restClient.close(); // Intentionally close the client prior to the 
request
+
+            CompletableFuture<?> future =
+                    restClient.sendRequest(
+                            unroutableIp,
+                            80,
+                            new TestMessageHeaders(),
+                            EmptyMessageParameters.getInstance(),
+                            EmptyRequestBody.getInstance());
+
+            // Call get() on the future with a timeout of 0s so we can test 
that the exception
+            // thrown is not a TimeoutException, which is what would be thrown 
if restClient were
+            // not already closed
+            final ThrowingRunnable getFuture = () -> future.get(0, 
TimeUnit.SECONDS);
+
+            final Throwable cause = assertThrows(ExecutionException.class, 
getFuture).getCause();
+            assertThat(cause, instanceOf(IllegalStateException.class));
+            assertThat(cause.getMessage(), equalTo("RestClient is already 
closed"));
+        }
+    }
+
+    @Test
+    public void testCloseClientWhileProcessingRequest() throws Exception {
+        // Set up a Netty SelectStrategy with latches that allow us to step 
forward through Netty's
+        // request state machine, closing the client at a particular moment
+        final OneShotLatch connectTriggered = new OneShotLatch();
+        final OneShotLatch closeTriggered = new OneShotLatch();
+        final SelectStrategy fallbackSelectStrategy =
+                DefaultSelectStrategyFactory.INSTANCE.newSelectStrategy();
+        final SelectStrategyFactory selectStrategyFactory =
+                () ->
+                        (selectSupplier, hasTasks) -> {
+                            connectTriggered.trigger();
+                            closeTriggered.awaitQuietly(1, TimeUnit.SECONDS);

Review Comment:
   Alright, no problem following that guidance—I actually hit it locally and it 
took a few minutes to track down this instance which should always resolve in 
milliseconds. I considered the risks but am happy to follow the convention.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java:
##########
@@ -207,6 +218,120 @@ public void testRestClientClosedHandling() throws 
Exception {
         }
     }
 
+    /**
+     * Tests that the futures returned by {@link RestClient} fail immediately 
if the client is
+     * already closed.
+     *
+     * <p>See FLINK-32583
+     */
+    @Test
+    public void testCloseClientBeforeRequest() throws Exception {
+        try (final RestClient restClient =
+                new RestClient(new Configuration(), 
Executors.directExecutor())) {
+            restClient.close(); // Intentionally close the client prior to the 
request
+
+            CompletableFuture<?> future =
+                    restClient.sendRequest(
+                            unroutableIp,
+                            80,
+                            new TestMessageHeaders(),
+                            EmptyMessageParameters.getInstance(),
+                            EmptyRequestBody.getInstance());
+
+            // Call get() on the future with a timeout of 0s so we can test 
that the exception
+            // thrown is not a TimeoutException, which is what would be thrown 
if restClient were
+            // not already closed
+            final ThrowingRunnable getFuture = () -> future.get(0, 
TimeUnit.SECONDS);
+
+            final Throwable cause = assertThrows(ExecutionException.class, 
getFuture).getCause();
+            assertThat(cause, instanceOf(IllegalStateException.class));
+            assertThat(cause.getMessage(), equalTo("RestClient is already 
closed"));
+        }
+    }
+
+    @Test
+    public void testCloseClientWhileProcessingRequest() throws Exception {
+        // Set up a Netty SelectStrategy with latches that allow us to step 
forward through Netty's
+        // request state machine, closing the client at a particular moment
+        final OneShotLatch connectTriggered = new OneShotLatch();
+        final OneShotLatch closeTriggered = new OneShotLatch();
+        final SelectStrategy fallbackSelectStrategy =
+                DefaultSelectStrategyFactory.INSTANCE.newSelectStrategy();
+        final SelectStrategyFactory selectStrategyFactory =
+                () ->
+                        (selectSupplier, hasTasks) -> {
+                            connectTriggered.trigger();
+                            closeTriggered.awaitQuietly(1, TimeUnit.SECONDS);

Review Comment:
   Alright, no problem following that guidance—I actually hit it locally and it 
took a few minutes to track down this instance which should always resolve in 
milliseconds. I considered the risks but am happy to follow the convention.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to