lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1396013115


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -181,63 +359,592 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
     public void transitionToFenced() {
         resetEpoch();
         transitionTo(MemberState.FENCED);
+
+        // Release assignment
+        CompletableFuture<Void> callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.error("onPartitionsLost callback invocation failed while 
releasing assignment" +
+                        "after member got fenced. Member will rejoin the group 
anyways.", error);
+            }
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+            transitionToJoining();
+        });
+
+        clearPendingAssignmentsAndLocalNamesCache();
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
-    public void transitionToFailed() {
-        log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-        transitionTo(MemberState.FAILED);
+    public void transitionToFatal() {
+        log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+        // Update epoch to indicate that the member is not in the group 
anymore, so that the
+        // onPartitionsLost is called to release assignment.
+        memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+        CompletableFuture<Void> callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.error("onPartitionsLost callback invocation failed while 
releasing assignment" +
+                        "after member failed with fatal error.", error);
+            }
+        });
+        subscriptions.assignFromSubscribed(Collections.emptySet());
+        clearPendingAssignmentsAndLocalNamesCache();
+        transitionTo(MemberState.FATAL);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void onSubscriptionUpdated() {
+        if (state == MemberState.UNSUBSCRIBED) {
+            transitionToJoining();
+        }
+        // TODO: If the member is already part of the group, this should only 
ensure that the
+        //  updated subscription is included in the next heartbeat request.
     }
 
+    /**
+     * Transition to the {@link MemberState#JOINING} state, indicating that 
the member will
+     * try to join the group on the next heartbeat request. This is expected 
to be invoked when
+     * the user calls the subscribe API, or when the member wants to rejoin 
after getting fenced.
+     * Visible for testing.
+     */
+    void transitionToJoining() {
+        if (state == MemberState.FATAL) {
+            log.warn("No action taken to join the group with the updated 
subscription because " +
+                    "the member is in FATAL state");
+            return;
+        }
+        resetEpoch();
+        transitionTo(MemberState.JOINING);
+        clearPendingAssignmentsAndLocalNamesCache();
+        registerForMetadataUpdates();
+    }
+
+    /**
+     * Register to get notified when the cluster metadata is updated, via the
+     * {@link #onUpdate(ClusterResource)}. Register only if the manager is not 
register already.
+     */
+    private void registerForMetadataUpdates() {
+        if (!isRegisteredForMetadataUpdates) {
+            this.metadata.addClusterUpdateListener(this);
+            isRegisteredForMetadataUpdates = true;
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
     @Override
-    public boolean shouldSendHeartbeat() {
-        return state() != MemberState.FAILED;
+    public CompletableFuture<Void> leaveGroup() {
+        if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL) {
+            // Member is not part of the group. No-op and return completed 
future to avoid
+            // unnecessary transitions.
+            return CompletableFuture.completedFuture(null);
+        }
+
+        if (state == MemberState.PREPARE_LEAVING || state == 
MemberState.LEAVING) {
+            // Member already leaving. No-op and return existing leave group 
future that will
+            // complete when the ongoing leave operation completes.
+            return leaveGroupInProgress.get();
+        }
+
+        transitionTo(MemberState.PREPARE_LEAVING);
+        leaveGroupInProgress = Optional.of(new CompletableFuture<>());
+
+        CompletableFuture<Void> callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+        callbackResult.whenComplete((result, error) -> {
+            // Clear the subscription, no matter if the callback execution 
failed or succeeded.
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+
+            // Transition to ensure that a heartbeat request is sent out to 
effectively leave the
+            // group (even in the case where the member had no assignment to 
release or when the
+            // callback execution failed.)
+            transitionToSendingLeaveGroup();
+        });
+
+        clearPendingAssignmentsAndLocalNamesCache();
+
+        // Return future to indicate that the leave group is done when the 
callbacks
+        // complete, and the heartbeat to be sent out. (Best effort to send 
it, without waiting
+        // for a response or handling timeouts)
+        return leaveGroupInProgress.get();
     }
 
     /**
-     * Transition to {@link MemberState#STABLE} only if there are no target 
assignments left to
-     * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+     * Release member assignment by calling the user defined callbacks for 
onPartitionsRevoked or
+     * onPartitionsLost.
+     * <ul>
+     *     <li>If the member is part of the group (epoch > 0), this will 
invoke onPartitionsRevoked.
+     *     This will be the case when releasing assignment because the member 
is intentionally
+     *     leaving the group (after a call to unsubscribe)</li>
+     *
+     *     <li>If the member is not part of the group (epoch <=0), this will 
invoke onPartitionsLost.
+     *     This will be the case when releasing assignment after being fenced 
.</li>
+     * </ul>
+     *
+     * @return Future that will complete when the callback execution completes.
      */
-    private boolean maybeTransitionToStable() {
-        if (!hasPendingTargetAssignment()) {
-            transitionTo(MemberState.STABLE);
+    private CompletableFuture<Void> 
invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+        SortedSet<TopicPartition> droppedPartitions = new 
TreeSet<>(COMPARATOR);
+        droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+        CompletableFuture<Void> callbackResult;
+        if (droppedPartitions.isEmpty()) {
+            // No assignment to release
+            callbackResult = CompletableFuture.completedFuture(null);
         } else {
-            transitionTo(MemberState.RECONCILING);
+            // Release assignment
+            if (memberEpoch > 0) {
+                // Member is part of the group. Invoke onPartitionsRevoked.
+                callbackResult = revokePartitions(droppedPartitions);
+            } else {
+                // Member is not part of the group anymore. Invoke 
onPartitionsLost.
+                callbackResult = 
invokeOnPartitionsLostCallback(droppedPartitions);
+            }
+            // Remove all topic IDs and names from local cache
+            callbackResult.whenComplete((result, error) -> 
clearPendingAssignmentsAndLocalNamesCache());

Review Comment:
   Agreed, it was not consistent. I moved it next to the call to 
`assignPartitions`, so that when the callbacks complete there is a single 
`updateAssignment` that makes the assignment effective and clears the cache 
if needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to