lucasbru commented on code in PR #19233:
URL: https://github.com/apache/kafka/pull/19233#discussion_r2009935567
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:
##########
@@ -366,6 +375,53 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
         }
     }
 
+    /**
+     * Generate a heartbeat request to leave the group if the state is still LEAVING when this is
+     * called to close the consumer.
+     * <p/>
+     * Note that when closing the consumer, even though an event to Unsubscribe is generated
+     * (triggers callbacks and sends leave group), it could be the case that the Unsubscribe event
+     * processing does not complete in time and moves on to close the managers (ex. calls to
+     * close with zero timeout). So we could end up on this pollOnClose with the member in
+     * {@link MemberState#PREPARE_LEAVING} (ex. app thread did not have the time to process the
+     * event to execute callbacks), or {@link MemberState#LEAVING} (ex. the leave request could
+     * not be sent due to coordinator not available at that time). In all cases, the pollOnClose
+     * will be triggered right before sending the final requests, so we ensure that we generate
+     * the request to leave if needed.
+     *
+     * @param currentTimeMs The current system time in milliseconds at which the method was called
+     * @return PollResult containing the request to send
+     */
+    @Override
+    public NetworkClientDelegate.PollResult pollOnClose(long currentTimeMs) {
+        if (membershipManager.isLeavingGroup()) {
+            NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequestAndLogResponse(currentTimeMs);
+            return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(), List.of(request));
+        }
+        return EMPTY;
+    }
+
+    /**
+     * Returns the delay for which the application thread can safely wait before it should be responsive
+     * to results from the request managers. For example, the subscription state can change when heartbeats
+     * are sent, so blocking for longer than the heartbeat interval might mean the application thread is not
+     * responsive to changes.
+     *
+     * <p>Similarly, we may have to unblock the application thread to send a `PollApplicationEvent` to make sure
+     * our poll timer will not expire while we are polling.
+     *
+     * <p>In the event that heartbeats are currently being skipped, this still returns the next heartbeat
+     * delay rather than {@code Long.MAX_VALUE} so that the application thread remains responsive.
+     */
+    @Override
+    public long maximumTimeToWait(long currentTimeMs) {
+        pollTimer.update(currentTimeMs);
+        if (pollTimer.isExpired() || (membershipManager.shouldNotWaitForHeartbeatInterval() && !heartbeatRequestState.requestInFlight())) {

Review Comment:
   Unnecessary parentheses around the `&&` clause.
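
   For context on why the grouping is redundant: in Java, `&&` binds tighter than `||`, so `a || (b && c)` and `a || b && c` parse the same way. A minimal sketch follows; the class name and the boolean parameters `expired`, `skipWait`, and `inFlight` are hypothetical stand-ins for `pollTimer.isExpired()`, `membershipManager.shouldNotWaitForHeartbeatInterval()`, and `heartbeatRequestState.requestInFlight()`, not code from the PR.

```java
public class PrecedenceCheck {

    // Mirrors the shape of the condition as written in the PR (with parentheses).
    // The three parameters are hypothetical stand-ins for the real method calls.
    static boolean withParens(boolean expired, boolean skipWait, boolean inFlight) {
        return expired || (skipWait && !inFlight);
    }

    // Same condition without the inner parentheses; && still groups first.
    static boolean withoutParens(boolean expired, boolean skipWait, boolean inFlight) {
        return expired || skipWait && !inFlight;
    }

    // Exhaustively compares both forms over all 8 input combinations.
    static boolean agreeOnAllInputs() {
        boolean[] values = {false, true};
        for (boolean expired : values) {
            for (boolean skipWait : values) {
                for (boolean inFlight : values) {
                    if (withParens(expired, skipWait, inFlight)
                            != withoutParens(expired, skipWait, inFlight)) {
                        return false;
                    }
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(agreeOnAllInputs()); // prints "true"
    }
}
```

   Whether to drop the parentheses is still a style call; some code bases keep them for readability even though they do not change the result.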