zheguang commented on code in PR #19121:
URL: https://github.com/apache/kafka/pull/19121#discussion_r1982385169


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:
##########
@@ -254,34 +284,131 @@ public StreamsGroupHeartbeatRequestManager(final LogContext logContext,
             retryBackoffMaxMs,
             maxPollIntervalMs
         );
+        this.pollTimer = time.timer(maxPollIntervalMs);
     }
 
+    /**
+     * This will build a heartbeat request if one must be sent, determined based on the member
+     * state. A heartbeat is sent in the following situations:
+     * <ol>
+     *     <li>Member is part of the consumer group or wants to join it.</li>
+     *     <li>The heartbeat interval has expired, or the member is in a state that indicates
+     *     that it should heartbeat without waiting for the interval.</li>
+     * </ol>
+     * This will also determine the maximum wait time until the next poll based on the member's
+     * state.
+     * <ol>
+     *     <li>If the member is without a coordinator or is in a failed state, the timer is set
+     *     to Long.MAX_VALUE, as there's no need to send a heartbeat.</li>
+     *     <li>If the member cannot send a heartbeat due to either exponential backoff, it will
+     *     return the remaining time left on the backoff timer.</li>
+     *     <li>If the member's heartbeat timer has not expired, It will return the remaining time
+     *     left on the heartbeat timer.</li>
+     *     <li>If the member can send a heartbeat, the timer is set to the current heartbeat interval.</li>
+     * </ol>
+     *
+     * @return {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} that includes a
+     *         heartbeat request if one must be sent, and the time to wait until the next poll.
+     */
     @Override
     public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
-        return new NetworkClientDelegate.PollResult(
-            heartbeatRequestState.heartbeatIntervalMs(),
-            Collections.singletonList(makeHeartbeatRequest(currentTimeMs))
-        );
+        if (coordinatorRequestManager.coordinator().isEmpty() || membershipManager.shouldSkipHeartbeat()) {
+            membershipManager.onHeartbeatRequestSkipped();
+            maybePropagateCoordinatorFatalErrorEvent();
+            return NetworkClientDelegate.PollResult.EMPTY;
+        }
+        pollTimer.update(currentTimeMs);
+        if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) {
+            logger.warn("Consumer poll timeout has expired. This means the time between " +
+                "subsequent calls to poll() was longer than the configured max.poll.interval.ms, " +
+                "which typically implies that the poll loop is spending too much time processing " +
+                "messages. You can address this either by increasing max.poll.interval.ms or by " +
+                "reducing the maximum size of batches returned in poll() with max.poll.records.");
+
+            membershipManager.onPollTimerExpired();
+            NetworkClientDelegate.UnsentRequest leaveHeartbeat = makeHeartbeatRequestOnlyLogResponse(currentTimeMs);
+
+            // We can ignore the leave response because we can join before or after receiving the response.
+            heartbeatRequestState.reset();
+            heartbeatState.reset();
+            return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(), Collections.singletonList(leaveHeartbeat));
+        }
+        if (shouldHeartbeatBeforeIntervalExpires() || heartbeatRequestState.canSendRequest(currentTimeMs)) {
+            NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(currentTimeMs);
+            return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(), Collections.singletonList(request));
+        } else {
+            return new NetworkClientDelegate.PollResult(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs));
+        }
+    }
+
+    /**
+     * A heartbeat should be sent without waiting for the heartbeat interval to expire if:
+     * - the member is leaving the group
+     * or
+     * - the member is joining the group or acknowledging the assignment and for both cases there is no heartbeat request
+     *   in flight.
+     * @return
+     */
+    private boolean shouldHeartbeatBeforeIntervalExpires() {
+        return membershipManager.state() == MemberState.LEAVING
+            ||
+            (membershipManager.state() == MemberState.JOINING || membershipManager.state() == MemberState.ACKNOWLEDGING)
+                && !heartbeatRequestState.requestInFlight();

Review Comment:
   🤔 Wondering whether the closing parenthesis is off by one clause? Reading the comment above, it seems this was intended instead:
   ```java
   ||
               (membershipManager.state() == MemberState.JOINING || membershipManager.state() == MemberState.ACKNOWLEDGING
                   && !heartbeatRequestState.requestInFlight());
   ```
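In Java, `&&` binds more tightly than `||`, so the two versions group differently. The form in the PR evaluates as `LEAVING || ((JOINING || ACKNOWLEDGING) && !inFlight)`, while the suggested form evaluates as `LEAVING || (JOINING || (ACKNOWLEDGING && !inFlight))`. The standalone sketch below compares both side by side. It assumes a hypothetical `MemberState` stand-in enum with only a few constants (`STABLE` is included purely as a control case), not the client's real class, and passes `requestInFlight` as a plain boolean in place of `heartbeatRequestState.requestInFlight()`.

```java
public class HeartbeatPrecedenceDemo {

    // Hypothetical stand-in for the member states referenced in the diff;
    // STABLE is included only as a control case.
    enum MemberState { JOINING, ACKNOWLEDGING, LEAVING, STABLE }

    // Form from the PR. Because && binds tighter than ||, this evaluates as
    // LEAVING || ((JOINING || ACKNOWLEDGING) && !requestInFlight).
    static boolean asWritten(MemberState state, boolean requestInFlight) {
        return state == MemberState.LEAVING
            ||
            (state == MemberState.JOINING || state == MemberState.ACKNOWLEDGING)
                && !requestInFlight;
    }

    // Form from the review suggestion. Inside the parentheses && again binds
    // tighter, so this evaluates as
    // LEAVING || (JOINING || (ACKNOWLEDGING && !requestInFlight)).
    static boolean asSuggested(MemberState state, boolean requestInFlight) {
        return state == MemberState.LEAVING
            ||
            (state == MemberState.JOINING || state == MemberState.ACKNOWLEDGING
                && !requestInFlight);
    }

    public static void main(String[] args) {
        // Print both results for every state / in-flight combination.
        for (MemberState state : MemberState.values()) {
            for (boolean inFlight : new boolean[] {false, true}) {
                System.out.printf("%-13s inFlight=%-5b asWritten=%-5b asSuggested=%b%n",
                    state, inFlight, asWritten(state, inFlight), asSuggested(state, inFlight));
            }
        }
    }
}
```

Working through the precedence rules, the two methods disagree only for `JOINING` with a request in flight: the form in the PR returns `false` there, and the suggested form returns `true`. The choice between them therefore comes down to whether the in-flight check should also cover `JOINING`.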



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
