patricklucas commented on code in PR #22987: URL: https://github.com/apache/flink/pull/22987#discussion_r1275249702
########## flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java: ########## @@ -207,6 +218,120 @@ public void testRestClientClosedHandling() throws Exception { } } + /** + * Tests that the futures returned by {@link RestClient} fail immediately if the client is + * already closed. + * + * <p>See FLINK-32583 + */ + @Test + public void testCloseClientBeforeRequest() throws Exception { + try (final RestClient restClient = + new RestClient(new Configuration(), Executors.directExecutor())) { + restClient.close(); // Intentionally close the client prior to the request + + CompletableFuture<?> future = + restClient.sendRequest( + unroutableIp, + 80, + new TestMessageHeaders(), + EmptyMessageParameters.getInstance(), + EmptyRequestBody.getInstance()); + + // Call get() on the future with a timeout of 0s so we can test that the exception + // thrown is not a TimeoutException, which is what would be thrown if restClient were + // not already closed + final ThrowingRunnable getFuture = () -> future.get(0, TimeUnit.SECONDS); + + final Throwable cause = assertThrows(ExecutionException.class, getFuture).getCause(); + assertThat(cause, instanceOf(IllegalStateException.class)); + assertThat(cause.getMessage(), equalTo("RestClient is already closed")); + } + } + + @Test + public void testCloseClientWhileProcessingRequest() throws Exception { + // Set up a Netty SelectStrategy with latches that allow us to step forward through Netty's + // request state machine, closing the client at a particular moment + final OneShotLatch connectTriggered = new OneShotLatch(); + final OneShotLatch closeTriggered = new OneShotLatch(); + final SelectStrategy fallbackSelectStrategy = + DefaultSelectStrategyFactory.INSTANCE.newSelectStrategy(); + final SelectStrategyFactory selectStrategyFactory = + () -> + (selectSupplier, hasTasks) -> { + connectTriggered.trigger(); + closeTriggered.awaitQuietly(1, TimeUnit.SECONDS); Review Comment: Alright, no problem following that guidance—I actually hit it locally and it took a few minutes to track down this instance which should always resolve in milliseconds. I considered the risks but am happy to follow the convention. ########## flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java: ########## @@ -207,6 +218,120 @@ public void testRestClientClosedHandling() throws Exception { } } + /** + * Tests that the futures returned by {@link RestClient} fail immediately if the client is + * already closed. + * + * <p>See FLINK-32583 + */ + @Test + public void testCloseClientBeforeRequest() throws Exception { + try (final RestClient restClient = + new RestClient(new Configuration(), Executors.directExecutor())) { + restClient.close(); // Intentionally close the client prior to the request + + CompletableFuture<?> future = + restClient.sendRequest( + unroutableIp, + 80, + new TestMessageHeaders(), + EmptyMessageParameters.getInstance(), + EmptyRequestBody.getInstance()); + + // Call get() on the future with a timeout of 0s so we can test that the exception + // thrown is not a TimeoutException, which is what would be thrown if restClient were + // not already closed + final ThrowingRunnable getFuture = () -> future.get(0, TimeUnit.SECONDS); + + final Throwable cause = assertThrows(ExecutionException.class, getFuture).getCause(); + assertThat(cause, instanceOf(IllegalStateException.class)); + assertThat(cause.getMessage(), equalTo("RestClient is already closed")); + } + } + + @Test + public void testCloseClientWhileProcessingRequest() throws Exception { + // Set up a Netty SelectStrategy with latches that allow us to step forward through Netty's + // request state machine, closing the client at a particular moment + final OneShotLatch connectTriggered = new OneShotLatch(); + final OneShotLatch closeTriggered = new OneShotLatch(); + final SelectStrategy fallbackSelectStrategy = + DefaultSelectStrategyFactory.INSTANCE.newSelectStrategy(); + final SelectStrategyFactory selectStrategyFactory = + () -> + (selectSupplier, hasTasks) -> { + connectTriggered.trigger(); + closeTriggered.awaitQuietly(1, TimeUnit.SECONDS); Review Comment: Alright, no problem following that guidance—I actually hit it locally and it took a few minutes to track down this instance which should always resolve in milliseconds. I considered the risks but am happy to follow the convention. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org