reswqa commented on code in PR #22457: URL: https://github.com/apache/flink/pull/22457#discussion_r1187645751
########## flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java: ########## @@ -93,28 +92,66 @@ public void testReadCallbacksAndBufferRecycling() throws Exception { // Verify callback channel.writeInbound(buf); - verify(callback, times(1)).onFailure(isA(RuntimeException.class)); + assertThat(TestClientHandlerCallback.onFailureCnt).isEqualTo(1); + assertThat(TestClientHandlerCallback.onFailureBody).isInstanceOf(RuntimeException.class); // // Unexpected messages // buf = channel.alloc().buffer(4).writeInt(1223823); // Verify callback + TestClientHandlerCallback.onFailureCnt = 0; channel.writeInbound(buf); - verify(callback, times(1)).onFailure(isA(IllegalStateException.class)); - assertEquals("Buffer not recycled", 0, buf.refCnt()); + assertThat(TestClientHandlerCallback.onFailureCnt).isEqualTo(1); + assertThat(TestClientHandlerCallback.onFailureBody).isInstanceOf(RuntimeException.class); + assertThat(buf.refCnt()).isEqualTo(0).withFailMessage("Buffer not recycled"); // // Exception caught // + TestClientHandlerCallback.onFailureCnt = 0; channel.pipeline().fireExceptionCaught(new RuntimeException("Expected test Exception")); - verify(callback, times(3)).onFailure(isA(RuntimeException.class)); + assertThat(TestClientHandlerCallback.onFailureCnt).isEqualTo(1); + assertThat(TestClientHandlerCallback.onFailureBody).isInstanceOf(RuntimeException.class); // // Channel inactive // + TestClientHandlerCallback.onFailureCnt = 0; channel.pipeline().fireChannelInactive(); - verify(callback, times(1)).onFailure(isA(ClosedChannelException.class)); + assertThat(TestClientHandlerCallback.onFailureCnt).isEqualTo(1); + assertThat(TestClientHandlerCallback.onFailureBody).isInstanceOf(ClosedChannelException.class); + } + + private static class TestClientHandlerCallback implements ClientHandlerCallback { + private static int onRequestCnt; Review Comment: I tend not to use static. 
The reason is that we should write reusable test components as much as possible. If we have multiple test cases in the future that require `TestingClientHandlerCallback`, then static will become a hassle, especially since we cannot assume that they are always executed sequentially in a single thread. ########## flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java: ########## @@ -48,31 +48,33 @@ public class NonHAQueryableStateFsBackendITCase extends AbstractQueryableStateTe private static final int QS_PROXY_PORT_RANGE_START = 9084; private static final int QS_SERVER_PORT_RANGE_START = 9089; - @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); - - @ClassRule - public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = Review Comment: Maybe we can use the `@Order` annotation provided by JUnit 5, for example: ``` class HAQueryableStateFsBackendITCase extends AbstractQueryableStateTestBase { private static final int NUM_JMS = 2; // NUM_TMS * NUM_SLOTS_PER_TM must match the parallelism of the pipelines so that // we always use all TaskManagers so that the JM oracle is always properly re-registered private static final int NUM_TMS = 2; private static final int NUM_SLOTS_PER_TM = 2; private static final int QS_PROXY_PORT_RANGE_START = 9064; private static final int QS_SERVER_PORT_RANGE_START = 9069; @TempDir @Order(1) public static Path tmpStateBackendDir; @TempDir @Order(2) public static Path tmpHaStoragePath; @RegisterExtension @Order(3) public static final AllCallbackWrapper<ZooKeeperExtension> ZK_RESOURCE = new AllCallbackWrapper<>(new ZooKeeperExtension()); @RegisterExtension @Order(4) public static final MiniClusterExtension MINI_CLUSTER_RESOURCE = new MiniClusterExtension( () -> new MiniClusterResourceConfiguration.Builder() .setConfiguration(getConfig()) .setNumberTaskManagers(NUM_TMS) .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM) .build()); 
@Override protected StateBackend createStateBackend() throws Exception { return new FsStateBackend(tmpStateBackendDir.toUri().toString()); } @BeforeAll static void setup(@InjectClusterClient RestClusterClient<?> injectedClusterClient) throws Exception { client = new QueryableStateClient("localhost", QS_PROXY_PORT_RANGE_START); clusterClient = injectedClusterClient; } @AfterAll static void tearDown() throws Exception { client.shutdownAndWait(); } private static Configuration getConfig() { Configuration config = new Configuration(); config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true); config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("4m")); config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS); config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM); config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2); config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 2); config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2); config.setString( QueryableStateOptions.PROXY_PORT_RANGE, QS_PROXY_PORT_RANGE_START + "-" + (QS_PROXY_PORT_RANGE_START + NUM_TMS)); config.setString( QueryableStateOptions.SERVER_PORT_RANGE, QS_SERVER_PORT_RANGE_START + "-" + (QS_SERVER_PORT_RANGE_START + NUM_TMS)); config.setBoolean(WebOptions.SUBMIT_ENABLE, false); config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, tmpHaStoragePath.toString()); config.setString( HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, ZK_RESOURCE.getCustomExtension().getConnectString()); config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); return config; } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org