lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1395132794


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -137,39 +135,104 @@ public HeartbeatRequestManager(
     }
 
     /**
-     * Determines the maximum wait time until the next poll based on the 
member's state, and creates a heartbeat
-     * request.
+     * This will ensure that the member starts sending heartbeats to join the 
group with the
+     * updated subscription, if it is not already part of it. If the member is 
already part of
+     * the group, this will only ensure that the updated subscription is sent 
on the next
+     * heartbeat request. No action will be taken if the member is in a {@link 
MemberState#FATAL}
+     * state.
+     * <p/>
+     * Note that list of topics of the subscription is taken from the shared 
subscription state.
+     */
+    public void onSubscriptionUpdated() {
+        if (membershipManager.state() == MemberState.FATAL) {
+            logger.debug("No action taken join the group or update the 
subscription because " +
+                    "the member is in FATAL state");
+            return;
+        }
+
+        if (membershipManager.state() == MemberState.UNSUBSCRIBED) {
+            membershipManager.transitionToJoining();
+        }
+    }
+
+    /**
+     * Release assignment and send heartbeat request to leave the group. If 
the member is not
+     * part of the group or is in a FATAL state this won't take any action and 
will return a
+     * completed future.
+     *
+     * @return Future that will complete when the callback execution completes 
and the heartbeat
+     * request to leave is sent out. The future will fail it the callback 
execution fails.
+     */
+    public CompletableFuture<Void> onUnsubscribe() {
+        boolean notInGroup =
+                membershipManager.state() == MemberState.UNSUBSCRIBED ||
+                        membershipManager.state() == MemberState.FATAL;
+        if (notInGroup) {
+            return CompletableFuture.completedFuture(null);
+        }
+        // TODO: Consider no-op if member is already LEAVING too (repeated 
calls to unsubscribe
+        //  potentially storming the broker?). To double check the current 
behaviour, as it does
+        //  not seem to handle it that way.

Review Comment:
   Done (now in the membership manager `leaveGroup`). No-op if already leaving, 
and returning the future that will complete when the ongoing leave completes. 
Also handling the case where the member already left (no-op and return right 
away)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to