lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1649427661


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##########
@@ -659,78 +753,38 @@ public void 
testPollTimerExpirationShouldNotMarkMemberStaleIfMemberAlreadyLeavin
 
     @Test
     public void testisExpiredByUsedForLogging() {
-        Timer pollTimer = spy(time.timer(DEFAULT_MAX_POLL_INTERVAL_MS));
-        heartbeatRequestManager = new HeartbeatRequestManager(new 
LogContext(), pollTimer, config(),
-            coordinatorRequestManager, membershipManager, heartbeatState, 
heartbeatRequestState,
-            backgroundEventHandler, metrics);
         when(membershipManager.shouldSkipHeartbeat()).thenReturn(false);
 
         int exceededTimeMs = 5;
         time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS + exceededTimeMs);
 
+        when(membershipManager.isLeavingGroup()).thenReturn(false);
+        when(pollTimer.isExpired()).thenReturn(true);
         NetworkClientDelegate.PollResult pollResult = 
heartbeatRequestManager.poll(time.milliseconds());
         assertEquals(1, pollResult.unsentRequests.size());
         verify(membershipManager).transitionToSendingLeaveGroup(true);
         verify(pollTimer, never()).isExpiredBy();
-        assertEquals(exceededTimeMs, pollTimer.isExpiredBy());
 
         clearInvocations(pollTimer);
         heartbeatRequestManager.resetPollTimer(time.milliseconds());
         verify(pollTimer).isExpiredBy();
     }
 
     @Test
-    public void testHeartbeatMetrics() {
-        // setup
-        coordinatorRequestManager = mock(CoordinatorRequestManager.class);
-        membershipManager = mock(MembershipManager.class);
-        heartbeatState = mock(HeartbeatRequestManager.HeartbeatState.class);
-        time = new MockTime();
-        metrics = new Metrics(time);
-        heartbeatRequestState = new 
HeartbeatRequestManager.HeartbeatRequestState(
-            new LogContext(),
-            time,
-            0, // This initial interval should be 0 to ensure heartbeat on the 
clock
-            DEFAULT_RETRY_BACKOFF_MS,
-            DEFAULT_RETRY_BACKOFF_MAX_MS,
-            0);
-        backgroundEventHandler = mock(BackgroundEventHandler.class);
+    public void 
testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() {
         heartbeatRequestManager = createHeartbeatRequestManager(
-            coordinatorRequestManager,
-            membershipManager,
-            heartbeatState,
-            heartbeatRequestState,
-            backgroundEventHandler);
-        
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new 
Node(1, "localhost", 9999)));
-        when(membershipManager.state()).thenReturn(MemberState.STABLE);
-
-        assertNotNull(getMetric("heartbeat-response-time-max"));
-        assertNotNull(getMetric("heartbeat-rate"));
-        assertNotNull(getMetric("heartbeat-total"));
-        assertNotNull(getMetric("last-heartbeat-seconds-ago"));
-
-        // test poll
-        assertHeartbeat(heartbeatRequestManager, 0);
-        time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
-        assertEquals(1.0, getMetric("heartbeat-total").metricValue());
-        assertEquals((double) 
TimeUnit.MILLISECONDS.toSeconds(DEFAULT_HEARTBEAT_INTERVAL_MS), 
getMetric("last-heartbeat-seconds-ago").metricValue());
-
-        assertHeartbeat(heartbeatRequestManager, 
DEFAULT_HEARTBEAT_INTERVAL_MS);
-        assertEquals(0.06d, (double) 
getMetric("heartbeat-rate").metricValue(), 0.005d);
-        assertEquals(2.0, getMetric("heartbeat-total").metricValue());
-
-        // Randomly sleep for some time
-        Random rand = new Random();
-        int randomSleepS = rand.nextInt(11);
-        time.sleep(randomSleepS * 1000);
-        assertEquals((double) randomSleepS, 
getMetric("last-heartbeat-seconds-ago").metricValue());
-    }
+                coordinatorRequestManager,
+                membershipManager,
+                heartbeatState,
+                heartbeatRequestState,
+                backgroundEventHandler);
 
-    @Test
-    public void 
testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() {
+        when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true);
+        when(membershipManager.state()).thenReturn(MemberState.STABLE);
         mockStableMember();
 
         time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+        when(membershipManager.isLeavingGroup()).thenReturn(true);

Review Comment:
   uhm do we need this here? I wouldn't expect so (the membershipMgr is a mock 
now, and the HB mgr does not check the isLeavingGroup to generate a HB)



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##########
@@ -659,78 +753,38 @@ public void 
testPollTimerExpirationShouldNotMarkMemberStaleIfMemberAlreadyLeavin
 
     @Test
     public void testisExpiredByUsedForLogging() {
-        Timer pollTimer = spy(time.timer(DEFAULT_MAX_POLL_INTERVAL_MS));
-        heartbeatRequestManager = new HeartbeatRequestManager(new 
LogContext(), pollTimer, config(),
-            coordinatorRequestManager, membershipManager, heartbeatState, 
heartbeatRequestState,
-            backgroundEventHandler, metrics);
         when(membershipManager.shouldSkipHeartbeat()).thenReturn(false);
 
         int exceededTimeMs = 5;
         time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS + exceededTimeMs);
 
+        when(membershipManager.isLeavingGroup()).thenReturn(false);
+        when(pollTimer.isExpired()).thenReturn(true);
         NetworkClientDelegate.PollResult pollResult = 
heartbeatRequestManager.poll(time.milliseconds());
         assertEquals(1, pollResult.unsentRequests.size());
         verify(membershipManager).transitionToSendingLeaveGroup(true);
         verify(pollTimer, never()).isExpiredBy();
-        assertEquals(exceededTimeMs, pollTimer.isExpiredBy());
 
         clearInvocations(pollTimer);
         heartbeatRequestManager.resetPollTimer(time.milliseconds());
         verify(pollTimer).isExpiredBy();
     }
 
     @Test
-    public void testHeartbeatMetrics() {
-        // setup
-        coordinatorRequestManager = mock(CoordinatorRequestManager.class);
-        membershipManager = mock(MembershipManager.class);
-        heartbeatState = mock(HeartbeatRequestManager.HeartbeatState.class);
-        time = new MockTime();
-        metrics = new Metrics(time);
-        heartbeatRequestState = new 
HeartbeatRequestManager.HeartbeatRequestState(
-            new LogContext(),
-            time,
-            0, // This initial interval should be 0 to ensure heartbeat on the 
clock
-            DEFAULT_RETRY_BACKOFF_MS,
-            DEFAULT_RETRY_BACKOFF_MAX_MS,
-            0);
-        backgroundEventHandler = mock(BackgroundEventHandler.class);
+    public void 
testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() {
         heartbeatRequestManager = createHeartbeatRequestManager(
-            coordinatorRequestManager,
-            membershipManager,
-            heartbeatState,
-            heartbeatRequestState,
-            backgroundEventHandler);
-        
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new 
Node(1, "localhost", 9999)));
-        when(membershipManager.state()).thenReturn(MemberState.STABLE);
-
-        assertNotNull(getMetric("heartbeat-response-time-max"));
-        assertNotNull(getMetric("heartbeat-rate"));
-        assertNotNull(getMetric("heartbeat-total"));
-        assertNotNull(getMetric("last-heartbeat-seconds-ago"));
-
-        // test poll
-        assertHeartbeat(heartbeatRequestManager, 0);
-        time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
-        assertEquals(1.0, getMetric("heartbeat-total").metricValue());
-        assertEquals((double) 
TimeUnit.MILLISECONDS.toSeconds(DEFAULT_HEARTBEAT_INTERVAL_MS), 
getMetric("last-heartbeat-seconds-ago").metricValue());
-
-        assertHeartbeat(heartbeatRequestManager, 
DEFAULT_HEARTBEAT_INTERVAL_MS);
-        assertEquals(0.06d, (double) 
getMetric("heartbeat-rate").metricValue(), 0.005d);
-        assertEquals(2.0, getMetric("heartbeat-total").metricValue());
-
-        // Randomly sleep for some time
-        Random rand = new Random();
-        int randomSleepS = rand.nextInt(11);
-        time.sleep(randomSleepS * 1000);
-        assertEquals((double) randomSleepS, 
getMetric("last-heartbeat-seconds-ago").metricValue());
-    }
+                coordinatorRequestManager,
+                membershipManager,
+                heartbeatState,
+                heartbeatRequestState,
+                backgroundEventHandler);
 
-    @Test
-    public void 
testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() {
+        when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true);
+        when(membershipManager.state()).thenReturn(MemberState.STABLE);
         mockStableMember();
 
         time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+        when(membershipManager.isLeavingGroup()).thenReturn(true);

Review Comment:
   also on ln 793 there's a doNothing().when(membershipManager) that I would 
say we don't need on the mock



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##########
@@ -744,34 +798,17 @@ public void 
testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() {
         verify(heartbeatRequestState).onFailedAttempt(anyLong());
         verify(heartbeatRequestState).reset();
 
+        
when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(false);
         when(membershipManager.state()).thenReturn(MemberState.FENCED);

Review Comment:
   what the HB manager checks on the membershipMgr in this case is the 
shouldSKip, so I guess what we need to do here is to mock the fencing is: 
`when(membershipManager.shouldSkipHeartbeat()).thenReturn(true);` (wonder if 
this will allow you to loose the when canSendRequest too, which I would expect 
should work because the heartbeatRequestState is not a mock)



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##########
@@ -744,34 +798,17 @@ public void 
testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() {
         verify(heartbeatRequestState).onFailedAttempt(anyLong());
         verify(heartbeatRequestState).reset();
 
+        
when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(false);
         when(membershipManager.state()).thenReturn(MemberState.FENCED);
         result = heartbeatRequestManager.poll(time.milliseconds());
         assertEquals(0, result.unsentRequests.size(), "Member should not send 
heartbeats while FENCED");
 
+        when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true);
         when(membershipManager.state()).thenReturn(MemberState.JOINING);

Review Comment:
   ditto, we probably don't need to add an expectation on the canSend, and we 
just need to set the right one on the membershipMgr to show it's not fenced 
anymore (when(membershipManager.shouldSkipHeartbeat()).thenReturn(false))



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##########
@@ -659,78 +753,38 @@ public void 
testPollTimerExpirationShouldNotMarkMemberStaleIfMemberAlreadyLeavin
 
     @Test
     public void testisExpiredByUsedForLogging() {
-        Timer pollTimer = spy(time.timer(DEFAULT_MAX_POLL_INTERVAL_MS));
-        heartbeatRequestManager = new HeartbeatRequestManager(new 
LogContext(), pollTimer, config(),
-            coordinatorRequestManager, membershipManager, heartbeatState, 
heartbeatRequestState,
-            backgroundEventHandler, metrics);
         when(membershipManager.shouldSkipHeartbeat()).thenReturn(false);
 
         int exceededTimeMs = 5;
         time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS + exceededTimeMs);
 
+        when(membershipManager.isLeavingGroup()).thenReturn(false);
+        when(pollTimer.isExpired()).thenReturn(true);
         NetworkClientDelegate.PollResult pollResult = 
heartbeatRequestManager.poll(time.milliseconds());
         assertEquals(1, pollResult.unsentRequests.size());
         verify(membershipManager).transitionToSendingLeaveGroup(true);
         verify(pollTimer, never()).isExpiredBy();
-        assertEquals(exceededTimeMs, pollTimer.isExpiredBy());
 
         clearInvocations(pollTimer);
         heartbeatRequestManager.resetPollTimer(time.milliseconds());
         verify(pollTimer).isExpiredBy();
     }
 
     @Test
-    public void testHeartbeatMetrics() {
-        // setup
-        coordinatorRequestManager = mock(CoordinatorRequestManager.class);
-        membershipManager = mock(MembershipManager.class);
-        heartbeatState = mock(HeartbeatRequestManager.HeartbeatState.class);
-        time = new MockTime();
-        metrics = new Metrics(time);
-        heartbeatRequestState = new 
HeartbeatRequestManager.HeartbeatRequestState(
-            new LogContext(),
-            time,
-            0, // This initial interval should be 0 to ensure heartbeat on the 
clock
-            DEFAULT_RETRY_BACKOFF_MS,
-            DEFAULT_RETRY_BACKOFF_MAX_MS,
-            0);
-        backgroundEventHandler = mock(BackgroundEventHandler.class);
+    public void 
testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() {
         heartbeatRequestManager = createHeartbeatRequestManager(
-            coordinatorRequestManager,
-            membershipManager,
-            heartbeatState,
-            heartbeatRequestState,
-            backgroundEventHandler);
-        
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new 
Node(1, "localhost", 9999)));
-        when(membershipManager.state()).thenReturn(MemberState.STABLE);
-
-        assertNotNull(getMetric("heartbeat-response-time-max"));
-        assertNotNull(getMetric("heartbeat-rate"));
-        assertNotNull(getMetric("heartbeat-total"));
-        assertNotNull(getMetric("last-heartbeat-seconds-ago"));
-
-        // test poll
-        assertHeartbeat(heartbeatRequestManager, 0);
-        time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
-        assertEquals(1.0, getMetric("heartbeat-total").metricValue());
-        assertEquals((double) 
TimeUnit.MILLISECONDS.toSeconds(DEFAULT_HEARTBEAT_INTERVAL_MS), 
getMetric("last-heartbeat-seconds-ago").metricValue());
-
-        assertHeartbeat(heartbeatRequestManager, 
DEFAULT_HEARTBEAT_INTERVAL_MS);
-        assertEquals(0.06d, (double) 
getMetric("heartbeat-rate").metricValue(), 0.005d);
-        assertEquals(2.0, getMetric("heartbeat-total").metricValue());
-
-        // Randomly sleep for some time
-        Random rand = new Random();
-        int randomSleepS = rand.nextInt(11);
-        time.sleep(randomSleepS * 1000);
-        assertEquals((double) randomSleepS, 
getMetric("last-heartbeat-seconds-ago").metricValue());
-    }
+                coordinatorRequestManager,
+                membershipManager,
+                heartbeatState,
+                heartbeatRequestState,
+                backgroundEventHandler);
 
-    @Test
-    public void 
testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() {
+        when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true);
+        when(membershipManager.state()).thenReturn(MemberState.STABLE);

Review Comment:
   setting this expectation is great, and it actually makes me notice that the 
`mockStableMember()` could probably be removed? it was just taking all the 
actions needed to put the membershipMgr into STABLE (when it was an instance, 
not a mock)



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##########
@@ -744,34 +798,17 @@ public void 
testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() {
         verify(heartbeatRequestState).onFailedAttempt(anyLong());
         verify(heartbeatRequestState).reset();
 
+        
when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(false);
         when(membershipManager.state()).thenReturn(MemberState.FENCED);
         result = heartbeatRequestManager.poll(time.milliseconds());
         assertEquals(0, result.unsentRequests.size(), "Member should not send 
heartbeats while FENCED");
 
+        when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true);
         when(membershipManager.state()).thenReturn(MemberState.JOINING);
         result = heartbeatRequestManager.poll(time.milliseconds());
         assertEquals(1, result.unsentRequests.size(), "Fenced member should 
resume heartbeat after transitioning to JOINING");
     }
 
-    @ParameterizedTest
-    @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
-    public void testSendingLeaveGroupHeartbeatWhenPreviousOneInFlight(final 
short version) {

Review Comment:
   uhm we shouldn't remove this test (it was actually an issue we discovered, 
fixed, and added the test for coverage). I see it involves checking the HB 
content (that is might be off just because the heartbeatState is a mock, but 
give it another try after addressing the comments above to turn that state into 
an actual instance able to build the HB content)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to