kirktrue commented on code in PR #16974:
URL: https://github.com/apache/kafka/pull/16974#discussion_r1733181704

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -195,6 +197,32 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
      */
     public abstract AbstractMembershipManager<R> membershipManager();
 
+    /**
+     * Generate a heartbeat request to leave the group if the state is still LEAVING when this is
+     * called to close the consumer.

Review Comment:
   Is it possible that a 'leave group' request was already sent? Won't `AsyncKafkaConsumer.releaseAssignmentAndLeaveGroup()` be called before the request managers are closed? Or does it not matter if we send more than one leave group request?


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##########
@@ -370,11 +370,10 @@ public void testFetcherCloseClosesFetchSessionsInBroker() {
         final ArgumentCaptor<NetworkClientDelegate.UnsentRequest> argument = ArgumentCaptor.forClass(NetworkClientDelegate.UnsentRequest.class);
 
-        Timer timer = time.timer(Duration.ofSeconds(10));

Review Comment:
   So this timer was totally unnecessary all this time? Yeesh 😄


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java:
##########
@@ -156,7 +156,7 @@ private void createHeartbeatRequestStateWithZeroHeartbeatInterval() {
                 backgroundEventHandler);
     }
 
-    private void createHeartbeatStatAndRequestManager() {
+    private void createHeartbeatStateAndRequestManager() {

Review Comment:
   👍