dajac commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1381676706


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -172,10 +223,14 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
 
         membershipManager.groupInstanceId().ifPresent(data::setInstanceId);
 
-        if (this.subscriptions.hasPatternSubscription()) {
-            // TODO: Pass the string to the GC if server side regex is used.
-        } else {
-            data.setSubscribedTopicNames(new ArrayList<>(this.subscriptions.subscription()));
+        // Send subscription to the broker only if it has changed
+        if (sendUpdatedSubscription) {

Review Comment:
   I think that this is not enough because we only need to send it if it has changed, and we also need to re-send it on failure.
   
   I was thinking about introducing a stateful Builder object for the request which remembers the last fields sent out and decides whether the fields must be set or not. On errors, we could just reset the builder to re-send all fields.
   
   I think that we could possibly always set all the fields in this PR and tackle this separately, as we need to solve it more generally. What do you think?
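   A minimal sketch of the stateful builder idea, under loud assumptions: `SentFieldsTracker` and its methods are hypothetical names (not part of this PR or of Kafka), only the subscribed topic names are tracked, and a field is treated as sent as soon as it is put into a request, relying on `reset()` being called when that request fails.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/**
 * Hypothetical sketch: remembers the subscription included in the last heartbeat
 * so that it is only re-sent when it changes, or after reset() (e.g. on a failed request).
 */
class SentFieldsTracker {
    // Sorted snapshot of the last subscription put in a request; null forces a send.
    private Set<String> lastSentSubscription = null;

    /**
     * Returns the topic names to set in the next heartbeat request, or null when the
     * field can be left out because it has not changed since it was last sent.
     */
    List<String> subscriptionToSend(Set<String> currentSubscription) {
        Set<String> snapshot = new TreeSet<>(currentSubscription);
        if (snapshot.equals(lastSentSubscription)) {
            return null; // unchanged since the last request: omit the field
        }
        lastSentSubscription = snapshot; // assumed "sent" once included in a request
        return new ArrayList<>(snapshot);
    }

    /** Forget what was sent, so the next request carries every tracked field again. */
    void reset() {
        lastSentSubscription = null;
    }
}
```

   In this sketch the request-building code would set the field only when `subscriptionToSend(...)` returns non-null, and the error path would call `reset()`. The same pattern would extend to other fields (instance id, rebalance timeout, and so on) by tracking one "last sent" value per field.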



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java:
##########
@@ -24,21 +24,34 @@
 public enum MemberState {
 
     /**
-     * Member has not joined a consumer group yet, or has been fenced and needs to re-join.
+     * Member is not part of the group. This could be the case when it has never joined (no call
+     * has been made to the subscribe API), or when the member intentionally leaves the group
+     * after a call to the unsubscribe API.
      */
-    UNJOINED,
+    NOT_IN_GROUP,
+
+    /**
+     * Member is attempting to join a consumer group. This could be the case when joining for the
+     * first time, or when it has been fenced and tries to re-join.
+     */

Review Comment:
   Could we extend the description to explain what we do in this state? I would also do it for the others.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -170,8 +242,11 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) {
         ConsumerGroupHeartbeatResponseData.Assignment assignment = response.assignment();
         if (assignment != null) {
             setTargetAssignment(assignment);
+            transitionTo(MemberState.RECONCILING);
+            reconcile(targetAssignment.get());

Review Comment:
   `targetAssignment` seems to be accessible directly. Do we really need to pass it here?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -181,33 +256,465 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) {
     public void transitionToFenced() {
         resetEpoch();
         transitionTo(MemberState.FENCED);
+
+        // Release assignment
+        CompletableFuture<Void> callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.debug("OnPartitionsLost callback invocation failed while releasing assignment" +
+                        "after member got fenced. Member will rejoin the group anyways.", error);
+            }
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+            transitionToJoining();
+        });
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
-    public void transitionToFailed() {
-        log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED);
-        transitionTo(MemberState.FAILED);
+    public void transitionToFatal() {
+        log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL);
+
+        // Update epoch to indicate that the member is not in the group anymore, so that the
+        // onPartitionsLost is called to release assignment.
+        memberEpoch = LEAVE_GROUP_EPOCH;
+        invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+
+        transitionTo(MemberState.FATAL);
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
-    public boolean shouldSendHeartbeat() {
-        return state() != MemberState.FAILED;
+    public void transitionToJoining() {
+        resetEpoch();
+        transitionTo(MemberState.JOINING);
     }
 
     /**
-     * Transition to {@link MemberState#STABLE} only if there are no target assignments left to
-     * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+     * {@inheritDoc}
+     */
+    @Override
+    public CompletableFuture<Void> leaveGroup() {
+        transitionTo(MemberState.LEAVING);
+
+        CompletableFuture<Void> callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+        callbackResult.whenComplete((result, error) -> {
+
+            // Clear the subscription, no matter if the callback execution failed or succeeded.
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+
+            // Transition to ensure that a heartbeat request is sent out to effectively leave the
+            // group (even in the case where the member had no assignment to release or when the
+            // callback execution failed.)
+            transitionToSendingLeaveGroup();
+
+        });
+
+        // Return callback future to indicate that the leave group is done when the callbacks
+        // complete, without waiting for the heartbeat to be sent out. (Best effort to send it
+        // but do not hold the leave group operation for it)
+        return callbackResult;
+    }
+
+    /**
+     * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or
+     * onPartitionsLost.
+     * <ul>
+     *     <li>If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked.
+     *     This will be the case when releasing assignment because the member is intentionally
+     *     leaving the group (after a call to unsubscribe)</li>
+     *
+     *     <li>If the member is not part of the group (epoch <=0), this will invoke onPartitionsLost.
+     *     This will be the case when releasing assignment after being fenced .</li>
+     * </ul>
+     *
+     * @return Future that will complete when the callback execution completes.
+     */
+    private CompletableFuture<Void> invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+        SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(COMPARATOR);
+        droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+        CompletableFuture<Void> callbackResult;
+        if (droppedPartitions.isEmpty()) {
+            // No assignment to release
+            callbackResult = CompletableFuture.completedFuture(null);
+        } else {
+            // Release assignment
+            if (memberEpoch > 0) {
+                // Member is part of the group. Invoke onPartitionsRevoked.
+                callbackResult = revokePartitions(droppedPartitions);
+            } else {
+                // Member is not part of the group anymore. Invoke onPartitionsLost.
+                callbackResult = invokeOnPartitionsLostCallback(droppedPartitions);
+            }
+        }
+        return callbackResult;
+    }
+
+    /**
+     * Reset member epoch to the value required for the leave the group heartbeat request, and
+     * transition to the {@link MemberState#SENDING_LEAVE_REQUEST} state so that a heartbeat
+     * request is sent out with it.
+     */
+    private void transitionToSendingLeaveGroup() {
+        memberEpoch = leaveGroupEpoch();
+        currentAssignment = new HashSet<>();
+        targetAssignment = Optional.empty();
+        transitionTo(MemberState.SENDING_LEAVE_REQUEST);
+    }
+
+    /**
+     * Return the epoch to use in the Heartbeat request to indicate that the member wants to
+     * leave the group. Should be -2 if this is a static member, or -1 in any other case.
+     */
+    private int leaveGroupEpoch() {
+        return groupInstanceId.isPresent() ? LEAVE_GROUP_EPOCH_FOR_STATIC_MEMBER : LEAVE_GROUP_EPOCH;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean shouldHeartbeatNow() {
+        return state() == MemberState.SENDING_ACK_FOR_RECONCILED_ASSIGNMENT ||
+                state() == MemberState.SENDING_LEAVE_REQUEST;
+    }
+
+    /**
+     * {@inheritDoc}
      */
-    private boolean maybeTransitionToStable() {
-        if (!hasPendingTargetAssignment()) {
+    @Override
+    public void onHeartbeatRequestSent() {
+        if (state() == MemberState.SENDING_ACK_FOR_RECONCILED_ASSIGNMENT) {
             transitionTo(MemberState.STABLE);
+        } else if (state() == MemberState.SENDING_LEAVE_REQUEST) {
+            transitionTo(MemberState.NOT_IN_GROUP);
+        }
+    }
+
+    @Override
+    public boolean shouldSkipHeartbeat() {
+        return state() == MemberState.NOT_IN_GROUP || state() == MemberState.FATAL;
+    }
+
+    void reconcile(ConsumerGroupHeartbeatResponseData.Assignment targetAssignment) {
+
+        SortedSet<TopicPartition> ownedPartitions = new TreeSet<>(COMPARATOR);
+        ownedPartitions.addAll(subscriptions.assignedPartitions());
+
+        CompletableFuture<SortedSet<TopicPartition>> assignedPartitionsByNameResult = extractTopicPartitionsFromAssignment(targetAssignment);
+
+        assignedPartitionsByNameResult.whenComplete((assignedPartitions, metadataError) -> {
+            if (metadataError != null) {
+                log.error("Reconciliation failed due to error getting metadata to resolve topic " +
+                        "names for topic IDs {} in target assignment.", targetAssignment);
+                // TODO: failing reconciliation (no ack sent to the broker), but leaving
+                //  member in STABLE state. Double check if any other action should be taken
+                //  here.
+                transitionTo(MemberState.STABLE);
+                return;
+            }
+
+            if (!subscriptions.checkAssignmentMatchedSubscription(assignedPartitions)) {
+                log.debug("Target assignment {} does not match the current subscription {}; it is " +
+                                "likely that the subscription has changed since we joined the group, " +
+                                "will re-join with current subscription",
+                        targetAssignment,
+                        subscriptions.prettyString());
+                transitionToJoining();
+                return;
+            }
+
+            // Partitions to assign (not previously owned)
+            SortedSet<TopicPartition> addedPartitions = new TreeSet<>(COMPARATOR);
+            addedPartitions.addAll(assignedPartitions);
+            addedPartitions.removeAll(ownedPartitions);
+

Review Comment:
   nit: Extra line.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -181,33 +256,465 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) {
+
+            // Partitions to revoke
+            SortedSet<TopicPartition> revokedPartitions = new TreeSet<>(COMPARATOR);
+            revokedPartitions.addAll(ownedPartitions);
+            revokedPartitions.removeAll(assignedPartitions);
+
+            log.info("Updating assignment with\n" +
+                            "\tAssigned partitions:                       {}\n" +
+                            "\tCurrent owned partitions:                  {}\n" +
+                            "\tAdded partitions (assigned - owned):       {}\n" +
+                            "\tRevoked partitions (owned - assigned):     {}\n",
+                    assignedPartitions,
+                    ownedPartitions,
+                    addedPartitions,
+                    revokedPartitions
+            );
+
+            CompletableFuture<Void> revocationResult;
+            if (!revokedPartitions.isEmpty()) {
+                revocationResult = revokePartitions(revokedPartitions);
+            } else {
+                revocationResult = CompletableFuture.completedFuture(null);
+                // Reschedule the auto commit starting from now (new assignment received without any
+                // revocation).
+                commitRequestManager.resetAutoCommitTimer();
+            }
+
+            // Future that will complete when the full reconciliation process completes (revocation
+            // and assignment, executed sequentially)
+            CompletableFuture<Void> reconciliationResult =
+                    revocationResult.thenCompose(r -> {
+                        if (state == MemberState.RECONCILING) {

Review Comment:
   I wonder if we should also check whether the target assignment is still the same one. I am not sure if it is possible, but could a callback complete really late, after the state machine has already transitioned to fenced, rejoined the group and received a new assignment, so that it is in the reconciling state again?
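   One way to express the extra check, sketched under assumptions: `ReconciliationGuard`, `SketchState` and the per-target counter are hypothetical stand-ins (not the PR's classes), and all state is assumed to be touched from a single thread. The counter identifies the target assignment a reconciliation was started for, so a continuation that fires late is dropped even if the member is back in the reconciling state for a newer target.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/** Simplified stand-in for the member states involved in this race (hypothetical). */
enum SketchState { STABLE, RECONCILING, FENCED }

/**
 * Hypothetical sketch: each reconciliation is tagged with an id, and the step that
 * runs after the revocation callback only applies the assignment if the member is
 * still reconciling that same target.
 */
class ReconciliationGuard {
    SketchState state = SketchState.STABLE;
    private long currentReconciliationId = 0;
    final List<Set<String>> applied = new ArrayList<>();

    CompletableFuture<Boolean> reconcile(Set<String> target, CompletableFuture<Void> revocationResult) {
        final long myId = ++currentReconciliationId; // identifies this target assignment
        state = SketchState.RECONCILING;
        return revocationResult.thenApply(ignored -> {
            // A state check alone is not enough: the member may have been fenced,
            // rejoined and started reconciling a newer target in the meantime.
            if (state != SketchState.RECONCILING || myId != currentReconciliationId) {
                return false; // stale completion: drop it
            }
            applied.add(target);
            state = SketchState.STABLE;
            return true;
        });
    }

    void onFenced() {
        state = SketchState.FENCED;
    }
}
```

   With only the `state == RECONCILING` check, the late completion of the first callback in the scenario described above would apply the stale target, because by then the member is reconciling again. Comparing the target assignment itself (or the member epoch it was received with) would be alternative ways to tag a reconciliation.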



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -181,33 +256,465 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) {
+    void reconcile(ConsumerGroupHeartbeatResponseData.Assignment targetAssignment) {
+
+        SortedSet<TopicPartition> ownedPartitions = new TreeSet<>(COMPARATOR);
+        ownedPartitions.addAll(subscriptions.assignedPartitions());
+
+        CompletableFuture<SortedSet<TopicPartition>> assignedPartitionsByNameResult = extractTopicPartitionsFromAssignment(targetAssignment);
+
+        assignedPartitionsByNameResult.whenComplete((assignedPartitions, metadataError) -> {
+            if (metadataError != null) {
+                log.error("Reconciliation failed due to error getting metadata to resolve topic " +
+                        "names for topic IDs {} in target assignment.", targetAssignment);
+                // TODO: failing reconciliation (no ack sent to the broker), but leaving
+                //  member in STABLE state. Double check if any other action should be taken
+                //  here.
+                transitionTo(MemberState.STABLE);

Review Comment:
   Okay. I think that we could get into this situation in two cases.
   
   1. An assigned topic was just created and the metadata request got to a broker not yet aware of it. In this case, ignoring it means that the newly created topic will never be consumed by the member, or at least not until another assignment is received. In the current implementation, I think that the fetcher will keep retrying on those topics. Ideally, we would need something similar here.
   
   2. An assigned topic was just deleted before the member got the chance to get the metadata. This is somewhat the opposite case.
   
   In the case of 1., we could argue that we should just keep retrying until it succeeds, and it should eventually succeed. In the case of 2., it would never succeed if the topic is deleted, so the member will never send an ack and will eventually be kicked out of the group. To make it worse, the member won't receive a new assignment without the deleted topic because the previous assignment is not ack'ed. The issue is that there is no way to differentiate the two cases.
   
   Ideally, we should set the subscription based on the topic ids instead of the topic names. However, this does not remove the need to have the topic names for the callbacks. These are really annoying...
   
   Another thing that I wanted to point out is that it is not all or nothing. For instance, the member could get 10 partitions assigned to it and only one of them is unresolvable.
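   A sketch of the "not all or nothing" point, under assumptions: `PartialResolution` is a hypothetical helper, `java.util.UUID` stands in for Kafka's topic ID type, partitions are represented as `topic-partition` strings, and the caller is assumed to retry `unresolved` on the next metadata update. It resolves whatever the current metadata allows and sets the rest aside; it does not solve the deleted-topic case (2.), which would still need some bound or signal to stop retrying.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;

/**
 * Hypothetical sketch: split a target assignment (topic id -> partitions) into the
 * partitions whose topic name is known from metadata and the topic ids that are not.
 */
final class PartialResolution {
    // Resolved partitions, as "topicName-partition" strings.
    final SortedSet<String> resolved = new TreeSet<>();
    // Topic ids missing from metadata, kept with their partitions for a later retry.
    final Map<UUID, List<Integer>> unresolved = new HashMap<>();

    static PartialResolution resolve(Map<UUID, List<Integer>> assignmentByTopicId,
                                     Map<UUID, String> topicNamesFromMetadata) {
        PartialResolution result = new PartialResolution();
        for (Map.Entry<UUID, List<Integer>> entry : assignmentByTopicId.entrySet()) {
            String topicName = topicNamesFromMetadata.get(entry.getKey());
            if (topicName == null) {
                // Either a topic the metadata does not know about yet (a retry should
                // succeed) or a deleted topic (a retry never will); both look the same here.
                result.unresolved.put(entry.getKey(), entry.getValue());
            } else {
                for (int partition : entry.getValue()) {
                    result.resolved.add(topicName + "-" + partition);
                }
            }
        }
        return result;
    }
}
```

   Under this approach the member could reconcile the resolved partitions right away and keep retrying only the unresolved topic ids, instead of leaving the whole assignment unacknowledged.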
   
   
   



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java:
##########
@@ -24,21 +24,34 @@
 public enum MemberState {
 
     /**
-     * Member has not joined a consumer group yet, or has been fenced and needs to re-join.
+     * Member is not part of the group. This could be the case when it has never joined (no call
+     * has been made to the subscribe API), or when the member intentionally leaves the group
+     * after a call to the unsubscribe API.
      */
-    UNJOINED,
+    NOT_IN_GROUP,
+
+    /**
+     * Member is attempting to join a consumer group. This could be the case when joining for the
+     * first time, or when it has been fenced and tries to re-join.
+     */
+    JOINING,
 
     /**
      * Member has received a new target assignment (partitions could have been assigned or
      * revoked), and it is processing it. While in this state, the member will
      * invoke the user callbacks for onPartitionsAssigned or onPartitionsRevoked, and then make
      * the new assignment effective.
      */
-    // TODO: determine if separate state will be needed for assign/revoke (not for now)
     RECONCILING,
 
     /**
-     * Member is active in a group (heartbeating) and has processed all assignments received.
+     * Member has completed reconciling an assignment received, and stays in this state until the
+     * next heartbeat request is sent out to acknowledge the assignment to the server.
+     */
+    SENDING_ACK_FOR_RECONCILED_ASSIGNMENT,

Review Comment:
   I also wonder if we should call it `Acknowledging` to follow the naming of 
the other states. Thoughts?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java:
##########
@@ -24,21 +24,34 @@
 public enum MemberState {
 
     /**
-     * Member has not joined a consumer group yet, or has been fenced and needs to re-join.
+     * Member is not part of the group. This could be the case when it has never joined (no call
+     * has been made to the subscribe API), or when the member intentionally leaves the group
+     * after a call to the unsubscribe API.
      */
-    UNJOINED,
+    NOT_IN_GROUP,

Review Comment:
   I am not a big fan of the `NOT_IN_GROUP` name because the consumer could 
still have a group id configured and commit offsets to a group. This is why I 
used `Unsubscribed` earlier. I wonder if we could find a better name... What do 
you think?



##########
clients/src/main/java/org/apache/kafka/clients/MetadataCache.java:
##########
@@ -80,7 +82,10 @@ private MetadataCache(String clusterId,
         this.invalidTopics = invalidTopics;
         this.internalTopics = internalTopics;
         this.controller = controller;
-        this.topicIds = topicIds;
+        this.topicIds = Collections.unmodifiableMap(topicIds);
+        this.topicNames = Collections.unmodifiableMap(
+                topicIds.entrySet().stream().collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey))

Review Comment:
   nit: I think that we tend to indent with 4 spaces in this case.
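For reference, here are the constructor lines above with the suggested 4-space continuation indent, as a self-contained sketch. `TopicIdMaps` is a hypothetical class, and `java.util.UUID` stands in for Kafka's `Uuid`:

```java
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;

// Sketch of the name <-> id maps built in the MetadataCache constructor, with the
// continuation line indented by 4 spaces. UUID stands in for Kafka's Uuid.
final class TopicIdMaps {
    final Map<String, UUID> topicIds;
    final Map<UUID, String> topicNames;

    TopicIdMaps(Map<String, UUID> topicIds) {
        this.topicIds = Collections.unmodifiableMap(topicIds);
        // Inverted map (id -> name); Collectors.toMap throws IllegalStateException
        // if two names were ever mapped to the same id.
        this.topicNames = Collections.unmodifiableMap(
            topicIds.entrySet().stream().collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey)));
    }
}
```

Because `Collectors.toMap` rejects duplicate keys, the inversion assumes each topic id maps to a single name.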



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java:
##########
@@ -48,26 +61,50 @@ public enum MemberState {
      * {@link org.apache.kafka.common.protocol.Errors#FENCED_MEMBER_EPOCH} error from the
      * broker. This is a recoverable state, where the member
      * gives up its partitions by invoking the user callbacks for onPartitionsLost, and then
-     * transitions to {@link #UNJOINED} to rejoin the group as a new member.
+     * transitions to {@link #JOINING} to rejoin the group as a new member.
      */
     FENCED,
 
     /**
-     * The member failed with an unrecoverable error
+     * The member transitions to this state when it is leaving the group after a call to
+     * unsubscribe. It stays in this state while releasing its assignment (calling user's callback
+     * for partitions revoked or lost), until the callback completes and a heartbeat request is

Review Comment:
   Do we call `lost` in this case?
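This reading is based only on the `MembershipManagerImpl` diff quoted later in this thread. There, `leaveGroup()` releases the assignment before `transitionToSendingLeaveGroup()` overwrites the epoch. A member leaving with a positive epoch would therefore get `onPartitionsRevoked`, and `onPartitionsLost` would only run if the epoch were already `<= 0`. Here is a sketch of that rule, with hypothetical names:

```java
// Hypothetical sketch of the rule documented in
// invokeOnPartitionsRevokedOrLostToReleaseAssignment() in this PR: the callback is
// chosen from the member epoch at the moment the assignment is released.
enum ReleaseCallback { NONE, REVOKED, LOST }

final class ReleaseCallbackRule {

    static ReleaseCallback callbackFor(int memberEpoch, boolean hasAssignedPartitions) {
        if (!hasAssignedPartitions) {
            return ReleaseCallback.NONE; // nothing to release, no callback
        }
        // epoch > 0: still a group member (e.g. leaving after unsubscribe) -> revoked
        // epoch <= 0: epoch already reset (fenced, fatal) -> lost
        return memberEpoch > 0 ? ReleaseCallback.REVOKED : ReleaseCallback.LOST;
    }

    private ReleaseCallbackRule() { }
}
```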



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -77,32 +93,88 @@ public class MembershipManagerImpl implements MembershipManager {
     /**
      * Assignment that the member received from the server and successfully processed.
      */
-    private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment;
+    private Set<TopicPartition> currentAssignment;
 
     /**
      * Assignment that the member received from the server but hasn't completely processed
      * yet.
      */
     private Optional<ConsumerGroupHeartbeatResponseData.Assignment> targetAssignment;
 
+    /**
+     * Subscription state object holding the current assignment the member has for the topics it
+     * subscribed to.
+     */
+    private final SubscriptionState subscriptions;
+
+    /**
+     * Metadata that allows us to create the partitions needed for {@link ConsumerRebalanceListener}.
+     */
+    private final ConsumerMetadata metadata;
+
+    /**
+     * TopicPartition comparator based on topic name and partition id.
+     */
+    private final static TopicPartitionComparator COMPARATOR = new TopicPartitionComparator();
+
     /**
      * Logger.
      */
     private final Logger log;
 
-    public MembershipManagerImpl(String groupId, LogContext logContext) {
-        this(groupId, null, null, logContext);
+    /**
+     * Manager to perform commit requests needed before revoking partitions (if auto-commit is
+     * enabled)
+     */
+    private final CommitRequestManager commitRequestManager;
+
+    /**
+     * Manager to perform metadata requests. Used to get topic metadata when needed for resolving
+     * topic names for topic IDs received in a target assignment.
+     */
+    private final TopicMetadataRequestManager metadataRequestManager;
+
+    /**
+     * Epoch that a member (not static) must include a heartbeat request to indicate that it wants
+     * to leave the group. This is considered as a definitive leave.
+     */
+    public static final int LEAVE_GROUP_EPOCH = -1;
+
+    /**
+     * Epoch that a static member (member with group instance id) must include a heartbeat request
+     * to indicate that it wants to leave the group. This will be considered as a potentially
+     * temporary leave.
+     */
+    public static final int LEAVE_GROUP_EPOCH_FOR_STATIC_MEMBER = -2;

Review Comment:
   I think that we already define them in the `ConsumerGroupHeartbeatRequest` 
class. We could reuse them.
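Here is a sketch of the suggested reuse, assuming the shared values live on the request class. The stand-in `HeartbeatRequestConstants` and its constant names are hypothetical. Check the actual names defined in `ConsumerGroupHeartbeatRequest` before switching over:

```java
import java.util.Optional;

// Hypothetical stand-in for the request class holding the shared leave epochs.
// The real constant names in ConsumerGroupHeartbeatRequest should be verified.
final class HeartbeatRequestConstants {
    static final int LEAVE_GROUP_MEMBER_EPOCH = -1;        // definitive leave
    static final int LEAVE_GROUP_STATIC_MEMBER_EPOCH = -2; // static member, possibly temporary

    private HeartbeatRequestConstants() { }
}

final class LeaveEpochs {
    // The membership manager references the shared constants instead of
    // redefining -1 and -2 locally.
    static int leaveGroupEpoch(Optional<String> groupInstanceId) {
        return groupInstanceId.isPresent()
            ? HeartbeatRequestConstants.LEAVE_GROUP_STATIC_MEMBER_EPOCH
            : HeartbeatRequestConstants.LEAVE_GROUP_MEMBER_EPOCH;
    }

    private LeaveEpochs() { }
}
```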



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java:
##########
@@ -48,26 +61,50 @@ public enum MemberState {
      * {@link org.apache.kafka.common.protocol.Errors#FENCED_MEMBER_EPOCH} error from the
      * broker. This is a recoverable state, where the member
      * gives up its partitions by invoking the user callbacks for onPartitionsLost, and then
-     * transitions to {@link #UNJOINED} to rejoin the group as a new member.
+     * transitions to {@link #JOINING} to rejoin the group as a new member.
      */
     FENCED,
 
     /**
-     * The member failed with an unrecoverable error
+     * The member transitions to this state when it is leaving the group after a call to
+     * unsubscribe. It stays in this state while releasing its assignment (calling user's callback
+     * for partitions revoked or lost), until the callback completes and a heartbeat request is
+     * sent out to effectively leave the group (without waiting for a response).
+     */
+    LEAVING_GROUP,
+
+    /**
+     * Member has completed releasing its assignment, and stays in this state until the next
+     * heartbeat request is sent out to leave the group.
      */
-    FAILED;
+    SENDING_LEAVE_REQUEST,

Review Comment:
   My understanding is that `LEAVING` does the pre-leaving steps (e.g. pause 
partitions, commit offsets, etc.) while `SENDING_LEAVE_REQUEST` sends out the 
actual leave request.
   
   Perhaps, using `PREPARE_LEAVING` and `LEAVING` would make it clearer. 
Thoughts?
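To make the proposed split concrete, here is a hypothetical sketch of the leave path with `PREPARE_LEAVING` and `LEAVING`. The final `UNSUBSCRIBED` state borrows the name discussed earlier, and none of these names are settled:

```java
// Hypothetical sketch of the leave path with the names suggested above:
// PREPARE_LEAVING runs the pre-leave steps (callbacks, commits), LEAVING means the
// leave heartbeat still has to go out.
enum LeaveState { STABLE, PREPARE_LEAVING, LEAVING, UNSUBSCRIBED }

final class LeaveStateMachine {
    private LeaveState state = LeaveState.STABLE;

    LeaveState state() {
        return state;
    }

    // unsubscribe(): start releasing the assignment.
    void leaveGroup() {
        state = LeaveState.PREPARE_LEAVING;
    }

    // The revocation callback (and any commit) completed, successfully or not.
    void onPreLeaveStepsCompleted() {
        if (state == LeaveState.PREPARE_LEAVING) {
            state = LeaveState.LEAVING;
        }
    }

    boolean shouldHeartbeatNow() {
        return state == LeaveState.LEAVING;
    }

    // Like onHeartbeatRequestSent() in this PR: the leave request is not awaited.
    void onHeartbeatRequestSent() {
        if (state == LeaveState.LEAVING) {
            state = LeaveState.UNSUBSCRIBED;
        }
    }
}
```

The leave heartbeat only becomes eligible once the pre-leave steps are done. The member then moves to the final state as soon as the request is sent, without waiting for the response.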



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java:
##########
@@ -24,21 +24,34 @@
 public enum MemberState {
 
     /**
-     * Member has not joined a consumer group yet, or has been fenced and needs to re-join.
+     * Member is not part of the group. This could be the case when it has never joined (no call
+     * has been made to the subscribe API), or when the member intentionally leaves the group
+     * after a call to the unsubscribe API.
      */
-    UNJOINED,
+    NOT_IN_GROUP,
+
+    /**
+     * Member is attempting to join a consumer group. This could be the case when joining for the
+     * first time, or when it has been fenced and tries to re-join.
+     */
+    JOINING,
 
     /**
      * Member has received a new target assignment (partitions could have been assigned or
      * revoked), and it is processing it. While in this state, the member will
      * invoke the user callbacks for onPartitionsAssigned or onPartitionsRevoked, and then make
      * the new assignment effective.
      */
-    // TODO: determine if separate state will be needed for assign/revoke (not for now)
     RECONCILING,
 
     /**
-     * Member is active in a group (heartbeating) and has processed all assignments received.
+     * Member has completed reconciling an assignment received, and stays in this state until the
+     * next heartbeat request is sent out to acknowledge the assignment to the server.
+     */
+    SENDING_ACK_FOR_RECONCILED_ASSIGNMENT,

Review Comment:
   So if I understand it correctly, the member transitions to this state as soon 
as the reconciliation is done and then transitions to Stable as soon as the ack 
is sent out. Did I get it right?
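This sketch is based on `shouldHeartbeatNow()` and `onHeartbeatRequestSent()` in the `MembershipManagerImpl` diff. It shows the flow in question using the shorter `ACKNOWLEDGING` name suggested earlier, and all names are hypothetical:

```java
// Hypothetical sketch of the acknowledgement step, with ACKNOWLEDGING standing in
// for SENDING_ACK_FOR_RECONCILED_ASSIGNMENT.
enum AckState { RECONCILING, ACKNOWLEDGING, STABLE }

final class AckStateMachine {
    private AckState state = AckState.RECONCILING;

    AckState state() {
        return state;
    }

    // Callbacks done and assignment applied locally: the ack must be sent next.
    void onReconciliationCompleted() {
        if (state == AckState.RECONCILING) {
            state = AckState.ACKNOWLEDGING;
        }
    }

    // Heartbeat is sent right away instead of waiting for the next interval.
    boolean shouldHeartbeatNow() {
        return state == AckState.ACKNOWLEDGING;
    }

    // The transition happens when the request is sent, not when the response arrives.
    void onHeartbeatRequestSent() {
        if (state == AckState.ACKNOWLEDGING) {
            state = AckState.STABLE;
        }
    }
}
```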



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -181,33 +256,465 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) {
     public void transitionToFenced() {
         resetEpoch();
         transitionTo(MemberState.FENCED);
+
+        // Release assignment
+        CompletableFuture<Void> callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.debug("OnPartitionsLost callback invocation failed while releasing assignment" +
+                        "after member got fenced. Member will rejoin the group anyways.", error);
+            }
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+            transitionToJoining();
+        });
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
-    public void transitionToFailed() {
-        log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED);
-        transitionTo(MemberState.FAILED);
+    public void transitionToFatal() {
+        log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL);
+
+        // Update epoch to indicate that the member is not in the group anymore, so that the
+        // onPartitionsLost is called to release assignment.
+        memberEpoch = LEAVE_GROUP_EPOCH;
+        invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+
+        transitionTo(MemberState.FATAL);
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
-    public boolean shouldSendHeartbeat() {
-        return state() != MemberState.FAILED;
+    public void transitionToJoining() {
+        resetEpoch();
+        transitionTo(MemberState.JOINING);
     }
 
     /**
-     * Transition to {@link MemberState#STABLE} only if there are no target assignments left to
-     * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+     * {@inheritDoc}
+     */
+    @Override
+    public CompletableFuture<Void> leaveGroup() {
+        transitionTo(MemberState.LEAVING);
+
+        CompletableFuture<Void> callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+        callbackResult.whenComplete((result, error) -> {
+
+            // Clear the subscription, no matter if the callback execution failed or succeeded.
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+
+            // Transition to ensure that a heartbeat request is sent out to effectively leave the
+            // group (even in the case where the member had no assignment to release or when the
+            // callback execution failed.)
+            transitionToSendingLeaveGroup();
+
+        });
+
+        // Return callback future to indicate that the leave group is done when the callbacks
+        // complete, without waiting for the heartbeat to be sent out. (Best effort to send it
+        // but do not hold the leave group operation for it)
+        return callbackResult;
+    }
+
+    /**
+     * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or
+     * onPartitionsLost.
+     * <ul>
+     *     <li>If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked.
+     *     This will be the case when releasing assignment because the member is intentionally
+     *     leaving the group (after a call to unsubscribe)</li>
+     *
+     *     <li>If the member is not part of the group (epoch <=0), this will invoke onPartitionsLost.
+     *     This will be the case when releasing assignment after being fenced .</li>
+     * </ul>
+     *
+     * @return Future that will complete when the callback execution completes.
+     */
+    private CompletableFuture<Void> invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+        SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(COMPARATOR);
+        droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+        CompletableFuture<Void> callbackResult;
+        if (droppedPartitions.isEmpty()) {
+            // No assignment to release
+            callbackResult = CompletableFuture.completedFuture(null);
+        } else {
+            // Release assignment
+            if (memberEpoch > 0) {
+                // Member is part of the group. Invoke onPartitionsRevoked.
+                callbackResult = revokePartitions(droppedPartitions);
+            } else {
+                // Member is not part of the group anymore. Invoke onPartitionsLost.
+                callbackResult = invokeOnPartitionsLostCallback(droppedPartitions);
+            }
+        }
+        return callbackResult;
+    }
+
+    /**
+     * Reset member epoch to the value required for the leave the group heartbeat request, and
+     * transition to the {@link MemberState#SENDING_LEAVE_REQUEST} state so that a heartbeat
+     * request is sent out with it.
+     */
+    private void transitionToSendingLeaveGroup() {
+        memberEpoch = leaveGroupEpoch();
+        currentAssignment = new HashSet<>();
+        targetAssignment = Optional.empty();
+        transitionTo(MemberState.SENDING_LEAVE_REQUEST);
+    }
+
+    /**
+     * Return the epoch to use in the Heartbeat request to indicate that the member wants to
+     * leave the group. Should be -2 if this is a static member, or -1 in any other case.
+     */
+    private int leaveGroupEpoch() {
+        return groupInstanceId.isPresent() ? LEAVE_GROUP_EPOCH_FOR_STATIC_MEMBER : LEAVE_GROUP_EPOCH;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean shouldHeartbeatNow() {
+        return state() == MemberState.SENDING_ACK_FOR_RECONCILED_ASSIGNMENT ||
+                state() == MemberState.SENDING_LEAVE_REQUEST;
+    }
+
+    /**
+     * {@inheritDoc}
      */
-    private boolean maybeTransitionToStable() {
-        if (!hasPendingTargetAssignment()) {
+    @Override
+    public void onHeartbeatRequestSent() {
+        if (state() == MemberState.SENDING_ACK_FOR_RECONCILED_ASSIGNMENT) {
             transitionTo(MemberState.STABLE);
+        } else if (state() == MemberState.SENDING_LEAVE_REQUEST) {
+            transitionTo(MemberState.NOT_IN_GROUP);
+        }
+    }
+
+    @Override
+    public boolean shouldSkipHeartbeat() {
+        return state() == MemberState.NOT_IN_GROUP || state() == MemberState.FATAL;
+    }
+
+    void reconcile(ConsumerGroupHeartbeatResponseData.Assignment targetAssignment) {
+
+        SortedSet<TopicPartition> ownedPartitions = new TreeSet<>(COMPARATOR);
+        ownedPartitions.addAll(subscriptions.assignedPartitions());
+
+        CompletableFuture<SortedSet<TopicPartition>> assignedPartitionsByNameResult = extractTopicPartitionsFromAssignment(targetAssignment);
+
+        assignedPartitionsByNameResult.whenComplete((assignedPartitions, metadataError) -> {
+            if (metadataError != null) {
+                log.error("Reconciliation failed due to error getting metadata to resolve topic " +
+                        "names for topic IDs {} in target assignment.", targetAssignment);
+                // TODO: failing reconciliation (no ack sent to the broker), but leaving
+                //  member in STABLE state. Double check if any other action should be taken
+                //  here.
+                transitionTo(MemberState.STABLE);
+                return;
+            }
+
+            if (!subscriptions.checkAssignmentMatchedSubscription(assignedPartitions)) {
+                log.debug("Target assignment {} does not match the current subscription {}; it is " +
+                                "likely that the subscription has changed since we joined the group, " +
+                                "will re-join with current subscription",
+                        targetAssignment,
+                        subscriptions.prettyString());
+                transitionToJoining();
+                return;
+            }
+
+            // Partitions to assign (not previously owned)
+            SortedSet<TopicPartition> addedPartitions = new TreeSet<>(COMPARATOR);
+            addedPartitions.addAll(assignedPartitions);
+            addedPartitions.removeAll(ownedPartitions);
+
+
+            // Partitions to revoke
+            SortedSet<TopicPartition> revokedPartitions = new TreeSet<>(COMPARATOR);
+            revokedPartitions.addAll(ownedPartitions);
+            revokedPartitions.removeAll(assignedPartitions);
+
+            log.info("Updating assignment with\n" +
+                            "\tAssigned partitions:                       {}\n" +
+                            "\tCurrent owned partitions:                  {}\n" +
+                            "\tAdded partitions (assigned - owned):       {}\n" +
+                            "\tRevoked partitions (owned - assigned):     {}\n",
+                    assignedPartitions,
+                    ownedPartitions,
+                    addedPartitions,
+                    revokedPartitions
+            );
+
+            CompletableFuture<Void> revocationResult;
+            if (!revokedPartitions.isEmpty()) {
+                revocationResult = revokePartitions(revokedPartitions);
+            } else {
+                revocationResult = CompletableFuture.completedFuture(null);
+                // Reschedule the auto commit starting from now (new assignment received without any
+                // revocation).
+                commitRequestManager.resetAutoCommitTimer();
+            }
+
+            // Future that will complete when the full reconciliation process completes (revocation
+            // and assignment, executed sequentially)
+            CompletableFuture<Void> reconciliationResult =
+                    revocationResult.thenCompose(r -> {
+                        if (state == MemberState.RECONCILING) {
+                            // Make assignment effective on the client by updating the subscription state.
+                            subscriptions.assignFromSubscribed(assignedPartitions);
+                            // Invoke user call back
+                            return invokeOnPartitionsAssignedCallback(addedPartitions);
+                        } else {
+                            // Revocation callback completed but member already moved out of the
+                            // reconciling state.
+                            CompletableFuture<Void> res = new CompletableFuture<>();
+                            res.completeExceptionally(new KafkaException("Interrupting " +
+                                    "reconciliation after revocation, as the member already " +
+                                    "transitioned out of the reconciling state into " + state));
+                            return res;
+                        }
+                    });
+
+            reconciliationResult.whenComplete((result, error) -> {
+                if (error != null) {
+                    // Leaving member in RECONCILING state after callbacks fail. The member
+                    // won't send the ack, and the expectation is that the broker will kick the
+                    // member out of the group after the rebalance timeout expires, leading to a
+                    // RECONCILING -> FENCED transition.
+                    log.error("Reconciliation failed. ", error);
+                } else {
+                    if (state == MemberState.RECONCILING) {
+
+                        // Make assignment effective on the broker by transitioning to send acknowledge.
+                        transitionTo(MemberState.SENDING_ACK_FOR_RECONCILED_ASSIGNMENT);
+
+                        // Make assignment effective on the member group manager
+                        this.currentAssignment = assignedPartitions;
+                        this.targetAssignment = Optional.empty();
+
+                    } else {
+                        log.debug("New assignment processing completed but the member already " +
+                                "transitioned out of the reconciliation state into {}. Interrupting " +
+                                "reconciliation as it's not relevant anymore,", state);
+                        // TODO: double check if subscription state changes needed. This is expected to be
+                        //  the case where the member got fenced, failed or unsubscribed while the
+                        //  reconciliation was in process. Transitions to those states update the
+                        //  subscription state accordingly so it shouldn't be necessary to make any changes
+                        //  to the subscription state at this point.
+                    }
+                }
+            });
+        });
+    }
+
+    /**
+     * Build set of TopicPartition (topic name and partition id) from the assignment received
+     * from the broker (topic IDs and list of partitions). For each topic ID this will attempt to
+     * find the topic name in the metadata. If a topic ID is not found, this will request a
+     * metadata update, and the reconciliation will resume the topic metadata is received.
+     *
+     * @param assignment Assignment received from the broker, containing partitions grouped by
+     *                   topic id.
+     * @return Set of {@link TopicPartition} containing topic name and partition id.
+     */
+    private CompletableFuture<SortedSet<TopicPartition>> extractTopicPartitionsFromAssignment(
+            ConsumerGroupHeartbeatResponseData.Assignment assignment) {
+        SortedSet<TopicPartition> assignedPartitions = new TreeSet<>(COMPARATOR);
+
+        List<Uuid> topicsRequiringMetadata = new ArrayList<>();
+        assignment.topicPartitions().forEach(topicPartitions -> {
+            Uuid topicId = topicPartitions.topicId();
+            if (!metadata.topicNames().containsKey(topicId)) {
+                topicsRequiringMetadata.add(topicId);
+            } else {
+                String topicName = metadata.topicNames().get(topicId);
+                topicPartitions.partitions().forEach(tp -> assignedPartitions.add(new TopicPartition(topicName, tp)));
+            }
+
+        });
+
+        if (topicsRequiringMetadata.isEmpty()) {
+            return CompletableFuture.completedFuture(assignedPartitions);
         } else {
-            transitionTo(MemberState.RECONCILING);
+            return resolveTopicNamesForTopicIds(topicsRequiringMetadata, assignedPartitions);
+        }
+    }
+
+    /**
+     * Perform a topic metadata request to discover topic names for the given topic ids.
+     *
+     * @param topicsRequiringMetadata List of topic Uuid for which topic names are needed.
+     * @param resolvedTopicPartitions List of TopicPartitions for the topics with known names.
+     *                                This list will be extended when the missing topic names are
+     *                                received in metadata.
+     *
+     * @return Future that will complete when topic names are received for all
+     * topicsRequiringMetadata. It will fail if a metadata response is received but does not
+     * include all the topics that were requested.
+     */
+    private CompletableFuture<SortedSet<TopicPartition>> resolveTopicNamesForTopicIds(
+            List<Uuid> topicsRequiringMetadata,
+            SortedSet<TopicPartition> resolvedTopicPartitions) {
+        CompletableFuture<SortedSet<TopicPartition>> result = new CompletableFuture<>();
+        log.debug("Topic IDs {} received in assignment were not found in metadata. " +
+                "Requesting metadata to resolve topic names and proceed with the " +
+                "reconciliation.", topicsRequiringMetadata);
+        // TODO: request metadata only for the topics that require it. Passing empty list to
+        //  retrieve it for all topics until the TopicMetadataRequestManager supports a list
+        //  of topics.
+        CompletableFuture<Map<Topic, List<PartitionInfo>>> metadataResult = metadataRequestManager.requestTopicMetadata(Optional.empty());

Review Comment:
   One concern that I have with using the manager directly is that it does not 
seem to populate the metadata cache afterwards. So, we would resolve topics 
once here and then the fetcher would redo it because the metadata cache does 
not have the topics. This is not ideal.
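One way to address this concern is sketched below under stated assumptions. Every topic-id lookup goes through a single shared cache, and fetched names are written back into it so the fetcher sees them too. `SharedTopicNameCache` and the `metadataRequest` function are hypothetical stand-ins, not the `TopicMetadataRequestManager` or `ConsumerMetadata` API:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical sketch: names fetched for reconciliation are written back into one
// shared cache, so a later reader (e.g. the fetcher) does not resolve them again.
// Not thread-safe: it assumes all calls happen on a single thread.
// UUID stands in for Kafka's Uuid.
final class SharedTopicNameCache {
    private final Map<UUID, String> topicNames = new HashMap<>();
    // Stand-in for a metadata request: topic ids -> future map of id to name.
    private final Function<Set<UUID>, CompletableFuture<Map<UUID, String>>> metadataRequest;
    private int metadataRequestsSent = 0;

    SharedTopicNameCache(Function<Set<UUID>, CompletableFuture<Map<UUID, String>>> metadataRequest) {
        this.metadataRequest = metadataRequest;
    }

    String cachedName(UUID topicId) {
        return topicNames.get(topicId);
    }

    int metadataRequestsSent() {
        return metadataRequestsSent;
    }

    CompletableFuture<Map<UUID, String>> resolve(Set<UUID> topicIds) {
        Map<UUID, String> resolved = new HashMap<>();
        Set<UUID> missing = new HashSet<>();
        for (UUID topicId : topicIds) {
            String name = topicNames.get(topicId);
            if (name != null) {
                resolved.put(topicId, name);
            } else {
                missing.add(topicId);
            }
        }
        if (missing.isEmpty()) {
            return CompletableFuture.completedFuture(resolved);
        }
        metadataRequestsSent++;
        return metadataRequest.apply(missing).thenApply(fetched -> {
            topicNames.putAll(fetched); // populate the shared cache, not only this caller's result
            resolved.putAll(fetched);
            return resolved;
        });
    }
}
```

With this design, a second lookup for the same id is answered from the cache without another request.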



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -676,12 +676,12 @@ public ConsumerGroupMetadata groupMetadata() {
 
     @Override
     public void enforceRebalance() {
-        throw new KafkaException("method not implemented");
+        throw new UnsupportedOperationException("Operation not supported in new consumer group protocol");

Review Comment:
   From the KIP:
   > Consumer#enforceRebalance will be deprecated and will be a no-op if used 
when the new protocol is enable. A warning will be logged in this case.
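Here is a minimal sketch of the behaviour quoted from the KIP. The class name and log message are hypothetical. `java.util.logging` only keeps the example self-contained; the Kafka clients log through SLF4J:

```java
import java.util.logging.Logger;

// Hypothetical sketch: with the new group protocol, enforceRebalance() is a no-op
// that logs a warning instead of throwing.
final class NewProtocolConsumerSketch {
    private static final Logger LOG = Logger.getLogger(NewProtocolConsumerSketch.class.getName());

    public void enforceRebalance() {
        LOG.warning("enforceRebalance() is a no-op with the new consumer group protocol");
    }
}
```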



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -181,33 +242,465 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) {
     public void transitionToFenced() {
         resetEpoch();
         transitionTo(MemberState.FENCED);
+
+        // Release assignment
+        CompletableFuture<Void> callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.debug("OnPartitionsLost callback invocation failed while releasing assignment" +
+                        "after member got fenced. Member will rejoin the group anyways.", error);
+            }
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+            transitionToJoinGroup();
+        });
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
     public void transitionToFailed() {
-        log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED);
-        transitionTo(MemberState.FAILED);
+        log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL);
+
+        // Update epoch to indicate that the member is not in the group anymore, so that the
+        // onPartitionsLost is called to release assignment.
+        memberEpoch = -1;
+        invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+
+        transitionTo(MemberState.FATAL);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void transitionToJoinGroup() {
+        resetEpoch();
+        transitionTo(MemberState.JOINING);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public CompletableFuture<Void> leaveGroup() {
+        transitionTo(MemberState.LEAVING);
+
+        CompletableFuture<Void> callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+        callbackResult.whenComplete((result, error) -> {
+
+            // Clear the subscription, no matter if the callback execution failed or succeeded.
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+
+            // Transition to ensure that a heartbeat request is sent out to effectively leave the
+            // group (even in the case where the member had no assignment to release or when the
+            // callback execution failed.)
+            transitionToSendingLeaveGroup();
+
+        });
+
+        // Return callback future to indicate that the leave group is done when the callbacks
+        // complete, without waiting for the heartbeat to be sent out. (Best effort to send it
+        // but do not hold the leave group operation for it)
+        return callbackResult;
     }
 
+    /**
+     * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or
+     * onPartitionsLost.
+     * <ul>
+     *     <li>If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked.
+     *     This will be the case when releasing assignment because the member is intentionally
+     *     leaving the group (after a call to unsubscribe)</li>
+     *
+     *     <li>If the member is not part of the group (epoch <=0), this will invoke onPartitionsLost.
+     *     This will be the case when releasing assignment after being fenced .</li>
+     * </ul>
+     *
+     * @return Future that will complete when the callback execution completes.
+     */
+    private CompletableFuture<Void> invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+        SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(COMPARATOR);
+        droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+        CompletableFuture<Void> callbackResult;
+        if (droppedPartitions.isEmpty()) {
+            // No assignment to release
+            callbackResult = CompletableFuture.completedFuture(null);
+        } else {
+            // Release assignment
+            if (memberEpoch > 0) {
+                // Member is part of the group. Invoke onPartitionsRevoked.
+                callbackResult = revokePartitions(droppedPartitions);
+            } else {
+                // Member is not part of the group anymore. Invoke onPartitionsLost.
+                callbackResult = invokeOnPartitionsLostCallback(droppedPartitions);
+            }
+        }
+        return callbackResult;
+    }
+
+    /**
+     * Reset member epoch to the value required for the leave the group heartbeat request, and
+     * transition to the {@link MemberState#SENDING_LEAVE_REQUEST} state so that a heartbeat
+     * request is sent out with it.
+     */
+    private void transitionToSendingLeaveGroup() {
+        memberEpoch = leaveGroupEpoch();
+        currentAssignment = new HashSet<>();
+        targetAssignment = Optional.empty();
+        transitionTo(MemberState.SENDING_LEAVE_REQUEST);
+    }
+
+    /**
+     * Return the epoch to use in the Heartbeat request to indicate that the member wants to
+     * leave the group. Should be -2 if this is a static member, or -1 in any other case.
+     */
+    private int leaveGroupEpoch() {
+        return groupInstanceId.isPresent() ? -2 : -1;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
     @Override
-    public boolean shouldSendHeartbeat() {
-        return state() != MemberState.FAILED;
+    public boolean shouldHeartbeatNow() {
+        return state() == MemberState.SENDING_ACK_FOR_RECONCILED_ASSIGNMENT ||
+                state() == MemberState.SENDING_LEAVE_REQUEST;
     }
 
     /**
-     * Transition to {@link MemberState#STABLE} only if there are no target assignments left to
-     * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+     * {@inheritDoc}
      */
-    private boolean maybeTransitionToStable() {
-        if (!hasPendingTargetAssignment()) {
+    @Override
+    public void onHeartbeatRequestSent() {
+        if (state() == MemberState.SENDING_ACK_FOR_RECONCILED_ASSIGNMENT) {
             transitionTo(MemberState.STABLE);
+        } else if (state() == MemberState.SENDING_LEAVE_REQUEST) {
+            transitionTo(MemberState.NOT_IN_GROUP);
+        }
+    }
+
+    @Override
+    public boolean shouldSkipHeartbeat() {
+        return state() == MemberState.NOT_IN_GROUP || state() == MemberState.FATAL;
+    }
+
+    void reconcile(ConsumerGroupHeartbeatResponseData.Assignment targetAssignment) {
+
+        SortedSet<TopicPartition> ownedPartitions = new TreeSet<>(COMPARATOR);
+        ownedPartitions.addAll(subscriptions.assignedPartitions());
+
+        CompletableFuture<SortedSet<TopicPartition>> assignedPartitionsByNameResult = extractTopicPartitionsFromAssignment(targetAssignment);
+
+        assignedPartitionsByNameResult.whenComplete((assignedPartitions, metadataError) -> {
+            if (metadataError != null) {
+                log.error("Reconciliation failed due to error getting metadata to resolve topic " +
+                        "names for topic IDs {} in target assignment.", targetAssignment);
+                // TODO: failing reconciliation (no ack sent to the broker), but leaving
+                //  member in STABLE state. Double check if any other action should be taken
+                //  here.
+                transitionTo(MemberState.STABLE);
+                return;
+            }
+
+            if (!subscriptions.checkAssignmentMatchedSubscription(assignedPartitions)) {

Review Comment:
   As discussed offline, I would remove this. In my opinion, the member should 
just follow what the coordinator provides and should not try to be too smart 
here.
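If the subscription check is dropped, the member's side of reconciliation reduces to the two set differences the PR already logs ("assigned - owned" and "owned - assigned"). Here is a hypothetical sketch, with `"topic-partition"` strings standing in for `TopicPartition`:

```java
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical sketch: the member follows the coordinator's target assignment as-is
// and only computes what to add and what to revoke relative to what it owns.
final class AssignmentDelta {
    final SortedSet<String> added;
    final SortedSet<String> revoked;

    private AssignmentDelta(SortedSet<String> added, SortedSet<String> revoked) {
        this.added = added;
        this.revoked = revoked;
    }

    static AssignmentDelta between(Set<String> owned, Set<String> target) {
        SortedSet<String> added = new TreeSet<>(target);
        added.removeAll(owned);      // newly assigned: onPartitionsAssigned
        SortedSet<String> revoked = new TreeSet<>(owned);
        revoked.removeAll(target);   // no longer assigned: onPartitionsRevoked
        return new AssignmentDelta(added, revoked);
    }
}
```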



-- 
This is an automated message from the Apache Git Service.