cadonna commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1579187392

##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##########
@@ -164,6 +165,23 @@ public void testHeartbeatOnStartup() {
         assertEquals(0, result2.unsentRequests.size());
     }
+    @Test
+    public void testSuccessfulHeartbeatTiming() {
+        mockStableMember();
+        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(0, result.unsentRequests.size(),
+            "No heartbeat should be sent while interval has not expired");
+        long currentTimeMs = time.milliseconds();
+        assertEquals(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs), result.timeUntilNextPollMs);
+        assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);
+
+        result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size(), "A heartbeat should be sent when interval expires");

Review Comment:
   Don't you need a verification here that ensures that the heartbeat timer was reset after the poll?
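
A minimal, self-contained sketch of the check order this comment asks for: assert the reset right after the poll that sends the heartbeat, before the response is completed. `SketchTimer` and `SketchManager` are hypothetical stand-ins written for this illustration, not the classes from the PR, and the `resetOnSend` flag exists only so the sketch can model both possible reset points.

```java
class HeartbeatTimingSketch {
    /** Hypothetical countdown timer that only moves when update() is called. */
    static final class SketchTimer {
        private long currentTimeMs;
        private long deadlineMs;

        SketchTimer(long nowMs, long timeoutMs) {
            this.currentTimeMs = nowMs;
            this.deadlineMs = nowMs + timeoutMs;
        }

        void update(long nowMs) { currentTimeMs = Math.max(currentTimeMs, nowMs); }
        void reset(long timeoutMs) { deadlineMs = currentTimeMs + timeoutMs; }
        boolean isExpired() { return currentTimeMs >= deadlineMs; }
        long remainingMs() { return Math.max(0, deadlineMs - currentTimeMs); }
    }

    /** Hypothetical manager that resets the timer either on send or on response. */
    static final class SketchManager {
        private final SketchTimer timer;
        private final long intervalMs;
        private final boolean resetOnSend;

        SketchManager(long nowMs, long intervalMs, boolean resetOnSend) {
            this.timer = new SketchTimer(nowMs, intervalMs);
            this.intervalMs = intervalMs;
            this.resetOnSend = resetOnSend;
        }

        /** Returns true if a heartbeat would be sent at nowMs. */
        boolean poll(long nowMs) {
            timer.update(nowMs);
            if (!timer.isExpired()) {
                return false;
            }
            if (resetOnSend) {
                timer.reset(intervalMs);
            }
            return true;
        }

        void onResponse(long nowMs) {
            timer.update(nowMs);
            if (!resetOnSend) {
                timer.reset(intervalMs);
            }
        }

        long timeToNextHeartbeatMs(long nowMs) {
            timer.update(nowMs);
            return timer.remainingMs();
        }
    }

    /** Passes only when the reset happens at send time and the response does not reset again. */
    static boolean resetObservedAtSend(boolean resetOnSend) {
        final long intervalMs = 1000;
        long nowMs = 0;
        SketchManager manager = new SketchManager(nowMs, intervalMs, resetOnSend);

        nowMs += intervalMs;                      // let the interval expire
        boolean sent = manager.poll(nowMs);
        // Check right after the poll, before the response is completed.
        boolean resetAfterPoll = manager.timeToNextHeartbeatMs(nowMs) == intervalMs;

        nowMs += 100;                             // advance by less than the interval
        manager.onResponse(nowMs);
        // The response must not have restarted the countdown.
        boolean unchangedByResponse = manager.timeToNextHeartbeatMs(nowMs) == intervalMs - 100;

        return sent && resetAfterPoll && unchangedByResponse;
    }

    public static void main(String[] args) {
        System.out.println(resetObservedAtSend(true));   // prints "true"
        System.out.println(resetObservedAtSend(false));  // prints "false"
    }
}
```

With the assertion placed only after the response, both variants would leave a full interval on the timer and the test could not tell them apart; asserting immediately after the poll is what separates the two.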

##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##########
@@ -164,6 +165,23 @@ public void testHeartbeatOnStartup() {
         assertEquals(0, result2.unsentRequests.size());
     }
+    @Test
+    public void testSuccessfulHeartbeatTiming() {
+        mockStableMember();
+        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(0, result.unsentRequests.size(),
+            "No heartbeat should be sent while interval has not expired");
+        long currentTimeMs = time.milliseconds();
+        assertEquals(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs), result.timeUntilNextPollMs);
+        assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);
+
+        result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size(), "A heartbeat should be sent when interval expires");
+        NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0);
+        inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, Errors.NONE));
+        assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);

Review Comment:
   How do you know whether the heartbeat timer was reset in `makeHeartbeatRequest()` or in `onResponse()` if you verify the reset only after you complete the future of the request? I would verify the reset after the poll, advance the time a bit (less than the heartbeat interval), and then verify here that the timer was not reset again after the time advanced.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -469,19 +469,33 @@ public void resetTimer() {
             this.heartbeatTimer.reset(heartbeatIntervalMs);
         }
+        /**
+         * Check if a heartbeat request should be sent on the current time. A heartbeat should be
+         * sent if the heartbeat timer has expired, backoff has expired, and there is no request
+         * in-flight.
+         */
         @Override
         public boolean canSendRequest(final long currentTimeMs) {
             update(currentTimeMs);
             return heartbeatTimer.isExpired() && super.canSendRequest(currentTimeMs);
         }
-        public long nextHeartbeatMs(final long currentTimeMs) {
+        public long timeToNextHeartbeatMs(final long currentTimeMs) {
             if (heartbeatTimer.remainingMs() == 0) {

Review Comment:
   Sorry for commenting on code outside the PR. Isn't this the same as `heartbeatTimer.isExpired()`? If so, could we please change it to make the code more readable?
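
A small sketch of why `remainingMs() == 0` and `isExpired()` should agree, assuming the timer clamps the remaining time at zero and counts as expired once the current time reaches the deadline. `SketchTimer` is a hypothetical stand-in written for this illustration; whether the real `heartbeatTimer` behaves the same way should be confirmed against its implementation.

```java
class TimerEquivalenceSketch {
    /** Hypothetical timer; the semantics below are an assumption, not the PR's code. */
    static final class SketchTimer {
        private long currentTimeMs;
        private final long deadlineMs;

        SketchTimer(long nowMs, long timeoutMs) {
            this.currentTimeMs = nowMs;
            this.deadlineMs = nowMs + timeoutMs;
        }

        void update(long nowMs) { currentTimeMs = Math.max(currentTimeMs, nowMs); }

        /** Expired once the current time reaches the deadline. */
        boolean isExpired() { return currentTimeMs >= deadlineMs; }

        /** Remaining time, clamped so it never goes negative. */
        long remainingMs() { return Math.max(0, deadlineMs - currentTimeMs); }
    }

    /** Compares both checks at every millisecond from start to twice the timeout. */
    static boolean equivalentEverywhere(long timeoutMs) {
        for (long elapsedMs = 0; elapsedMs <= 2 * timeoutMs; elapsedMs++) {
            SketchTimer timer = new SketchTimer(0, timeoutMs);
            timer.update(elapsedMs);
            if ((timer.remainingMs() == 0) != timer.isExpired()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(equivalentEverywhere(1000)); // prints "true"
    }
}
```

Under these assumptions the two conditions are interchangeable, so the choice is purely about readability: `isExpired()` states the intent directly.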