zheguang commented on code in PR #19121:
URL: https://github.com/apache/kafka/pull/19121#discussion_r1982385169


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:
##########
@@ -254,34 +284,131 @@ public StreamsGroupHeartbeatRequestManager(final LogContext logContext,
             retryBackoffMaxMs,
             maxPollIntervalMs
         );
+        this.pollTimer = time.timer(maxPollIntervalMs);
     }
 
+    /**
+     * This will build a heartbeat request if one must be sent, determined based on the member
+     * state. A heartbeat is sent in the following situations:
+     * <ol>
+     *     <li>Member is part of the consumer group or wants to join it.</li>
+     *     <li>The heartbeat interval has expired, or the member is in a state that indicates
+     *     that it should heartbeat without waiting for the interval.</li>
+     * </ol>
+     * This will also determine the maximum wait time until the next poll based on the member's
+     * state.
+     * <ol>
+     *     <li>If the member is without a coordinator or is in a failed state, the timer is set
+     *     to Long.MAX_VALUE, as there's no need to send a heartbeat.</li>
+     *     <li>If the member cannot send a heartbeat due to either exponential backoff, it will
+     *     return the remaining time left on the backoff timer.</li>
+     *     <li>If the member's heartbeat timer has not expired, It will return the remaining time
+     *     left on the heartbeat timer.</li>
+     *     <li>If the member can send a heartbeat, the timer is set to the current heartbeat interval.</li>
+     * </ol>
+     *
+     * @return {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} that includes a
+     *         heartbeat request if one must be sent, and the time to wait until the next poll.
+     */
     @Override
     public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
-        return new NetworkClientDelegate.PollResult(
-            heartbeatRequestState.heartbeatIntervalMs(),
-            Collections.singletonList(makeHeartbeatRequest(currentTimeMs))
-        );
+        if (coordinatorRequestManager.coordinator().isEmpty() || membershipManager.shouldSkipHeartbeat()) {
+            membershipManager.onHeartbeatRequestSkipped();
+            maybePropagateCoordinatorFatalErrorEvent();
+            return NetworkClientDelegate.PollResult.EMPTY;
+        }
+        pollTimer.update(currentTimeMs);
+        if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) {
+            logger.warn("Consumer poll timeout has expired. This means the time between " +
+                "subsequent calls to poll() was longer than the configured max.poll.interval.ms, " +
+                "which typically implies that the poll loop is spending too much time processing " +
+                "messages. You can address this either by increasing max.poll.interval.ms or by " +
+                "reducing the maximum size of batches returned in poll() with max.poll.records.");
+
+            membershipManager.onPollTimerExpired();
+            NetworkClientDelegate.UnsentRequest leaveHeartbeat = makeHeartbeatRequestOnlyLogResponse(currentTimeMs);
+
+            // We can ignore the leave response because we can join before or after receiving the response.
+            heartbeatRequestState.reset();
+            heartbeatState.reset();
+            return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(), Collections.singletonList(leaveHeartbeat));
+        }
+        if (shouldHeartbeatBeforeIntervalExpires() || heartbeatRequestState.canSendRequest(currentTimeMs)) {
+            NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(currentTimeMs);
+            return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(), Collections.singletonList(request));
+        } else {
+            return new NetworkClientDelegate.PollResult(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs));
+        }
+    }
+
+    /**
+     * A heartbeat should be sent without waiting for the heartbeat interval to expire if:
+     * - the member is leaving the group
+     * or
+     * - the member is joining the group or acknowledging the assignment and for both cases there is no heartbeat request
+     *   in flight.
+     * @return
+     */
+    private boolean shouldHeartbeatBeforeIntervalExpires() {
+        return membershipManager.state() == MemberState.LEAVING
+            ||
+            (membershipManager.state() == MemberState.JOINING || membershipManager.state() == MemberState.ACKNOWLEDGING)
+            && !heartbeatRequestState.requestInFlight();

Review Comment:
   🤔 Wondering if the closing parenthesis is off by one clause? Reading the comment above, it seems this was intended instead:
   ```java
               ||
               (membershipManager.state() == MemberState.JOINING || membershipManager.state() == MemberState.ACKNOWLEDGING
               && !heartbeatRequestState.requestInFlight());
   ```
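
   To make the two parenthesizations easier to compare, here is a minimal standalone sketch. It does not use the Kafka classes: `PrecedenceSketch`, its local `MemberState` enum, and the `asWritten` / `asSuggested` helpers are hypothetical stand-ins introduced only for illustration. It relies on the fact that in Java `&&` binds more tightly than `||`, so an unparenthesized `a || b && c` parses as `a || (b && c)`.

   ```java
   // Standalone illustration only; none of these names come from the Kafka code base.
   class PrecedenceSketch {

       // Hypothetical stand-in for the member states referenced in the diff.
       enum MemberState { LEAVING, JOINING, ACKNOWLEDGING, STABLE }

       // Grouping as written in the PR:
       // LEAVING || ((JOINING || ACKNOWLEDGING) && !requestInFlight)
       static boolean asWritten(MemberState state, boolean requestInFlight) {
           return state == MemberState.LEAVING
               ||
               (state == MemberState.JOINING || state == MemberState.ACKNOWLEDGING)
               && !requestInFlight;
       }

       // Grouping with the closing parenthesis moved to the end. Because && binds
       // tighter than ||, this parses as:
       // LEAVING || (JOINING || (ACKNOWLEDGING && !requestInFlight))
       static boolean asSuggested(MemberState state, boolean requestInFlight) {
           return state == MemberState.LEAVING
               ||
               (state == MemberState.JOINING || state == MemberState.ACKNOWLEDGING
               && !requestInFlight);
       }

       public static void main(String[] args) {
           // Print the full truth table so the differing rows are visible.
           for (MemberState state : MemberState.values()) {
               for (boolean inFlight : new boolean[] {false, true}) {
                   System.out.println(state + " inFlight=" + inFlight
                       + " written=" + asWritten(state, inFlight)
                       + " suggested=" + asSuggested(state, inFlight));
               }
           }
       }
   }
   ```

   Under these assumptions the two versions disagree in exactly one case: a JOINING member with a request already in flight, where the grouping as written returns false and the moved parenthesis returns true. Which behaviour is intended depends on how "for both cases" in the Javadoc is meant to be read.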