lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1649427661
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ########## @@ -659,78 +753,38 @@ public void testPollTimerExpirationShouldNotMarkMemberStaleIfMemberAlreadyLeavin @Test public void testisExpiredByUsedForLogging() { - Timer pollTimer = spy(time.timer(DEFAULT_MAX_POLL_INTERVAL_MS)); - heartbeatRequestManager = new HeartbeatRequestManager(new LogContext(), pollTimer, config(), - coordinatorRequestManager, membershipManager, heartbeatState, heartbeatRequestState, - backgroundEventHandler, metrics); when(membershipManager.shouldSkipHeartbeat()).thenReturn(false); int exceededTimeMs = 5; time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS + exceededTimeMs); + when(membershipManager.isLeavingGroup()).thenReturn(false); + when(pollTimer.isExpired()).thenReturn(true); NetworkClientDelegate.PollResult pollResult = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(1, pollResult.unsentRequests.size()); verify(membershipManager).transitionToSendingLeaveGroup(true); verify(pollTimer, never()).isExpiredBy(); - assertEquals(exceededTimeMs, pollTimer.isExpiredBy()); clearInvocations(pollTimer); heartbeatRequestManager.resetPollTimer(time.milliseconds()); verify(pollTimer).isExpiredBy(); } @Test - public void testHeartbeatMetrics() { - // setup - coordinatorRequestManager = mock(CoordinatorRequestManager.class); - membershipManager = mock(MembershipManager.class); - heartbeatState = mock(HeartbeatRequestManager.HeartbeatState.class); - time = new MockTime(); - metrics = new Metrics(time); - heartbeatRequestState = new HeartbeatRequestManager.HeartbeatRequestState( - new LogContext(), - time, - 0, // This initial interval should be 0 to ensure heartbeat on the clock - DEFAULT_RETRY_BACKOFF_MS, - DEFAULT_RETRY_BACKOFF_MAX_MS, - 0); - backgroundEventHandler = mock(BackgroundEventHandler.class); + public void testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() { heartbeatRequestManager = createHeartbeatRequestManager( - coordinatorRequestManager, - membershipManager, - heartbeatState, - heartbeatRequestState, - backgroundEventHandler); - when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", 9999))); - when(membershipManager.state()).thenReturn(MemberState.STABLE); - - assertNotNull(getMetric("heartbeat-response-time-max")); - assertNotNull(getMetric("heartbeat-rate")); - assertNotNull(getMetric("heartbeat-total")); - assertNotNull(getMetric("last-heartbeat-seconds-ago")); - - // test poll - assertHeartbeat(heartbeatRequestManager, 0); - time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); - assertEquals(1.0, getMetric("heartbeat-total").metricValue()); - assertEquals((double) TimeUnit.MILLISECONDS.toSeconds(DEFAULT_HEARTBEAT_INTERVAL_MS), getMetric("last-heartbeat-seconds-ago").metricValue()); - - assertHeartbeat(heartbeatRequestManager, DEFAULT_HEARTBEAT_INTERVAL_MS); - assertEquals(0.06d, (double) getMetric("heartbeat-rate").metricValue(), 0.005d); - assertEquals(2.0, getMetric("heartbeat-total").metricValue()); - - // Randomly sleep for some time - Random rand = new Random(); - int randomSleepS = rand.nextInt(11); - time.sleep(randomSleepS * 1000); - assertEquals((double) randomSleepS, getMetric("last-heartbeat-seconds-ago").metricValue()); - } + coordinatorRequestManager, + membershipManager, + heartbeatState, + heartbeatRequestState, + backgroundEventHandler); - @Test - public void testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() { + when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true); + when(membershipManager.state()).thenReturn(MemberState.STABLE); mockStableMember(); time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); + when(membershipManager.isLeavingGroup()).thenReturn(true); Review Comment: uhm do we need this here? I wouldn't expect so (the membershipMgr is a mock now, and the HB mgr does not check the isLeavingGroup to generate a HB) ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ########## @@ -659,78 +753,38 @@ public void testPollTimerExpirationShouldNotMarkMemberStaleIfMemberAlreadyLeavin @Test public void testisExpiredByUsedForLogging() { - Timer pollTimer = spy(time.timer(DEFAULT_MAX_POLL_INTERVAL_MS)); - heartbeatRequestManager = new HeartbeatRequestManager(new LogContext(), pollTimer, config(), - coordinatorRequestManager, membershipManager, heartbeatState, heartbeatRequestState, - backgroundEventHandler, metrics); when(membershipManager.shouldSkipHeartbeat()).thenReturn(false); int exceededTimeMs = 5; time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS + exceededTimeMs); + when(membershipManager.isLeavingGroup()).thenReturn(false); + when(pollTimer.isExpired()).thenReturn(true); NetworkClientDelegate.PollResult pollResult = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(1, pollResult.unsentRequests.size()); verify(membershipManager).transitionToSendingLeaveGroup(true); verify(pollTimer, never()).isExpiredBy(); - assertEquals(exceededTimeMs, pollTimer.isExpiredBy()); clearInvocations(pollTimer); heartbeatRequestManager.resetPollTimer(time.milliseconds()); verify(pollTimer).isExpiredBy(); } @Test - public void testHeartbeatMetrics() { - // setup - coordinatorRequestManager = mock(CoordinatorRequestManager.class); - membershipManager = mock(MembershipManager.class); - heartbeatState = mock(HeartbeatRequestManager.HeartbeatState.class); - time = new MockTime(); - metrics = new Metrics(time); - heartbeatRequestState = new HeartbeatRequestManager.HeartbeatRequestState( - new LogContext(), - time, - 0, // This initial interval should be 0 to ensure heartbeat on the clock - DEFAULT_RETRY_BACKOFF_MS, - DEFAULT_RETRY_BACKOFF_MAX_MS, - 0); - backgroundEventHandler = mock(BackgroundEventHandler.class); + public void testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() { heartbeatRequestManager = createHeartbeatRequestManager( - coordinatorRequestManager, - membershipManager, - heartbeatState, - heartbeatRequestState, - backgroundEventHandler); - when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", 9999))); - when(membershipManager.state()).thenReturn(MemberState.STABLE); - - assertNotNull(getMetric("heartbeat-response-time-max")); - assertNotNull(getMetric("heartbeat-rate")); - assertNotNull(getMetric("heartbeat-total")); - assertNotNull(getMetric("last-heartbeat-seconds-ago")); - - // test poll - assertHeartbeat(heartbeatRequestManager, 0); - time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); - assertEquals(1.0, getMetric("heartbeat-total").metricValue()); - assertEquals((double) TimeUnit.MILLISECONDS.toSeconds(DEFAULT_HEARTBEAT_INTERVAL_MS), getMetric("last-heartbeat-seconds-ago").metricValue()); - - assertHeartbeat(heartbeatRequestManager, DEFAULT_HEARTBEAT_INTERVAL_MS); - assertEquals(0.06d, (double) getMetric("heartbeat-rate").metricValue(), 0.005d); - assertEquals(2.0, getMetric("heartbeat-total").metricValue()); - - // Randomly sleep for some time - Random rand = new Random(); - int randomSleepS = rand.nextInt(11); - time.sleep(randomSleepS * 1000); - assertEquals((double) randomSleepS, getMetric("last-heartbeat-seconds-ago").metricValue()); - } + coordinatorRequestManager, + membershipManager, + heartbeatState, + heartbeatRequestState, + backgroundEventHandler); - @Test - public void testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() { + when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true); + when(membershipManager.state()).thenReturn(MemberState.STABLE); mockStableMember(); time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); + when(membershipManager.isLeavingGroup()).thenReturn(true); Review Comment: also on ln 793 there's a doNothing().when(membershipManager) that I would say we don't need on the mock ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ########## @@ -744,34 +798,17 @@ public void testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() { verify(heartbeatRequestState).onFailedAttempt(anyLong()); verify(heartbeatRequestState).reset(); + when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(false); when(membershipManager.state()).thenReturn(MemberState.FENCED); Review Comment: what the HB manager checks on the membershipMgr in this case is the shouldSKip, so I guess what we need to do here is to mock the fencing is: `when(membershipManager.shouldSkipHeartbeat()).thenReturn(true);` (wonder if this will allow you to loose the when canSendRequest too, which I would expect should work because the heartbeatRequestState is not a mock) ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ########## @@ -744,34 +798,17 @@ public void testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() { verify(heartbeatRequestState).onFailedAttempt(anyLong()); verify(heartbeatRequestState).reset(); + when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(false); when(membershipManager.state()).thenReturn(MemberState.FENCED); result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(0, result.unsentRequests.size(), "Member should not send heartbeats while FENCED"); + when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true); when(membershipManager.state()).thenReturn(MemberState.JOINING); Review Comment: ditto, we probably don't need to add an expectation on the canSend, and we just need to set the right one on the membershipMgr to show it's not fenced anymore (when(membershipManager.shouldSkipHeartbeat()).thenReturn(false)) ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ########## @@ -659,78 +753,38 @@ public void testPollTimerExpirationShouldNotMarkMemberStaleIfMemberAlreadyLeavin @Test public void testisExpiredByUsedForLogging() { - Timer pollTimer = spy(time.timer(DEFAULT_MAX_POLL_INTERVAL_MS)); - heartbeatRequestManager = new HeartbeatRequestManager(new LogContext(), pollTimer, config(), - coordinatorRequestManager, membershipManager, heartbeatState, heartbeatRequestState, - backgroundEventHandler, metrics); when(membershipManager.shouldSkipHeartbeat()).thenReturn(false); int exceededTimeMs = 5; time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS + exceededTimeMs); + when(membershipManager.isLeavingGroup()).thenReturn(false); + when(pollTimer.isExpired()).thenReturn(true); NetworkClientDelegate.PollResult pollResult = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(1, pollResult.unsentRequests.size()); verify(membershipManager).transitionToSendingLeaveGroup(true); verify(pollTimer, never()).isExpiredBy(); - assertEquals(exceededTimeMs, pollTimer.isExpiredBy()); clearInvocations(pollTimer); heartbeatRequestManager.resetPollTimer(time.milliseconds()); verify(pollTimer).isExpiredBy(); } @Test - public void testHeartbeatMetrics() { - // setup - coordinatorRequestManager = mock(CoordinatorRequestManager.class); - membershipManager = mock(MembershipManager.class); - heartbeatState = mock(HeartbeatRequestManager.HeartbeatState.class); - time = new MockTime(); - metrics = new Metrics(time); - heartbeatRequestState = new HeartbeatRequestManager.HeartbeatRequestState( - new LogContext(), - time, - 0, // This initial interval should be 0 to ensure heartbeat on the clock - DEFAULT_RETRY_BACKOFF_MS, - DEFAULT_RETRY_BACKOFF_MAX_MS, - 0); - backgroundEventHandler = mock(BackgroundEventHandler.class); + public void testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() { heartbeatRequestManager = createHeartbeatRequestManager( - coordinatorRequestManager, - membershipManager, - heartbeatState, - heartbeatRequestState, - backgroundEventHandler); - when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", 9999))); - when(membershipManager.state()).thenReturn(MemberState.STABLE); - - assertNotNull(getMetric("heartbeat-response-time-max")); - assertNotNull(getMetric("heartbeat-rate")); - assertNotNull(getMetric("heartbeat-total")); - assertNotNull(getMetric("last-heartbeat-seconds-ago")); - - // test poll - assertHeartbeat(heartbeatRequestManager, 0); - time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); - assertEquals(1.0, getMetric("heartbeat-total").metricValue()); - assertEquals((double) TimeUnit.MILLISECONDS.toSeconds(DEFAULT_HEARTBEAT_INTERVAL_MS), getMetric("last-heartbeat-seconds-ago").metricValue()); - - assertHeartbeat(heartbeatRequestManager, DEFAULT_HEARTBEAT_INTERVAL_MS); - assertEquals(0.06d, (double) getMetric("heartbeat-rate").metricValue(), 0.005d); - assertEquals(2.0, getMetric("heartbeat-total").metricValue()); - - // Randomly sleep for some time - Random rand = new Random(); - int randomSleepS = rand.nextInt(11); - time.sleep(randomSleepS * 1000); - assertEquals((double) randomSleepS, getMetric("last-heartbeat-seconds-ago").metricValue()); - } + coordinatorRequestManager, + membershipManager, + heartbeatState, + heartbeatRequestState, + backgroundEventHandler); - @Test - public void testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() { + when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true); + when(membershipManager.state()).thenReturn(MemberState.STABLE); Review Comment: setting this expectation is great, and it actually makes me notice that the `mockStableMember()` could probably be removed? it was just taking all the actions needed to put the membershipMgr into STABLE (when it was an instance, not a mock) ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ########## @@ -744,34 +798,17 @@ public void testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() { verify(heartbeatRequestState).onFailedAttempt(anyLong()); verify(heartbeatRequestState).reset(); + when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(false); when(membershipManager.state()).thenReturn(MemberState.FENCED); result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(0, result.unsentRequests.size(), "Member should not send heartbeats while FENCED"); + when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true); when(membershipManager.state()).thenReturn(MemberState.JOINING); result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(1, result.unsentRequests.size(), "Fenced member should resume heartbeat after transitioning to JOINING"); } - @ParameterizedTest - @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT) - public void testSendingLeaveGroupHeartbeatWhenPreviousOneInFlight(final short version) { Review Comment: uhm we shouldn't remove this test (it was actually an issue we discovered, fixed, and added the test for coverage). I see it involves checking the HB content (that is might be off just because the heartbeatState is a mock, but give it another try after addressing the comments above to turn that state into an actual instance able to build the HB content) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org