xintongsong commented on a change in pull request #15411: URL: https://github.com/apache/flink/pull/15411#discussion_r604545334
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java ########## @@ -262,4 +267,136 @@ protected ExtendedEndpoint( return CompletableFuture.completedFuture(isRunning()); } } + + /** + * Tests that the {@link RpcEndpoint} can execute the runnable. See also {@link + * org.apache.flink.runtime.rpc.RpcEndpoint#runAsync(Runnable)} + */ + @Test + public void testRunAsync() throws InterruptedException, ExecutionException, TimeoutException { + RpcEndpoint dummyRpcEndpoint = new RpcEndpoint(rpcService) {}; Review comment: It is preferred to reuse `BaseEndpoint` rather than an anonymous class. Since `foobarValue` is not needed in this test case, we may consider adding a no-argument constructor for `BaseEndpoint`. Same for other tests. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java ########## @@ -262,4 +267,136 @@ protected ExtendedEndpoint( return CompletableFuture.completedFuture(isRunning()); } } + + /** + * Tests that the {@link RpcEndpoint} can execute the runnable. See also {@link + * org.apache.flink.runtime.rpc.RpcEndpoint#runAsync(Runnable)} + */ + @Test + public void testRunAsync() throws InterruptedException, ExecutionException, TimeoutException { + RpcEndpoint dummyRpcEndpoint = new RpcEndpoint(rpcService) {}; + final OneShotLatch latch = new OneShotLatch(); + try { + dummyRpcEndpoint.start(); + dummyRpcEndpoint.runAsync(latch::trigger); Review comment: One of the most important contracts of the scheduling methods is that the scheduled command must be executed in the RPC main thread. I think we should also verify that in the test. Same for other tests. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java ########## @@ -262,4 +267,136 @@ protected ExtendedEndpoint( return CompletableFuture.completedFuture(isRunning()); } } + + /** + * Tests that the {@link RpcEndpoint} can execute the runnable.
See also {@link + * org.apache.flink.runtime.rpc.RpcEndpoint#runAsync(Runnable)} + */ + @Test + public void testRunAsync() throws InterruptedException, ExecutionException, TimeoutException { + RpcEndpoint dummyRpcEndpoint = new RpcEndpoint(rpcService) {}; + final OneShotLatch latch = new OneShotLatch(); + try { + dummyRpcEndpoint.start(); + dummyRpcEndpoint.runAsync(latch::trigger); + latch.await(TIMEOUT.getSize(), TIMEOUT.getUnit()); + assertTrue(latch.isTriggered()); + } finally { + RpcUtils.terminateRpcEndpoint(dummyRpcEndpoint, TIMEOUT); + } + } + + /** + * Tests that the {@link RpcEndpoint} can execute the runnable with a delay specified in Time. + * See also {@link org.apache.flink.runtime.rpc.RpcEndpoint#scheduleRunAsync(Runnable, Time)} + */ + @Test + public void testScheduleRunAsyncTime() + throws InterruptedException, ExecutionException, TimeoutException { + RpcEndpoint dummyRpcEndpoint = new RpcEndpoint(rpcService) {}; + OneShotLatch latch = new OneShotLatch(); + Time delay = Time.seconds(1); + try { + dummyRpcEndpoint.start(); + dummyRpcEndpoint.scheduleRunAsync(latch::trigger, delay); + assertFalse(latch.isTriggered()); + if (addExtraDelay) { + TimeUnit.MILLISECONDS.sleep(extraDelayMilliSeconds); + } + latch.await(delay.getSize(), delay.getUnit()); + assertTrue(latch.isTriggered()); + } finally { + RpcUtils.terminateRpcEndpoint(dummyRpcEndpoint, TIMEOUT); + } + } + + /** + * Tests that the {@link RpcEndpoint} can execute the runnable with a delay specified in + * TimeUnit. 
See also {@link org.apache.flink.runtime.rpc.RpcEndpoint#scheduleRunAsync(Runnable, + * long, TimeUnit)} + */ + @Test + public void testScheduleRunAsyncTimeUnit() + throws InterruptedException, ExecutionException, TimeoutException { + RpcEndpoint dummyRpcEndpoint = new RpcEndpoint(rpcService) {}; + OneShotLatch latch = new OneShotLatch(); + final long delayMinutes = 1; + final Time delayTime = Time.minutes(delayMinutes); + + try { + dummyRpcEndpoint.start(); + + dummyRpcEndpoint.scheduleRunAsync(latch::trigger, delayMinutes, TimeUnit.MINUTES); + assertFalse(latch.isTriggered()); + if (addExtraDelay) { + TimeUnit.MILLISECONDS.sleep(extraDelayMilliSeconds); + } + latch.await(delayTime.getSize(), delayTime.getUnit()); + assertTrue(latch.isTriggered()); + latch.reset(); Review comment: I'd suggest measuring the time from the command being scheduled to it being executed. Then we can verify that the time is within the range of the timeout plus/minus an error tolerance (e.g., 0.2 * timeout). ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java ########## @@ -262,4 +267,136 @@ protected ExtendedEndpoint( return CompletableFuture.completedFuture(isRunning()); } } + + /** + * Tests that the {@link RpcEndpoint} can execute the runnable. See also {@link + * org.apache.flink.runtime.rpc.RpcEndpoint#runAsync(Runnable)} + */ + @Test + public void testRunAsync() throws InterruptedException, ExecutionException, TimeoutException { + RpcEndpoint dummyRpcEndpoint = new RpcEndpoint(rpcService) {}; + final OneShotLatch latch = new OneShotLatch(); + try { + dummyRpcEndpoint.start(); + dummyRpcEndpoint.runAsync(latch::trigger); + latch.await(TIMEOUT.getSize(), TIMEOUT.getUnit()); + assertTrue(latch.isTriggered()); + } finally { + RpcUtils.terminateRpcEndpoint(dummyRpcEndpoint, TIMEOUT); + } + } + + /** + * Tests that the {@link RpcEndpoint} can execute the runnable with a delay specified in Time.
+ * See also {@link org.apache.flink.runtime.rpc.RpcEndpoint#scheduleRunAsync(Runnable, Time)} + */ + @Test + public void testScheduleRunAsyncTime() + throws InterruptedException, ExecutionException, TimeoutException { + RpcEndpoint dummyRpcEndpoint = new RpcEndpoint(rpcService) {}; + OneShotLatch latch = new OneShotLatch(); + Time delay = Time.seconds(1); + try { + dummyRpcEndpoint.start(); + dummyRpcEndpoint.scheduleRunAsync(latch::trigger, delay); Review comment: Moreover, the test does not cover the problematic code path. - The problematic code path is calling `schedule` on `RpcEndpoint#MainThreadExecutor`, which indirectly calls `RpcEndpoint#scheduleRunAsync`. - The test calls `RpcEndpoint#scheduleRunAsync` directly. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java ########## @@ -262,4 +267,136 @@ protected ExtendedEndpoint( return CompletableFuture.completedFuture(isRunning()); } } + + /** + * Tests that the {@link RpcEndpoint} can execute the runnable. See also {@link + * org.apache.flink.runtime.rpc.RpcEndpoint#runAsync(Runnable)} + */ + @Test + public void testRunAsync() throws InterruptedException, ExecutionException, TimeoutException { + RpcEndpoint dummyRpcEndpoint = new RpcEndpoint(rpcService) {}; + final OneShotLatch latch = new OneShotLatch(); + try { + dummyRpcEndpoint.start(); + dummyRpcEndpoint.runAsync(latch::trigger); + latch.await(TIMEOUT.getSize(), TIMEOUT.getUnit()); + assertTrue(latch.isTriggered()); + } finally { + RpcUtils.terminateRpcEndpoint(dummyRpcEndpoint, TIMEOUT); + } + } + + /** + * Tests that the {@link RpcEndpoint} can execute the runnable with a delay specified in Time. 
+ * See also {@link org.apache.flink.runtime.rpc.RpcEndpoint#scheduleRunAsync(Runnable, Time)} + */ + @Test + public void testScheduleRunAsyncTime() + throws InterruptedException, ExecutionException, TimeoutException { + RpcEndpoint dummyRpcEndpoint = new RpcEndpoint(rpcService) {}; + OneShotLatch latch = new OneShotLatch(); + Time delay = Time.seconds(1); + try { + dummyRpcEndpoint.start(); + dummyRpcEndpoint.scheduleRunAsync(latch::trigger, delay); + assertFalse(latch.isTriggered()); + if (addExtraDelay) { + TimeUnit.MILLISECONDS.sleep(extraDelayMilliSeconds); + } + latch.await(delay.getSize(), delay.getUnit()); + assertTrue(latch.isTriggered()); + } finally { + RpcUtils.terminateRpcEndpoint(dummyRpcEndpoint, TIMEOUT); + } + } + + /** + * Tests that the {@link RpcEndpoint} can execute the runnable with a delay specified in + * TimeUnit. See also {@link org.apache.flink.runtime.rpc.RpcEndpoint#scheduleRunAsync(Runnable, + * long, TimeUnit)} + */ + @Test + public void testScheduleRunAsyncTimeUnit() Review comment: This test case takes more than 3 minutes. (And `testCallAsyncTimeOut` takes 15 seconds.) There are tens of thousands of test cases. A single test taking this long will greatly increase the time our CI tests take. I'd suggest verifying only `MILLISECONDS` and `SECONDS`. If the method works for two different time units, there's little chance it doesn't work for a third unit. We can schedule two commands, with 500ms and 1s delays, respectively. There is no need to wait for one command to be executed before scheduling another. Then the test should only take roughly 1s. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org