lucasbru commented on code in PR #19233:
URL: https://github.com/apache/kafka/pull/19233#discussion_r2009935567


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:
##########
@@ -366,6 +375,53 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
         }
     }
 
+    /**
+     * Generate a heartbeat request to leave the group if the state is still LEAVING when this is
+     * called to close the consumer.
+     * <p/>
+     * Note that when closing the consumer, even though an event to Unsubscribe is generated
+     * (triggers callbacks and sends leave group), it could be the case that the Unsubscribe event
+     * processing does not complete in time and moves on to close the managers (ex. calls to
+     * close with zero timeout). So we could end up on this pollOnClose with the member in
+     * {@link MemberState#PREPARE_LEAVING} (ex. app thread did not have the time to process the
+     * event to execute callbacks), or {@link MemberState#LEAVING} (ex. the leave request could
+     * not be sent due to coordinator not available at that time). In all cases, the pollOnClose
+     * will be triggered right before sending the final requests, so we ensure that we generate
+     * the request to leave if needed.
+     *
+     * @param currentTimeMs The current system time in milliseconds at which the method was called
+     * @return PollResult containing the request to send
+     */
+    @Override
+    public NetworkClientDelegate.PollResult pollOnClose(long currentTimeMs) {
+        if (membershipManager.isLeavingGroup()) {
+            NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequestAndLogResponse(currentTimeMs);
+            return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(), List.of(request));
+        }
+        return EMPTY;
+    }
+
+    /**
+     * Returns the delay for which the application thread can safely wait before it should be responsive
+     * to results from the request managers. For example, the subscription state can change when heartbeats
+     * are sent, so blocking for longer than the heartbeat interval might mean the application thread is not
+     * responsive to changes.
+     *
+     * <p>Similarly, we may have to unblock the application thread to send a `PollApplicationEvent` to make sure
+     * our poll timer will not expire while we are polling.
+     *
+     * <p>In the event that heartbeats are currently being skipped, this still returns the next heartbeat
+     * delay rather than {@code Long.MAX_VALUE} so that the application thread remains responsive.
+     */
+    @Override
+    public long maximumTimeToWait(long currentTimeMs) {
+        pollTimer.update(currentTimeMs);
+        if (pollTimer.isExpired() || (membershipManager.shouldNotWaitForHeartbeatInterval() && !heartbeatRequestState.requestInFlight())) {

Review Comment:
   Unnecessary parentheses: `&&` binds tighter than `||`, so the inner pair around the `&&` clause can be dropped.
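
A minimal sketch checking that removing the inner parentheses does not change the result. The class and method names are hypothetical and not part of the PR; the three boolean parameters stand in for `pollTimer.isExpired()`, `membershipManager.shouldNotWaitForHeartbeatInterval()`, and `heartbeatRequestState.requestInFlight()`.

```java
public class PrecedenceCheck {
    // Same shape as the reviewed condition, with the inner parentheses kept.
    static boolean withParens(boolean expired, boolean shouldNotWait, boolean inFlight) {
        return expired || (shouldNotWait && !inFlight);
    }

    // Same condition with the inner parentheses removed.
    static boolean withoutParens(boolean expired, boolean shouldNotWait, boolean inFlight) {
        return expired || shouldNotWait && !inFlight;
    }

    // Returns true if both forms agree on all eight input combinations.
    static boolean formsAgree() {
        boolean[] values = {false, true};
        for (boolean expired : values) {
            for (boolean shouldNotWait : values) {
                for (boolean inFlight : values) {
                    if (withParens(expired, shouldNotWait, inFlight)
                            != withoutParens(expired, shouldNotWait, inFlight)) {
                        return false;
                    }
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(formsAgree());
    }
}
```

Running `main` prints `true`.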



