lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1390590490


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -181,33 +256,465 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
     public void transitionToFenced() {
         resetEpoch();
         transitionTo(MemberState.FENCED);
+
+        // Release assignment
+        CompletableFuture<Void> callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.debug("OnPartitionsLost callback invocation failed while 
releasing assignment" +
+                        "after member got fenced. Member will rejoin the group 
anyways.", error);
+            }
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+            transitionToJoining();
+        });
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
-    public void transitionToFailed() {
-        log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-        transitionTo(MemberState.FAILED);
+    public void transitionToFatal() {
+        log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+        // Update epoch to indicate that the member is not in the group 
anymore, so that the
+        // onPartitionsLost is called to release assignment.
+        memberEpoch = LEAVE_GROUP_EPOCH;
+        invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+
+        transitionTo(MemberState.FATAL);
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
-    public boolean shouldSendHeartbeat() {
-        return state() != MemberState.FAILED;
+    public void transitionToJoining() {
+        resetEpoch();
+        transitionTo(MemberState.JOINING);
     }
 
     /**
-     * Transition to {@link MemberState#STABLE} only if there are no target 
assignments left to
-     * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+     * {@inheritDoc}
+     */
+    @Override
+    public CompletableFuture<Void> leaveGroup() {
+        transitionTo(MemberState.LEAVING);
+
+        CompletableFuture<Void> callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+        callbackResult.whenComplete((result, error) -> {
+
+            // Clear the subscription, no matter if the callback execution 
failed or succeeded.
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+
+            // Transition to ensure that a heartbeat request is sent out to 
effectively leave the
+            // group (even in the case where the member had no assignment to 
release or when the
+            // callback execution failed.)
+            transitionToSendingLeaveGroup();
+
+        });
+
+        // Return callback future to indicate that the leave group is done 
when the callbacks
+        // complete, without waiting for the heartbeat to be sent out. (Best 
effort to send it
+        // but do not hold the leave group operation for it)
+        return callbackResult;
+    }
+
+    /**
+     * Release member assignment by calling the user defined callbacks for 
onPartitionsRevoked or
+     * onPartitionsLost.
+     * <ul>
+     *     <li>If the member is part of the group (epoch > 0), this will 
invoke onPartitionsRevoked.
+     *     This will be the case when releasing assignment because the member 
is intentionally
+     *     leaving the group (after a call to unsubscribe)</li>
+     *
+     *     <li>If the member is not part of the group (epoch <=0), this will 
invoke onPartitionsLost.
+     *     This will be the case when releasing assignment after being fenced 
.</li>
+     * </ul>
+     *
+     * @return Future that will complete when the callback execution completes.
+     */
+    private CompletableFuture<Void> 
invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+        SortedSet<TopicPartition> droppedPartitions = new 
TreeSet<>(COMPARATOR);
+        droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+        CompletableFuture<Void> callbackResult;
+        if (droppedPartitions.isEmpty()) {
+            // No assignment to release
+            callbackResult = CompletableFuture.completedFuture(null);
+        } else {
+            // Release assignment
+            if (memberEpoch > 0) {
+                // Member is part of the group. Invoke onPartitionsRevoked.
+                callbackResult = revokePartitions(droppedPartitions);
+            } else {
+                // Member is not part of the group anymore. Invoke 
onPartitionsLost.
+                callbackResult = 
invokeOnPartitionsLostCallback(droppedPartitions);
+            }
+        }
+        return callbackResult;
+    }
+
+    /**
+     * Reset member epoch to the value required for the leave the group 
heartbeat request, and
+     * transition to the {@link MemberState#SENDING_LEAVE_REQUEST} state so 
that a heartbeat
+     * request is sent out with it.
+     */
+    private void transitionToSendingLeaveGroup() {
+        memberEpoch = leaveGroupEpoch();
+        currentAssignment = new HashSet<>();
+        targetAssignment = Optional.empty();
+        transitionTo(MemberState.SENDING_LEAVE_REQUEST);
+    }
+
+    /**
+     * Return the epoch to use in the Heartbeat request to indicate that the 
member wants to
+     * leave the group. Should be -2 if this is a static member, or -1 in any 
other case.
+     */
+    private int leaveGroupEpoch() {
+        return groupInstanceId.isPresent() ? 
LEAVE_GROUP_EPOCH_FOR_STATIC_MEMBER : LEAVE_GROUP_EPOCH;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean shouldHeartbeatNow() {
+        return state() == MemberState.SENDING_ACK_FOR_RECONCILED_ASSIGNMENT ||
+                state() == MemberState.SENDING_LEAVE_REQUEST;
+    }
+
+    /**
+     * {@inheritDoc}
      */
-    private boolean maybeTransitionToStable() {
-        if (!hasPendingTargetAssignment()) {
+    @Override
+    public void onHeartbeatRequestSent() {
+        if (state() == MemberState.SENDING_ACK_FOR_RECONCILED_ASSIGNMENT) {
             transitionTo(MemberState.STABLE);
+        } else if (state() == MemberState.SENDING_LEAVE_REQUEST) {
+            transitionTo(MemberState.NOT_IN_GROUP);
+        }
+    }
+
+    @Override
+    public boolean shouldSkipHeartbeat() {
+        return state() == MemberState.NOT_IN_GROUP || state() == 
MemberState.FATAL;
+    }
+
+    void reconcile(ConsumerGroupHeartbeatResponseData.Assignment 
targetAssignment) {
+
+        SortedSet<TopicPartition> ownedPartitions = new TreeSet<>(COMPARATOR);
+        ownedPartitions.addAll(subscriptions.assignedPartitions());
+
+        CompletableFuture<SortedSet<TopicPartition>> 
assignedPartitionsByNameResult = 
extractTopicPartitionsFromAssignment(targetAssignment);
+
+        assignedPartitionsByNameResult.whenComplete((assignedPartitions, 
metadataError) -> {
+            if (metadataError != null) {
+                log.error("Reconciliation failed due to error getting metadata 
to resolve topic " +
+                        "names for topic IDs {} in target assignment.", 
targetAssignment);
+                // TODO: failing reconciliation (no ack sent to the broker), 
but leaving
+                //  member in STABLE state. Double check if any other action 
should be taken
+                //  here.
+                transitionTo(MemberState.STABLE);
+                return;
+            }
+
+            if 
(!subscriptions.checkAssignmentMatchedSubscription(assignedPartitions)) {
+                log.debug("Target assignment {} does not match the current 
subscription {}; it is " +
+                                "likely that the subscription has changed 
since we joined the group, " +
+                                "will re-join with current subscription",
+                        targetAssignment,
+                        subscriptions.prettyString());
+                transitionToJoining();
+                return;
+            }
+
+            // Partitions to assign (not previously owned)
+            SortedSet<TopicPartition> addedPartitions = new 
TreeSet<>(COMPARATOR);
+            addedPartitions.addAll(assignedPartitions);
+            addedPartitions.removeAll(ownedPartitions);
+
+
+            // Partitions to revoke
+            SortedSet<TopicPartition> revokedPartitions = new 
TreeSet<>(COMPARATOR);
+            revokedPartitions.addAll(ownedPartitions);
+            revokedPartitions.removeAll(assignedPartitions);
+
+            log.info("Updating assignment with\n" +
+                            "\tAssigned partitions:                       
{}\n" +
+                            "\tCurrent owned partitions:                  
{}\n" +
+                            "\tAdded partitions (assigned - owned):       
{}\n" +
+                            "\tRevoked partitions (owned - assigned):     
{}\n",
+                    assignedPartitions,
+                    ownedPartitions,
+                    addedPartitions,
+                    revokedPartitions
+            );
+
+            CompletableFuture<Void> revocationResult;
+            if (!revokedPartitions.isEmpty()) {
+                revocationResult = revokePartitions(revokedPartitions);
+            } else {
+                revocationResult = CompletableFuture.completedFuture(null);
+                // Reschedule the auto commit starting from now (new 
assignment received without any
+                // revocation).
+                commitRequestManager.resetAutoCommitTimer();
+            }
+
+            // Future that will complete when the full reconciliation process 
completes (revocation
+            // and assignment, executed sequentially)
+            CompletableFuture<Void> reconciliationResult =
+                    revocationResult.thenCompose(r -> {
+                        if (state == MemberState.RECONCILING) {

Review Comment:
   Totally valid point. I fixed it, but comparing assignments seemed more 
complicated and harder to reason about with the current approach. The target 
assignment is now a moving target: it can be modified at any time, not only 
by the server but also by metadata updates. 
   
   So, back to the problem of making sure that delayed reconciliations are not 
applied after a member rejoins: I just added a check based on the current 
member ID, to detect that a reconciliation completed while the member is 
already re-joining. What do you think? (Whenever there are no rejoins in the 
picture, I expect the RECONCILING state check alone to be enough, given that 
reconciliations are always applied sequentially.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to