cadonna commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1579187392


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##########
@@ -164,6 +165,23 @@ public void testHeartbeatOnStartup() {
         assertEquals(0, result2.unsentRequests.size());
     }
 
+    @Test
+    public void testSuccessfulHeartbeatTiming() {
+        mockStableMember();
+        NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(0, result.unsentRequests.size(),
+            "No heartbeat should be sent while interval has not expired");
+        long currentTimeMs = time.milliseconds();
+        
assertEquals(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs), 
result.timeUntilNextPollMs);
+        assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);
+
+        result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size(), "A heartbeat should be 
sent when interval expires");

Review Comment:
   Don't you need a verification here that ensures that the heartbeat timer was 
reset after the poll? 



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##########
@@ -164,6 +165,23 @@ public void testHeartbeatOnStartup() {
         assertEquals(0, result2.unsentRequests.size());
     }
 
+    @Test
+    public void testSuccessfulHeartbeatTiming() {
+        mockStableMember();
+        NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(0, result.unsentRequests.size(),
+            "No heartbeat should be sent while interval has not expired");
+        long currentTimeMs = time.milliseconds();
+        
assertEquals(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs), 
result.timeUntilNextPollMs);
+        assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);
+
+        result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size(), "A heartbeat should be 
sent when interval expires");
+        NetworkClientDelegate.UnsentRequest inflightReq = 
result.unsentRequests.get(0);
+        inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, 
Errors.NONE));
+        assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);

Review Comment:
   How do you know whether the heartbeat timer was reset in 
`makeHeartbeatRequest()` or in `onResponse()` if you verify the reset after you 
complete the future of the request?
   I would verify the reset after the poll, progress the time a bit (less then 
the heartbeat interval), and then verify here that the time did not change 
after the progress.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -469,19 +469,33 @@ public void resetTimer() {
             this.heartbeatTimer.reset(heartbeatIntervalMs);
         }
 
+        /**
+         * Check if a heartbeat request should be sent on the current time. A 
heartbeat should be
+         * sent if the heartbeat timer has expired, backoff has expired, and 
there is no request
+         * in-flight.
+         */
         @Override
         public boolean canSendRequest(final long currentTimeMs) {
             update(currentTimeMs);
             return heartbeatTimer.isExpired() && 
super.canSendRequest(currentTimeMs);
         }
 
-        public long nextHeartbeatMs(final long currentTimeMs) {
+        public long timeToNextHeartbeatMs(final long currentTimeMs) {
             if (heartbeatTimer.remainingMs() == 0) {

Review Comment:
   Sorry if I comment on code outside the PR. Isn't this the same as 
`heartbeatTimer.isExpired()`? If yes, could we please change this to make the 
the code more readable?  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to