kirktrue commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1391862343


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -1093,6 +1099,10 @@ private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebal
             log.info("Subscribed to topic(s): {}", join(topics, ", "));
             if (subscriptions.subscribe(new HashSet<>(topics), listener))
                 metadata.requestUpdateForNewTopics();
+
+            // Trigger subscribe event to effectively join the group if not already part of it,
+            // or just send the new subscription to the broker.
+            applicationEventHandler.add(new SubscriptionChangeApplicationEvent());

Review Comment:
   There's another `subscribeInternal()` for the topic pattern path. We want 
this there too, right?
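
As a rough illustration of the question above, the sketch below routes both subscribe paths through one helper so that neither path can skip the event. `SubscribePathsSketch`, `enqueuedEvents`, and `notifySubscriptionChanged()` are hypothetical names made up for this example; the real `PrototypeAsyncConsumer` code is not reproduced here and may be structured differently.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical, simplified sketch; not the actual PrototypeAsyncConsumer code.
final class SubscribePathsSketch {

    // Stand-in for the application event queue; records enqueued event names.
    final List<String> enqueuedEvents = new ArrayList<>();

    // Topic-list path (subscription state update elided).
    void subscribe(Collection<String> topics) {
        notifySubscriptionChanged();
    }

    // Topic-pattern path: signals the subscription change as well.
    void subscribe(Pattern pattern) {
        notifySubscriptionChanged();
    }

    // Single place where the event is enqueued, shared by both paths.
    private void notifySubscriptionChanged() {
        enqueuedEvents.add("SUBSCRIPTION_CHANGE");
    }
}
```

Sharing one helper is only one option; adding the same `applicationEventHandler.add(...)` call to the pattern overload would have the same effect.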



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -676,12 +678,12 @@ public ConsumerGroupMetadata groupMetadata() {
 
     @Override
     public void enforceRebalance() {
-        throw new KafkaException("method not implemented");

Review Comment:
   Good call!



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UnsubscribeApplicationEvent.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals.events;
+
+/**
+ * Application event triggered when a user calls the unsubscribe API. This will make the consumer
+ * release all its assignments and send a heartbeat request to leave the consumer group.
+ * This event holds a future that will complete when the invocation of callbacks to release
+ * complete and the heartbeat to leave the group is sent out (minimal effort to send the
+ * leave group heartbeat, without waiting for any response or considering timeouts).
+ */
+public class UnsubscribeApplicationEvent extends CompletableApplicationEvent<Void> {

Review Comment:
   The intention of `CompletableApplicationEvent` was to give the consumer a 
way to block on the results of operations performed in the background thread. 
Since the `Consumer.unsubscribe()` API call is non-blocking, I'm thinking this 
should be a subclass of `ApplicationEvent`.
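
To make the distinction concrete, here is a minimal sketch of the two event styles. Every name in it (`EventSketch`, `PlainEvent`, `CompletableEvent`, `fireAndForget`, `submit`, `processOne`) is a hypothetical stand-in, not the real Kafka class or method; it only assumes that a completable event wraps a `CompletableFuture` that the background thread completes.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified stand-ins for the event classes discussed above.
final class EventSketch {

    /** Fire-and-forget event: the caller enqueues it and moves on. */
    static class PlainEvent {
        final String name;
        PlainEvent(String name) { this.name = name; }
    }

    /** Event carrying a future that the background thread completes. */
    static class CompletableEvent<T> extends PlainEvent {
        final CompletableFuture<T> future = new CompletableFuture<>();
        CompletableEvent(String name) { super(name); }
    }

    // Stand-in for the queue shared by the application and background threads.
    static final BlockingQueue<PlainEvent> QUEUE = new LinkedBlockingQueue<>();

    /** Non-blocking path: enqueue and return immediately, nothing to wait on. */
    static void fireAndForget(String name) {
        QUEUE.add(new PlainEvent(name));
    }

    /** Blocking-capable path: enqueue and hand back a future the caller may wait on. */
    static <T> CompletableFuture<T> submit(String name) {
        CompletableEvent<T> event = new CompletableEvent<>(name);
        QUEUE.add(event);
        return event.future;
    }

    /** Stand-in for the background thread: take one event, complete its future if it has one. */
    static String processOne() {
        PlainEvent event = QUEUE.poll();
        if (event == null) {
            return null;
        }
        if (event instanceof CompletableEvent) {
            ((CompletableEvent<?>) event).future.complete(null);
        }
        return event.name;
    }
}
```

A caller that never waits on the returned future gains nothing from the completable variant, which is the case being made for a plain `ApplicationEvent` here.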



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java:
##########
@@ -25,7 +25,8 @@ public abstract class ApplicationEvent {
 
     public enum Type {
         COMMIT, POLL, FETCH_COMMITTED_OFFSET, METADATA_UPDATE, ASSIGNMENT_CHANGE,
-        LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA
+        LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA, SUBSCRIPTION_CHANGE,

Review Comment:
   ```suggestion
           LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA, SUBSCRIBED,
   ```
   
   `SUBSCRIPTION_CHANGE` is a bit vague. Does it encompass more than the event 
of the user calling `Consumer.subscribe()`?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -181,63 +347,537 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) {
     public void transitionToFenced() {
         resetEpoch();
         transitionTo(MemberState.FENCED);
+
+        // Release assignment
+        CompletableFuture<Void> callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.debug("OnPartitionsLost callback invocation failed while releasing assignment" +
+                        "after member got fenced. Member will rejoin the group anyways.", error);
+            }
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+            transitionToJoining();
+        });
+
+        clearPendingAssignmentsAndLocalNamesCache();
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
-    public void transitionToFailed() {
-        log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-        transitionTo(MemberState.FAILED);
+    public void transitionToFatal() {
+        log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+        // Update epoch to indicate that the member is not in the group 
anymore, so that the
+        // onPartitionsLost is called to release assignment.
+        memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+        invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+
+        clearPendingAssignmentsAndLocalNamesCache();
+        transitionTo(MemberState.FATAL);
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
-    public boolean shouldSendHeartbeat() {
-        return state() != MemberState.FAILED;
+    public void transitionToJoining() {
+        resetEpoch();
+        transitionTo(MemberState.JOINING);
+        clearPendingAssignmentsAndLocalNamesCache();
+        // Reset member ID of the reconciliation in progress (if any), to make 
sure that if the
+        // reconciliation completes while the member is rejoining but hasn't 
received the new
+        // member ID yet, the reconciliation result is discarded.
+        memberIdOnReconciliationStart = null;
     }
 
     /**
-     * Transition to {@link MemberState#STABLE} only if there are no target 
assignments left to
-     * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+     * {@inheritDoc}
      */
-    private boolean maybeTransitionToStable() {
-        if (!hasPendingTargetAssignment()) {
-            transitionTo(MemberState.STABLE);
+    @Override
+    public CompletableFuture<Void> leaveGroup() {
+        transitionTo(MemberState.PREPARE_LEAVING);
+
+        CompletableFuture<Void> callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+        callbackResult.whenComplete((result, error) -> {
+
+            // Clear the subscription, no matter if the callback execution 
failed or succeeded.
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+
+            // Transition to ensure that a heartbeat request is sent out to 
effectively leave the
+            // group (even in the case where the member had no assignment to 
release or when the
+            // callback execution failed.)
+            transitionToSendingLeaveGroup();
+
+        });
+
+        clearPendingAssignmentsAndLocalNamesCache();
+
+        // Return callback future to indicate that the leave group is done 
when the callbacks
+        // complete, without waiting for the heartbeat to be sent out. (Best 
effort to send it
+        // but do not hold the leave group operation for it)
+        return callbackResult;
+    }
+
+    /**
+     * Release member assignment by calling the user defined callbacks for 
onPartitionsRevoked or
+     * onPartitionsLost.
+     * <ul>
+     *     <li>If the member is part of the group (epoch > 0), this will 
invoke onPartitionsRevoked.
+     *     This will be the case when releasing assignment because the member 
is intentionally
+     *     leaving the group (after a call to unsubscribe)</li>
+     *
+     *     <li>If the member is not part of the group (epoch <=0), this will 
invoke onPartitionsLost.
+     *     This will be the case when releasing assignment after being fenced 
.</li>
+     * </ul>
+     *
+     * @return Future that will complete when the callback execution completes.
+     */
+    private CompletableFuture<Void> 
invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+        SortedSet<TopicPartition> droppedPartitions = new 
TreeSet<>(COMPARATOR);
+        droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+        CompletableFuture<Void> callbackResult;
+        if (droppedPartitions.isEmpty()) {
+            // No assignment to release
+            callbackResult = CompletableFuture.completedFuture(null);
         } else {
-            transitionTo(MemberState.RECONCILING);
+            // Release assignment
+            if (memberEpoch > 0) {
+                // Member is part of the group. Invoke onPartitionsRevoked.
+                callbackResult = revokePartitions(droppedPartitions);
+            } else {
+                // Member is not part of the group anymore. Invoke 
onPartitionsLost.
+                callbackResult = 
invokeOnPartitionsLostCallback(droppedPartitions);
+            }
+            // Remove all topic IDs and names from local cache
+            callbackResult.whenComplete((result, error) -> 
clearPendingAssignmentsAndLocalNamesCache());
+        }
+        return callbackResult;
+    }
+
+    /**
+     * Reset member epoch to the value required for the leave the group 
heartbeat request, and
+     * transition to the {@link MemberState#LEAVING} state so that a heartbeat
+     * request is sent out with it.
+     */
+    private void transitionToSendingLeaveGroup() {
+        memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+        currentAssignment = new HashSet<>();
+        transitionTo(MemberState.LEAVING);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean shouldHeartbeatNow() {
+        MemberState state = state();
+        return state == MemberState.ACKNOWLEDGING || state == 
MemberState.LEAVING;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void onHeartbeatRequestSent() {
+        MemberState state = state();
+        if (state == MemberState.ACKNOWLEDGING) {
+            if (allPendingAssignmentsReconciled()) {
+                transitionTo(MemberState.STABLE);
+            } else {
+                transitionTo(MemberState.RECONCILING);
+            }
+        } else if (state == MemberState.LEAVING) {
+            transitionTo(MemberState.UNSUBSCRIBED);
         }
-        return state.equals(MemberState.STABLE);
     }
 
     /**
-     * Take new target assignment received from the server and set it as 
targetAssignment to be
-     * processed. Following the consumer group protocol, the server won't send 
a new target
-     * member while a previous one hasn't been acknowledged by the member, so 
this will fail
-     * if a target assignment already exists.
+     * @return True if there are no assignments waiting to be resolved from 
metadata or reconciled.
+     */
+    private boolean allPendingAssignmentsReconciled() {
+        return assignmentUnresolved.isEmpty() && 
assignmentReadyToReconcile.isEmpty();
+    }
+
+    @Override
+    public boolean shouldSkipHeartbeat() {
+        MemberState state = state();
+        return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL;
+    }
+
+    /**
+     * Reconcile the assignment that has been received from the server and for 
which topic names
+     * are resolved, kept in the {@link #assignmentReadyToReconcile}. This 
will commit if needed,
+     * trigger the callbacks and update the subscription state. Note that only 
one reconciliation
+     * can be in progress at a time. If there is already another one in 
progress when this is
+     * triggered, it will be no-op, and the assignment will be reconciled on 
the next
+     * reconciliation loop.
+     */
+    boolean reconcile() {
+        // Make copy of the assignment to reconcile as it could change as new 
assignments or metadata updates are received
+        SortedSet<TopicPartition> assignedPartitions = new 
TreeSet<>(assignmentReadyToReconcile);
+        boolean sameAssignmentReceived = 
assignedPartitions.equals(subscriptions.assignedPartitions());
+
+        if (reconciliationInProgress || sameAssignmentReceived) {
+            String reason;
+            if (reconciliationInProgress) {
+                reason = "Another reconciliation is already in progress. 
Assignment " +
+                        assignmentReadyToReconcile + " will be handled in the 
next reconciliation loop.";
+            } else {
+                reason = "Target assignment ready to reconcile is equals to 
the member current assignment.";
+            }
+            log.debug("Ignoring reconciliation attempt." + reason);
+            return false;
+        }
+
+        markReconciliationInProgress();
+
+        SortedSet<TopicPartition> ownedPartitions = new TreeSet<>(COMPARATOR);
+        ownedPartitions.addAll(subscriptions.assignedPartitions());
+
+
+        // Partitions to assign (not previously owned)
+        SortedSet<TopicPartition> addedPartitions = new TreeSet<>(COMPARATOR);
+        addedPartitions.addAll(assignedPartitions);
+        addedPartitions.removeAll(ownedPartitions);
+
+        // Partitions to revoke
+        SortedSet<TopicPartition> revokedPartitions = new 
TreeSet<>(COMPARATOR);
+        revokedPartitions.addAll(ownedPartitions);
+        revokedPartitions.removeAll(assignedPartitions);
+
+        log.info("Updating assignment with\n" +
+                        "\tAssigned partitions:                       {}\n" +
+                        "\tCurrent owned partitions:                  {}\n" +
+                        "\tAdded partitions (assigned - owned):       {}\n" +
+                        "\tRevoked partitions (owned - assigned):     {}\n",
+                assignedPartitions,
+                ownedPartitions,
+                addedPartitions,
+                revokedPartitions
+        );
+
+        CompletableFuture<Void> revocationResult;
+        if (!revokedPartitions.isEmpty()) {
+            revocationResult = revokePartitions(revokedPartitions);
+        } else {
+            revocationResult = CompletableFuture.completedFuture(null);
+            // Reschedule the auto commit starting from now (new assignment 
received without any
+            // revocation).
+            commitRequestManager.resetAutoCommitTimer();
+        }
+
+        // Future that will complete when the full reconciliation process 
completes (revocation
+        // and assignment, executed sequentially)
+        CompletableFuture<Void> reconciliationResult =
+                revocationResult.thenCompose(r -> {
+                    boolean memberHasRejoined = 
!Objects.equals(memberIdOnReconciliationStart,
+                            memberId);
+                    if (state == MemberState.RECONCILING && 
!memberHasRejoined) {
+
+                        // Make assignment effective on the client by updating 
the subscription state.
+                        subscriptions.assignFromSubscribed(assignedPartitions);
+
+                        // Clear topic names cache only for topics that are 
not in the subscription anymore
+                        for (TopicPartition tp : revokedPartitions) {
+                            if (!subscriptions.subscription().contains(tp.topic())) {
+                                assignedTopicNamesCache.values().remove(tp.topic());
+                            }
+                        }
+
+                        // Invoke user call back
+                        return 
invokeOnPartitionsAssignedCallback(addedPartitions);
+
+                    } else {
+                        String reason;
+                        if (state != MemberState.RECONCILING) {
+                            reason = "The member already transitioned out of 
the reconciling " +
+                                    "state into " + state;
+                        } else {
+                            reason = "The member has re-joined the group.";
+                        }
+                        // Revocation callback completed but the reconciled 
assignment should not
+                        // be applied (not relevant anymore). This could be 
because the member
+                        // is not in the RECONCILING state anymore (fenced, 
failed, unsubscribed),
+                        // or because it has already re-joined the group.
+                        CompletableFuture<Void> res = new 
CompletableFuture<>();
+                        res.completeExceptionally(new 
KafkaException("Interrupting reconciliation" +
+                                " after revocation. " + reason));
+                        return res;
+                    }
+                });
+
+        reconciliationResult.whenComplete((result, error) -> {
+            markReconciliationCompleted();
+            if (error != null) {
+                // Leaving member in RECONCILING state after callbacks fail. 
The member
+                // won't send the ack, and the expectation is that the broker 
will kick the
+                // member out of the group after the rebalance timeout 
expires, leading to a
+                // RECONCILING -> FENCED transition.
+                log.error("Reconciliation failed. ", error);
+            } else {
+                if (state == MemberState.RECONCILING) {
+
+                    // Make assignment effective on the broker by 
transitioning to send acknowledge.
+                    transitionTo(MemberState.ACKNOWLEDGING);
+
+                    // Make assignment effective on the member group manager
+                    currentAssignment = assignedPartitions;
+
+                    // Indicate that we completed reconciling a subset of the 
assignment ready to
+                    // reconcile (new assignments might have been received or 
discovered in
+                    // metadata)
+                    assignmentReadyToReconcile.removeAll(assignedPartitions);
+
+                } else {
+                    log.debug("New assignment processing completed but the 
member already " +
+                            "transitioned out of the reconciliation state into 
{}. Interrupting " +
+                            "reconciliation as it's not relevant anymore,", 
state);
+                    // TODO: double check if subscription state changes 
needed. This is expected to be
+                    //  the case where the member got fenced, failed or 
unsubscribed while the
+                    //  reconciliation was in process. Transitions to those 
states update the
+                    //  subscription state accordingly so it shouldn't be 
necessary to make any changes
+                    //  to the subscription state at this point.
+                }
+            }
+        });
+
+        return true;
+    }
+
+    /**
+     *  Visible for testing.
+     */
+    void markReconciliationInProgress() {
+        reconciliationInProgress = true;
+        memberIdOnReconciliationStart = memberId;
+    }
+
+    /**
+     *  Visible for testing.
+     */
+    void markReconciliationCompleted() {
+        reconciliationInProgress = false;
+    }
+
+    /**
+     * Build set of TopicPartition (topic name and partition id) from the 
target assignment
+     * received from the broker (topic IDs and list of partitions).
      *
-     * @throws IllegalStateException If a target assignment already exists.
+     * <p>
+     * This will:
+     *
+     * <ol type="1">
+     *     <li>Try to find topic names in the metadata cache</li>
+     *     <li>For topics not found in metadata, try to find names in the 
local topic names cache
+     *     (contains topic id and names currently assigned and resolved)</li>
+     *     <li>If there are topics that are not in metadata cache or in the 
local cached
+     *     of topic names assigned to this member, request a metadata update, 
and continue
+     *     resolving names as the cache is updated.
+     *     </li>
+     * </ol>
+     */
+    private void resolveMetadataForUnresolvedAssignment() {
+
+        // Try to resolve topic names from metadata cache or subscription 
cache, and move
+        // assignments from the "unresolved" collection, to the 
"readyToReconcile" one.
+        Iterator<Map.Entry<Uuid, List<Integer>>> it = 
assignmentUnresolved.entrySet().iterator();
+        while (it.hasNext()) {
+            Map.Entry<Uuid, List<Integer>> e = it.next();
+            Uuid topicId = e.getKey();
+            List<Integer> topicPartitions = e.getValue();
+
+            Optional<String> nameFromMetadata = 
findTopicNameInGlobalOrLocalCache(topicId);
+            if (nameFromMetadata.isPresent()) {
+                // Name resolved, so assignment is ready for reconciliation.
+                assignmentReadyToReconcile.addAll(buildAssignedPartitionsWithTopicName(topicPartitions,
+                        nameFromMetadata.get()));
+                it.remove();
+            }
+        }
+
+        if (!assignmentUnresolved.isEmpty()) {
+            log.debug("Topic Ids {} received in target assignment were not 
found in metadata and " +
+                    "are not currently assigned. Requesting a metadata update 
now to resolve " +
+                    "topic names.", assignmentUnresolved.keySet());
+            metadata.requestUpdate(true);
+        }
+    }
+
+    /**
+     * Look for topic in the global metadata cache. If found, add it to the 
local cache and
+     * return it. If not found, look for it in the local metadata cache. 
Return empty if not
+     * found in any of the two.
      */
-    private void setTargetAssignment(ConsumerGroupHeartbeatResponseData.Assignment newTargetAssignment) {
-        if (!targetAssignment.isPresent()) {
-            log.info("Member {} accepted new target assignment {} to 
reconcile", memberId, newTargetAssignment);
-            targetAssignment = Optional.of(newTargetAssignment);
+    private Optional<String> findTopicNameInGlobalOrLocalCache(Uuid topicId) {
+        String nameFromMetadataCache = 
metadata.topicNames().getOrDefault(topicId, null);
+        if (nameFromMetadataCache != null) {
+            // Add topic name to local cache, so it can be reused if included 
in a next target
+            // assignment if metadata cache not available.
+            assignedTopicNamesCache.put(topicId, nameFromMetadataCache);
+            return Optional.of(nameFromMetadataCache);
         } else {
-            transitionToFailed();
-            throw new IllegalStateException("Cannot set new target assignment 
because a " +
-                    "previous one pending to be reconciled already exists.");
+            // Topic ID was not found in metadata. Check if the topic name is 
in the local
+            // cache of topics currently assigned. This will avoid a metadata 
request in the
+            // case where the metadata cache may have been flushed right 
before the
+            // revocation of a previously assigned topic.
+            String nameFromSubscriptionCache = 
assignedTopicNamesCache.getOrDefault(topicId, null);
+            return Optional.ofNullable(nameFromSubscriptionCache);
+

Review Comment:
   ```suggestion
   ```
   
   nit: remove extra newline.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -77,32 +138,110 @@ public class MembershipManagerImpl implements MembershipManager {
     /**
      * Assignment that the member received from the server and successfully 
processed.
      */
-    private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment;
+    private Set<TopicPartition> currentAssignment;
 
     /**
-     * Assignment that the member received from the server but hasn't 
completely processed
-     * yet.
+     * Subscription state object holding the current assignment the member has 
for the topics it
+     * subscribed to.
      */
-    private Optional<ConsumerGroupHeartbeatResponseData.Assignment> 
targetAssignment;
+    private final SubscriptionState subscriptions;
+
+    /**
+     * Metadata that allows us to create the partitions needed for {@link 
ConsumerRebalanceListener}.
+     */
+    private final ConsumerMetadata metadata;
+
+    /**
+     * TopicPartition comparator based on topic name and partition id.
+     */
+    private final static TopicPartitionComparator COMPARATOR = new 
TopicPartitionComparator();
 
     /**
      * Logger.
      */
     private final Logger log;
 
-    public MembershipManagerImpl(String groupId, LogContext logContext) {
-        this(groupId, null, null, logContext);
+    /**
+     * Manager to perform commit requests needed before revoking partitions 
(if auto-commit is
+     * enabled)
+     */
+    private final CommitRequestManager commitRequestManager;
+
+    /**
+     * Local cache of assigned topic IDs and names. Topics are added here when 
received in a
+     * target assignment, as we discover their names in the Metadata cache, 
and removed when the
+     * topic is not in the subscription anymore. The purpose of this cache is 
to avoid metadata
+     * requests in cases where a currently assigned topic is in the target 
assignment (new
+     * partition assigned, or revoked), but it is not present the Metadata 
cache at that moment.
+     * The cache is cleared when the subscription changes ({@link 
#transitionToJoining()}, the
+     * member fails ({@link #transitionToFatal()} or leaves the group ({@link 
#leaveGroup()}).
+     */
+    private final Map<Uuid, String> assignedTopicNamesCache;
+
+    /**
+     * Topic IDs received in a target assignment for which we haven't found 
topic names yet.
+     * Items are added to this set every time a target assignment is received. 
Items are removed
+     * when metadata is found for the topic. This is where the member collects 
all assignments
+     * received from the broker, even though they may not be ready to 
reconcile due to missing
+     * metadata.
+     */
+    private final Map<Uuid, List<Integer>> assignmentUnresolved;
+
+    /**
+     * Assignment received for which topic names have been resolved, so it's 
ready to be
+     * reconciled. Items are added to this set when received in a target 
assignment (if metadata
+     * available), or when a metadata update is received. This is where the 
member keeps all the
+     * assignment ready to reconcile, even though the reconciliation might 
need to wait if there
+     * is already another on in process.
+     */
+    private final SortedSet<TopicPartition> assignmentReadyToReconcile;
+
+    /**
+     * Epoch that a member must include a heartbeat request to indicate that 
it want to join or
+     * re-join a group.
+     */
+    public static final int JOIN_GROUP_EPOCH = 0;
+
+    /**
+     * If there is a reconciliation running (triggering commit, callbacks) for 
the
+     * assignmentReadyToReconcile. This will be true if {@link #reconcile()} 
has been triggered
+     * after receiving a heartbeat response, or a metadata update.
+     */
+    private boolean reconciliationInProgress;
+
+    /**
+     * ID the member had when the reconciliation in progress started. This is 
used to identify if
+     * the member has rejoined while it was reconciling an assignment (in 
which case the result
+     * of the reconciliation is not applied.)
+     */
+    private String memberIdOnReconciliationStart;
+
+

Review Comment:
   ```suggestion
   
   ```
   
   Haha. I don't really care 😏



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -260,42 +900,52 @@ public Optional<String> serverAssignor() {
      * {@inheritDoc}
      */
     @Override
-    public ConsumerGroupHeartbeatResponseData.Assignment currentAssignment() {
+    public Set<TopicPartition> currentAssignment() {
         return this.currentAssignment;
     }
 
 
     /**
-     * @return Assignment that the member received from the server but hasn't 
completely processed
-     * yet. Visible for testing.
+     * @return Set of topic IDs received in a target assignment that have not 
been reconciled yet
+     * because topic names are not in metadata. Visible for testing.
      */
-    Optional<ConsumerGroupHeartbeatResponseData.Assignment> targetAssignment() {
-        return targetAssignment;
+    Set<Uuid> topicsWaitingForMetadata() {
+        return Collections.unmodifiableSet(assignmentUnresolved.keySet());
     }
 
     /**
-     * This indicates that the reconciliation of the target assignment has 
been successfully
-     * completed, so it will make it effective by assigning it to the current 
assignment.
-     *
-     * @params Assignment that has been successfully reconciled. This is 
expected to
-     * match the target assignment defined in {@link #targetAssignment()}
+     * @return Topic partitions received in a target assignment that have been 
resolved in
+     * metadata and are ready to be reconciled. Visible for testing.
+     */
+    Set<TopicPartition> assignmentReadyToReconcile() {
+        return Collections.unmodifiableSet(assignmentReadyToReconcile);
+    }
+
+    /**
+     * @return If there is a reconciliation in process now. Note that 
reconciliation is triggered
+     * by a call to {@link #reconcile()}. Visible for testing.
+     */
+    boolean reconciliationInProgress() {
+        return reconciliationInProgress;
+    }
+
+    /**
+     * When cluster metadata is updated, try to resolve topic names for topic 
IDs received in
+     * assignment that hasn't been resolved yet.
+     * <ul>
+     *     <li>Try to find topic names for all unresolved assignments</li>
+     *     <li>Add discovered topic names to the local topic names cache</li>
+     *     <li>If any topics are resolved, trigger a reconciliation 
process</li>
+     *     <li>If some topics still remain unresolved, request another 
metadata update</li>
+     * </ul>
      */
     @Override
-    public void onTargetAssignmentProcessComplete(ConsumerGroupHeartbeatResponseData.Assignment assignment) {
-        if (assignment == null) {
-            throw new IllegalArgumentException("Assignment cannot be null");
-        }
-        if (!assignment.equals(targetAssignment.orElse(null))) {
-            // This could be simplified to remove the assignment param and 
just assume that what
-            // was reconciled was the targetAssignment, but keeping it 
explicit and failing fast
-            // here to uncover any issues in the interaction of the assignment 
processing logic
-            // and this.
-            throw new IllegalStateException(String.format("Reconciled 
assignment %s does not " +
-                            "match the expected target assignment %s", 
assignment,
-                    targetAssignment.orElse(null)));
+    public void onUpdate(ClusterResource clusterResource) {
+        resolveMetadataForUnresolvedAssignment();
+        if (!assignmentReadyToReconcile.isEmpty()) {
+            // TODO: improve reconciliation triggering. Initial approach of 
triggering on every
+            //  HB response and metadata update.
+            reconcile();
         }

Review Comment:
   I apologize if it's here somewhere, but I don't see where we "register" the 
membership manager with the cluster resource listeners.
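
For context on why registration matters, the sketch below shows the general shape of the concern: a listener's `onUpdate` only runs if something has added it to the registry that metadata updates are dispatched through. All types here (`ListenerRegistrationSketch`, `ResourceListener`, `ListenerRegistry`, `MembershipManagerStub`) are hypothetical stand-ins written for this example and do not mirror Kafka's actual listener classes or wiring.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins; names are illustrative and not Kafka's real classes.
final class ListenerRegistrationSketch {

    /** Callback invoked when cluster metadata changes. */
    interface ResourceListener {
        void onUpdate(String clusterId);
    }

    /** Holds the registered listeners and fans out metadata updates to them. */
    static class ListenerRegistry {
        private final List<ResourceListener> listeners = new ArrayList<>();

        void register(ResourceListener listener) {
            listeners.add(listener);
        }

        void metadataUpdated(String clusterId) {
            for (ResourceListener listener : listeners) {
                listener.onUpdate(clusterId);
            }
        }
    }

    /** Stand-in for the membership manager; counts the updates it receives. */
    static class MembershipManagerStub implements ResourceListener {
        int updatesSeen = 0;

        @Override
        public void onUpdate(String clusterId) {
            updatesSeen++;
        }
    }
}
```

With this shape, an unregistered manager silently never sees an update, so the registration call is worth pointing to explicitly in the PR.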



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java:
##########
@@ -17,57 +17,111 @@
 
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.common.protocol.Errors;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
 public enum MemberState {
 
     /**
-     * Member has not joined a consumer group yet, or has been fenced and 
needs to re-join.
+     * Member has a group id, but it is not subscribed to any topic to receive 
automatic
+     * assignments. This will be the state when the member has never 
subscribed, or when it has
+     * unsubscribed from all topics. While in this state the member can commit 
offsets but won't
+     * be an active member of the consumer group (no heartbeats sent).
      */
-    UNJOINED,
+    UNSUBSCRIBED,
+
+    /**
+     * Member is attempting to join a consumer group. While in this state, the 
member will send
+     * heartbeat requests on the interval, with epoch 0, until it gets a 
response with an epoch > 0
+     * or a fatal failure. A member transitions to this state when it tries to 
join the group for
+     * the first time with a call to subscribe, or when it has been fenced and 
tries to re-join.
+     */
+    JOINING,
 
     /**
      * Member has received a new target assignment (partitions could have been 
assigned or
-     * revoked), and it is processing it. While in this state, the member will
-     * invoke the user callbacks for onPartitionsAssigned or 
onPartitionsRevoked, and then make
-     * the new assignment effective.
+     * revoked), and it is processing it. While in this state, the member will 
continue to send
+     * heartbeat on the interval, and reconcile the assignment (it will commit 
offsets if
+     * needed, invoke the user callbacks for onPartitionsAssigned or 
onPartitionsRevoked, and make
+     * the new assignment effective). Note that while in this state the member 
may be trying to
+     * resolve metadata for the target assignment, or triggering 
commits/callbacks if topic names
+     * already resolved.
      */
-    // TODO: determine if separate state will be needed for assign/revoke (not 
for now)
     RECONCILING,
 
     /**
-     * Member is active in a group (heartbeating) and has processed all 
assignments received.
+     * Member has completed reconciling an assignment received, and stays in 
this state only until
+     * the next heartbeat request is sent out to acknowledge the assignment to 
the server. This
+     * state indicates that the next heartbeat request must be sent without 
waiting for the
+     * heartbeat interval to expire. Note that once the ack is sent, the 
member could go back to
+     * {@link #RECONCILING} if it still has assignment waiting to be 
reconciled (assignments
+     * waiting for metadata, assignments for which metadata was resolved, or 
new assignments
+     * received from the broker)
+     */
+    ACKNOWLEDGING,
+
+    /**
+     * Member is active in a group and has processed all assignments received. 
While in this
+     * state, the member will send heartbeats on the interval.
      */
     STABLE,
 
     /**
-     * Member transitions to this state when it receives a
-     * {@link org.apache.kafka.common.protocol.Errors#UNKNOWN_MEMBER_ID} or
-     * {@link org.apache.kafka.common.protocol.Errors#FENCED_MEMBER_EPOCH} 
error from the
-     * broker. This is a recoverable state, where the member
-     * gives up its partitions by invoking the user callbacks for 
onPartitionsLost, and then
-     * transitions to {@link #UNJOINED} to rejoin the group as a new member.
+     * Member transitions to this state when it receives a {@link 
Errors#UNKNOWN_MEMBER_ID} or
+     * {@link Errors#FENCED_MEMBER_EPOCH} error from the broker, indicating 
that it has been
+     * left out of the group. While in this state, the member will stop 
sending heartbeats, it
+     * will give up its partitions by invoking the user callbacks for 
onPartitionsLost, and then
+     * transition to {@link #JOINING} to re-join the group as a new member.
      */
     FENCED,
 
     /**
-     * The member failed with an unrecoverable error
+     * The member transitions to this state after a call to unsubscribe. While 
in this state, the
+     * member will stop sending heartbeats, will commit offsets if needed and 
release its
+     * assignment (calling user's callback for partitions revoked or lost). 
When all these
+     * actions complete, the member will transition out of this state into 
{@link #LEAVING} to
+     * effectively leave the group.
+     */
+    PREPARE_LEAVING,
+
+    /**
+     * Member has committed offsets and releases its assignment, so it stays 
in this state until
+     * the next heartbeat request is sent out with epoch -1 to effectively 
leave the group. This
+     * state indicates that the next heartbeat request must be sent without 
waiting for the
+     * heartbeat interval to expire.
      */
-    FAILED;
+    LEAVING,
 
+    /**
+     * The member failed with an unrecoverable error received in a heartbeat 
response. This is an
+     * unrecoverable state where the member won't send any requests to the 
broker and cannot
+     * perform any other transition.
+     */
+    FATAL;
+
+    /**
+     * Valid state transitions
+     */
     static {
-        // Valid state transitions
-        STABLE.previousValidStates = Arrays.asList(UNJOINED, RECONCILING);
 
-        RECONCILING.previousValidStates = Arrays.asList(STABLE, UNJOINED);
+        STABLE.previousValidStates = Arrays.asList(JOINING, ACKNOWLEDGING);
+
+        RECONCILING.previousValidStates = Arrays.asList(STABLE, JOINING, 
ACKNOWLEDGING);
+
+        ACKNOWLEDGING.previousValidStates = Arrays.asList(RECONCILING);
+
+        FATAL.previousValidStates = Arrays.asList(JOINING, STABLE, 
RECONCILING, ACKNOWLEDGING);
+
+        FENCED.previousValidStates = Arrays.asList(JOINING, STABLE, 
RECONCILING, ACKNOWLEDGING);
 
-        FAILED.previousValidStates = Arrays.asList(UNJOINED, STABLE, 
RECONCILING);
+        JOINING.previousValidStates = Arrays.asList(FENCED, UNSUBSCRIBED);
 
-        FENCED.previousValidStates = Arrays.asList(STABLE, RECONCILING);
+        PREPARE_LEAVING.previousValidStates = Arrays.asList(JOINING, STABLE, 
RECONCILING,
+                ACKNOWLEDGING, UNSUBSCRIBED, FENCED);
 
-        UNJOINED.previousValidStates = Arrays.asList(FENCED);
+        LEAVING.previousValidStates = Arrays.asList(PREPARE_LEAVING);
     }

Review Comment:
   Are we missing the initialization of `UNSUBSCRIBED`?
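   
   A minimal sketch of the concern, using a trimmed-down hypothetical enum 
   (`SketchState` and `canTransitionFrom` are not the PR's names). It assumes 
   that a constant whose `previousValidStates` is never assigned would 
   otherwise be left `null`, and that `LEAVING` is the intended predecessor of 
   `UNSUBSCRIBED`, as `onHeartbeatRequestSent()` in this PR suggests.
   
   ```java
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.List;

   // Hypothetical, trimmed-down stand-in for MemberState; not the PR's code.
   enum SketchState {
       UNSUBSCRIBED, JOINING, LEAVING;

       // Defaulting to an empty list means a forgotten entry rejects every
       // transition instead of failing with a NullPointerException.
       private List<SketchState> previousValidStates = Collections.emptyList();

       static {
           JOINING.previousValidStates = Arrays.asList(UNSUBSCRIBED);
           LEAVING.previousValidStates = Arrays.asList(JOINING);
           // Assumption: the member reaches UNSUBSCRIBED once the leave
           // heartbeat has been sent, i.e. from LEAVING.
           UNSUBSCRIBED.previousValidStates = Arrays.asList(LEAVING);
       }

       boolean canTransitionFrom(SketchState previous) {
           return previousValidStates.contains(previous);
       }
   }
   ```
   
   With the explicit entry, `SketchState.UNSUBSCRIBED.canTransitionFrom(SketchState.LEAVING)` 
   is true. Without it, the transition check has nothing to validate against.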



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -843,7 +845,11 @@ private void updatePatternSubscription(Cluster cluster) {
     @Override
     public void unsubscribe() {
         fetchBuffer.retainAll(Collections.emptySet());
-        subscriptions.unsubscribe();
+        UnsubscribeApplicationEvent unsubscribeApplicationEvent = new 
UnsubscribeApplicationEvent();
+        applicationEventHandler.add(unsubscribeApplicationEvent);
+        unsubscribeApplicationEvent.future().whenComplete((result, error) -> {
+            subscriptions.unsubscribe();
+        });

Review Comment:
   The closure that the application thread passes to `whenComplete()` will 
run on the background thread, right? The closure modifies the 
`SubscriptionState`. That shouldn't cause any problems, but still...
   
   Since we have access to the `SubscriptionState` in the background thread 
already, can the background thread just update the `SubscriptionState` directly?
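   
   For reference, a small self-contained sketch of the threading behavior I 
   have in mind (class, method, and thread names are hypothetical). A 
   non-async `whenComplete()` callback that is registered before the future 
   completes runs on whichever thread completes the future. If the future 
   were already complete at registration time, it would run on the 
   registering thread instead.
   
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.atomic.AtomicReference;

   class WhenCompleteThreadSketch {
       // Returns the name of the thread that executed the whenComplete callback.
       static String callbackThreadName() {
           CompletableFuture<Void> future = new CompletableFuture<>();
           AtomicReference<String> ranOn = new AtomicReference<>();

           // Registered by the calling ("application") thread before completion.
           future.whenComplete((result, error) ->
                   ranOn.set(Thread.currentThread().getName()));

           // Completed by a separate ("background") thread.
           Thread background = new Thread(() -> future.complete(null), "background-sketch");
           background.start();
           try {
               background.join();
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException("Interrupted while waiting", e);
           }
           return ranOn.get();
       }
   }
   ```
   
   Here `callbackThreadName()` returns `"background-sketch"`, which is why the 
   `SubscriptionState` update effectively happens on the background thread 
   either way.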



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -181,63 +347,537 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
     public void transitionToFenced() {
         resetEpoch();
         transitionTo(MemberState.FENCED);
+
+        // Release assignment
+        CompletableFuture<Void> callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.debug("OnPartitionsLost callback invocation failed while 
releasing assignment" +
+                        "after member got fenced. Member will rejoin the group 
anyways.", error);
+            }
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+            transitionToJoining();
+        });
+
+        clearPendingAssignmentsAndLocalNamesCache();
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
-    public void transitionToFailed() {
-        log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-        transitionTo(MemberState.FAILED);
+    public void transitionToFatal() {
+        log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+        // Update epoch to indicate that the member is not in the group 
anymore, so that the
+        // onPartitionsLost is called to release assignment.
+        memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+        invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+
+        clearPendingAssignmentsAndLocalNamesCache();
+        transitionTo(MemberState.FATAL);
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
-    public boolean shouldSendHeartbeat() {
-        return state() != MemberState.FAILED;
+    public void transitionToJoining() {
+        resetEpoch();
+        transitionTo(MemberState.JOINING);
+        clearPendingAssignmentsAndLocalNamesCache();
+        // Reset member ID of the reconciliation in progress (if any), to make 
sure that if the
+        // reconciliation completes while the member is rejoining but hasn't 
received the new
+        // member ID yet, the reconciliation result is discarded.
+        memberIdOnReconciliationStart = null;
     }
 
     /**
-     * Transition to {@link MemberState#STABLE} only if there are no target 
assignments left to
-     * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+     * {@inheritDoc}
      */
-    private boolean maybeTransitionToStable() {
-        if (!hasPendingTargetAssignment()) {
-            transitionTo(MemberState.STABLE);
+    @Override
+    public CompletableFuture<Void> leaveGroup() {
+        transitionTo(MemberState.PREPARE_LEAVING);
+
+        CompletableFuture<Void> callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+        callbackResult.whenComplete((result, error) -> {
+
+            // Clear the subscription, no matter if the callback execution 
failed or succeeded.
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+
+            // Transition to ensure that a heartbeat request is sent out to 
effectively leave the
+            // group (even in the case where the member had no assignment to 
release or when the
+            // callback execution failed.)
+            transitionToSendingLeaveGroup();
+
+        });
+
+        clearPendingAssignmentsAndLocalNamesCache();
+
+        // Return callback future to indicate that the leave group is done 
when the callbacks
+        // complete, without waiting for the heartbeat to be sent out. (Best 
effort to send it
+        // but do not hold the leave group operation for it)
+        return callbackResult;
+    }
+
+    /**
+     * Release member assignment by calling the user defined callbacks for 
onPartitionsRevoked or
+     * onPartitionsLost.
+     * <ul>
+     *     <li>If the member is part of the group (epoch > 0), this will 
invoke onPartitionsRevoked.
+     *     This will be the case when releasing assignment because the member 
is intentionally
+     *     leaving the group (after a call to unsubscribe)</li>
+     *
+     *     <li>If the member is not part of the group (epoch <=0), this will 
invoke onPartitionsLost.
+     *     This will be the case when releasing assignment after being fenced 
.</li>
+     * </ul>
+     *
+     * @return Future that will complete when the callback execution completes.
+     */
+    private CompletableFuture<Void> 
invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+        SortedSet<TopicPartition> droppedPartitions = new 
TreeSet<>(COMPARATOR);
+        droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+        CompletableFuture<Void> callbackResult;
+        if (droppedPartitions.isEmpty()) {
+            // No assignment to release
+            callbackResult = CompletableFuture.completedFuture(null);
         } else {
-            transitionTo(MemberState.RECONCILING);
+            // Release assignment
+            if (memberEpoch > 0) {
+                // Member is part of the group. Invoke onPartitionsRevoked.
+                callbackResult = revokePartitions(droppedPartitions);
+            } else {
+                // Member is not part of the group anymore. Invoke 
onPartitionsLost.
+                callbackResult = 
invokeOnPartitionsLostCallback(droppedPartitions);
+            }
+            // Remove all topic IDs and names from local cache
+            callbackResult.whenComplete((result, error) -> 
clearPendingAssignmentsAndLocalNamesCache());
+        }
+        return callbackResult;
+    }
+
+    /**
+     * Reset member epoch to the value required for the leave the group 
heartbeat request, and
+     * transition to the {@link MemberState#LEAVING} state so that a heartbeat
+     * request is sent out with it.
+     */
+    private void transitionToSendingLeaveGroup() {
+        memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+        currentAssignment = new HashSet<>();
+        transitionTo(MemberState.LEAVING);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean shouldHeartbeatNow() {
+        MemberState state = state();
+        return state == MemberState.ACKNOWLEDGING || state == 
MemberState.LEAVING;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void onHeartbeatRequestSent() {
+        MemberState state = state();
+        if (state == MemberState.ACKNOWLEDGING) {
+            if (allPendingAssignmentsReconciled()) {
+                transitionTo(MemberState.STABLE);
+            } else {
+                transitionTo(MemberState.RECONCILING);
+            }
+        } else if (state == MemberState.LEAVING) {
+            transitionTo(MemberState.UNSUBSCRIBED);
         }
-        return state.equals(MemberState.STABLE);
     }
 
     /**
-     * Take new target assignment received from the server and set it as 
targetAssignment to be
-     * processed. Following the consumer group protocol, the server won't send 
a new target
-     * member while a previous one hasn't been acknowledged by the member, so 
this will fail
-     * if a target assignment already exists.
+     * @return True if there are no assignments waiting to be resolved from 
metadata or reconciled.
+     */
+    private boolean allPendingAssignmentsReconciled() {
+        return assignmentUnresolved.isEmpty() && 
assignmentReadyToReconcile.isEmpty();
+    }
+
+    @Override
+    public boolean shouldSkipHeartbeat() {
+        MemberState state = state();
+        return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL;
+    }
+
+    /**
+     * Reconcile the assignment that has been received from the server and for 
which topic names
+     * are resolved, kept in the {@link #assignmentReadyToReconcile}. This 
will commit if needed,
+     * trigger the callbacks and update the subscription state. Note that only 
one reconciliation
+     * can be in progress at a time. If there is already another one in 
progress when this is
+     * triggered, it will be no-op, and the assignment will be reconciled on 
the next
+     * reconciliation loop.
+     */
+    boolean reconcile() {
+        // Make copy of the assignment to reconcile as it could change as new 
assignments or metadata updates are received
+        SortedSet<TopicPartition> assignedPartitions = new 
TreeSet<>(assignmentReadyToReconcile);
+        boolean sameAssignmentReceived = 
assignedPartitions.equals(subscriptions.assignedPartitions());
+
+        if (reconciliationInProgress || sameAssignmentReceived) {
+            String reason;
+            if (reconciliationInProgress) {
+                reason = "Another reconciliation is already in progress. 
Assignment " +
+                        assignmentReadyToReconcile + " will be handled in the 
next reconciliation loop.";
+            } else {
+                reason = "Target assignment ready to reconcile is equals to 
the member current assignment.";
+            }
+            log.debug("Ignoring reconciliation attempt." + reason);
+            return false;
+        }
+
+        markReconciliationInProgress();
+
+        SortedSet<TopicPartition> ownedPartitions = new TreeSet<>(COMPARATOR);
+        ownedPartitions.addAll(subscriptions.assignedPartitions());
+
+
+        // Partitions to assign (not previously owned)
+        SortedSet<TopicPartition> addedPartitions = new TreeSet<>(COMPARATOR);
+        addedPartitions.addAll(assignedPartitions);
+        addedPartitions.removeAll(ownedPartitions);
+
+        // Partitions to revoke
+        SortedSet<TopicPartition> revokedPartitions = new 
TreeSet<>(COMPARATOR);
+        revokedPartitions.addAll(ownedPartitions);
+        revokedPartitions.removeAll(assignedPartitions);
+
+        log.info("Updating assignment with\n" +
+                        "\tAssigned partitions:                       {}\n" +
+                        "\tCurrent owned partitions:                  {}\n" +
+                        "\tAdded partitions (assigned - owned):       {}\n" +
+                        "\tRevoked partitions (owned - assigned):     {}\n",
+                assignedPartitions,
+                ownedPartitions,
+                addedPartitions,
+                revokedPartitions
+        );
+
+        CompletableFuture<Void> revocationResult;
+        if (!revokedPartitions.isEmpty()) {
+            revocationResult = revokePartitions(revokedPartitions);
+        } else {
+            revocationResult = CompletableFuture.completedFuture(null);
+            // Reschedule the auto commit starting from now (new assignment 
received without any
+            // revocation).
+            commitRequestManager.resetAutoCommitTimer();
+        }
+
+        // Future that will complete when the full reconciliation process 
completes (revocation
+        // and assignment, executed sequentially)
+        CompletableFuture<Void> reconciliationResult =
+                revocationResult.thenCompose(r -> {

Review Comment:
   ```suggestion
                   revocationResult.thenCompose(__ -> {
   ```
   
   Suggestion: use the double-underscore to denote to the reader that the 
variable is intended to remain unused.
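   
   A tiny standalone sketch of the convention (the class and method names are 
   made up for illustration). Note that a single `_` is not allowed as an 
   identifier since Java 9, so `__` is the usual stand-in.
   
   ```java
   import java.util.concurrent.CompletableFuture;

   class UnusedLambdaParamSketch {
       static String chain() {
           CompletableFuture<Void> revocation = CompletableFuture.completedFuture(null);
           // "__" signals that the result of the previous stage is ignored.
           return revocation
                   .thenCompose(__ -> CompletableFuture.completedFuture("assigned"))
                   .join();
       }
   }
   ```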



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -77,32 +138,110 @@ public class MembershipManagerImpl implements 
MembershipManager {
     /**
      * Assignment that the member received from the server and successfully 
processed.
      */
-    private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment;
+    private Set<TopicPartition> currentAssignment;
 
     /**
-     * Assignment that the member received from the server but hasn't 
completely processed
-     * yet.
+     * Subscription state object holding the current assignment the member has 
for the topics it
+     * subscribed to.
      */
-    private Optional<ConsumerGroupHeartbeatResponseData.Assignment> 
targetAssignment;
+    private final SubscriptionState subscriptions;
+
+    /**
+     * Metadata that allows us to create the partitions needed for {@link 
ConsumerRebalanceListener}.
+     */
+    private final ConsumerMetadata metadata;
+
+    /**
+     * TopicPartition comparator based on topic name and partition id.
+     */
+    private final static TopicPartitionComparator COMPARATOR = new 
TopicPartitionComparator();
 
     /**
      * Logger.
      */
     private final Logger log;
 
-    public MembershipManagerImpl(String groupId, LogContext logContext) {
-        this(groupId, null, null, logContext);
+    /**
+     * Manager to perform commit requests needed before revoking partitions 
(if auto-commit is
+     * enabled)
+     */
+    private final CommitRequestManager commitRequestManager;
+
+    /**
+     * Local cache of assigned topic IDs and names. Topics are added here when 
received in a
+     * target assignment, as we discover their names in the Metadata cache, 
and removed when the
+     * topic is not in the subscription anymore. The purpose of this cache is 
to avoid metadata
+     * requests in cases where a currently assigned topic is in the target 
assignment (new
+     * partition assigned, or revoked), but it is not present in the Metadata 
cache at that moment.
+     * The cache is cleared when the subscription changes ({@link 
#transitionToJoining()}, the
+     * member fails ({@link #transitionToFatal()} or leaves the group ({@link 
#leaveGroup()}).
+     */
+    private final Map<Uuid, String> assignedTopicNamesCache;
+
+    /**
+     * Topic IDs received in a target assignment for which we haven't found 
topic names yet.
+     * Items are added to this set every time a target assignment is received. 
Items are removed
+     * when metadata is found for the topic. This is where the member collects 
all assignments
+     * received from the broker, even though they may not be ready to 
reconcile due to missing
+     * metadata.
+     */
+    private final Map<Uuid, List<Integer>> assignmentUnresolved;
+
+    /**
+     * Assignment received for which topic names have been resolved, so it's 
ready to be
+     * reconciled. Items are added to this set when received in a target 
assignment (if metadata
+     * available), or when a metadata update is received. This is where the 
member keeps all the
+     * assignment ready to reconcile, even though the reconciliation might 
need to wait if there
+     * is already another one in progress.
+     */
+    private final SortedSet<TopicPartition> assignmentReadyToReconcile;
+
+    /**
+     * Epoch that a member must include in a heartbeat request to indicate that 
it wants to join or
+     * re-join a group.
+     */
+    public static final int JOIN_GROUP_EPOCH = 0;
+
+    /**
+     * If there is a reconciliation running (triggering commit, callbacks) for 
the
+     * assignmentReadyToReconcile. This will be true if {@link #reconcile()} 
has been triggered
+     * after receiving a heartbeat response, or a metadata update.
+     */
+    private boolean reconciliationInProgress;
+
+    /**
+     * ID the member had when the reconciliation in progress started. This is 
used to identify if
+     * the member has rejoined while it was reconciling an assignment (in 
which case the result
+     * of the reconciliation is not applied.)
+     */
+    private String memberIdOnReconciliationStart;
+
+

Review Comment:
   ```suggestion
   
   ```
   
   Haha. I don't really care 😏



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -77,32 +138,110 @@ public class MembershipManagerImpl implements 
MembershipManager {
     /**
      * Assignment that the member received from the server and successfully 
processed.
      */
-    private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment;
+    private Set<TopicPartition> currentAssignment;
 
     /**
-     * Assignment that the member received from the server but hasn't 
completely processed
-     * yet.
+     * Subscription state object holding the current assignment the member has 
for the topics it
+     * subscribed to.
      */
-    private Optional<ConsumerGroupHeartbeatResponseData.Assignment> 
targetAssignment;
+    private final SubscriptionState subscriptions;
+
+    /**
+     * Metadata that allows us to create the partitions needed for {@link 
ConsumerRebalanceListener}.
+     */
+    private final ConsumerMetadata metadata;
+
+    /**
+     * TopicPartition comparator based on topic name and partition id.
+     */
+    private final static TopicPartitionComparator COMPARATOR = new 
TopicPartitionComparator();
 
     /**
      * Logger.
      */
     private final Logger log;
 
-    public MembershipManagerImpl(String groupId, LogContext logContext) {
-        this(groupId, null, null, logContext);
+    /**
+     * Manager to perform commit requests needed before revoking partitions 
(if auto-commit is
+     * enabled)
+     */
+    private final CommitRequestManager commitRequestManager;
+
+    /**
+     * Local cache of assigned topic IDs and names. Topics are added here when 
received in a
+     * target assignment, as we discover their names in the Metadata cache, 
and removed when the
+     * topic is not in the subscription anymore. The purpose of this cache is 
to avoid metadata
+     * requests in cases where a currently assigned topic is in the target 
assignment (new
+     * partition assigned, or revoked), but it is not present in the Metadata 
cache at that moment.
+     * The cache is cleared when the subscription changes ({@link 
#transitionToJoining()}, the
+     * member fails ({@link #transitionToFatal()} or leaves the group ({@link 
#leaveGroup()}).
+     */
+    private final Map<Uuid, String> assignedTopicNamesCache;
+
+    /**
+     * Topic IDs received in a target assignment for which we haven't found 
topic names yet.
+     * Items are added to this set every time a target assignment is received. 
Items are removed
+     * when metadata is found for the topic. This is where the member collects 
all assignments
+     * received from the broker, even though they may not be ready to 
reconcile due to missing
+     * metadata.
+     */
+    private final Map<Uuid, List<Integer>> assignmentUnresolved;
+
+    /**
+     * Assignment received for which topic names have been resolved, so it's 
ready to be
+     * reconciled. Items are added to this set when received in a target 
assignment (if metadata
+     * available), or when a metadata update is received. This is where the 
member keeps all the
+     * assignment ready to reconcile, even though the reconciliation might 
need to wait if there
+     * is already another one in progress.
+     */
+    private final SortedSet<TopicPartition> assignmentReadyToReconcile;
+
+    /**
+     * Epoch that a member must include in a heartbeat request to indicate that 
it wants to join or
+     * re-join a group.
+     */
+    public static final int JOIN_GROUP_EPOCH = 0;
+
+    /**
+     * If there is a reconciliation running (triggering commit, callbacks) for 
the
+     * assignmentReadyToReconcile. This will be true if {@link #reconcile()} 
has been triggered
+     * after receiving a heartbeat response, or a metadata update.
+     */
+    private boolean reconciliationInProgress;
+
+    /**
+     * ID the member had when the reconciliation in progress started. This is 
used to identify if
+     * the member has rejoined while it was reconciling an assignment (in 
which case the result
+     * of the reconciliation is not applied.)
+     */
+    private String memberIdOnReconciliationStart;
+
+
+    public MembershipManagerImpl(String groupId,
+                                 SubscriptionState subscriptions,
+                                 CommitRequestManager commitRequestManager,
+                                 ConsumerMetadata metadata,
+                                 LogContext logContext) {
+        this(groupId, null, null, subscriptions, commitRequestManager, 
metadata, logContext);
     }
 
     public MembershipManagerImpl(String groupId,
                                  String groupInstanceId,
                                  String serverAssignor,

Review Comment:
   Suggestion: make `groupInstanceId` and `serverAssignor` `Optional` as 
constructor parameters to convey to the callers that they are, indeed, 
_optional_.
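   
   Roughly what I have in mind, as a self-contained sketch rather than a patch 
   against this class. `MemberConfigSketch` and `describe()` are hypothetical, 
   and the placeholder strings are only there to make the absent case visible.
   
   ```java
   import java.util.Optional;

   // Hypothetical stand-in; not MembershipManagerImpl itself.
   class MemberConfigSketch {
       private final String groupId;
       private final Optional<String> groupInstanceId;
       private final Optional<String> serverAssignor;

       MemberConfigSketch(String groupId,
                          Optional<String> groupInstanceId,
                          Optional<String> serverAssignor) {
           this.groupId = groupId;
           this.groupInstanceId = groupInstanceId;
           this.serverAssignor = serverAssignor;
       }

       // Convenience constructor: callers no longer pass bare nulls.
       MemberConfigSketch(String groupId) {
           this(groupId, Optional.empty(), Optional.empty());
       }

       String describe() {
           return groupId + "/"
                   + groupInstanceId.orElse("<no instance id>") + "/"
                   + serverAssignor.orElse("<no assignor>");
       }
   }
   ```
   
   The delegating constructor then reads `this(groupId, Optional.empty(), 
   Optional.empty(), ...)` instead of `this(groupId, null, null, ...)`, which 
   makes the intent obvious at the call site.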



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -181,63 +347,537 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
     public void transitionToFenced() {
         resetEpoch();
         transitionTo(MemberState.FENCED);
+
+        // Release assignment
+        CompletableFuture<Void> callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.debug("OnPartitionsLost callback invocation failed while 
releasing assignment" +
+                        "after member got fenced. Member will rejoin the group 
anyways.", error);
+            }
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+            transitionToJoining();
+        });
+
+        clearPendingAssignmentsAndLocalNamesCache();
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
-    public void transitionToFailed() {
-        log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-        transitionTo(MemberState.FAILED);
+    public void transitionToFatal() {
+        log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+        // Update epoch to indicate that the member is not in the group 
anymore, so that the
+        // onPartitionsLost is called to release assignment.
+        memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+        invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+
+        clearPendingAssignmentsAndLocalNamesCache();
+        transitionTo(MemberState.FATAL);
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
-    public boolean shouldSendHeartbeat() {
-        return state() != MemberState.FAILED;
+    public void transitionToJoining() {
+        resetEpoch();
+        transitionTo(MemberState.JOINING);
+        clearPendingAssignmentsAndLocalNamesCache();
+        // Reset member ID of the reconciliation in progress (if any), to make 
sure that if the
+        // reconciliation completes while the member is rejoining but hasn't 
received the new
+        // member ID yet, the reconciliation result is discarded.
+        memberIdOnReconciliationStart = null;
     }
 
     /**
-     * Transition to {@link MemberState#STABLE} only if there are no target 
assignments left to
-     * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+     * {@inheritDoc}
      */
-    private boolean maybeTransitionToStable() {
-        if (!hasPendingTargetAssignment()) {
-            transitionTo(MemberState.STABLE);
+    @Override
+    public CompletableFuture<Void> leaveGroup() {
+        transitionTo(MemberState.PREPARE_LEAVING);
+
+        CompletableFuture<Void> callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+        callbackResult.whenComplete((result, error) -> {
+
+            // Clear the subscription, no matter if the callback execution 
failed or succeeded.
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+
+            // Transition to ensure that a heartbeat request is sent out to 
effectively leave the
+            // group (even in the case where the member had no assignment to 
release or when the
+            // callback execution failed.)
+            transitionToSendingLeaveGroup();
+
+        });
+
+        clearPendingAssignmentsAndLocalNamesCache();
+
+        // Return callback future to indicate that the leave group is done 
when the callbacks
+        // complete, without waiting for the heartbeat to be sent out. (Best 
effort to send it
+        // but do not hold the leave group operation for it)
+        return callbackResult;
+    }
+
+    /**
+     * Release member assignment by calling the user defined callbacks for 
onPartitionsRevoked or
+     * onPartitionsLost.
+     * <ul>
+     *     <li>If the member is part of the group (epoch > 0), this will 
invoke onPartitionsRevoked.
+     *     This will be the case when releasing assignment because the member 
is intentionally
+     *     leaving the group (after a call to unsubscribe)</li>
+     *
+     *     <li>If the member is not part of the group (epoch <=0), this will 
invoke onPartitionsLost.
+     *     This will be the case when releasing assignment after being fenced 
.</li>
+     * </ul>
+     *
+     * @return Future that will complete when the callback execution completes.
+     */
+    private CompletableFuture<Void> 
invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+        SortedSet<TopicPartition> droppedPartitions = new 
TreeSet<>(COMPARATOR);
+        droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+        CompletableFuture<Void> callbackResult;
+        if (droppedPartitions.isEmpty()) {
+            // No assignment to release
+            callbackResult = CompletableFuture.completedFuture(null);
         } else {
-            transitionTo(MemberState.RECONCILING);
+            // Release assignment
+            if (memberEpoch > 0) {
+                // Member is part of the group. Invoke onPartitionsRevoked.
+                callbackResult = revokePartitions(droppedPartitions);
+            } else {
+                // Member is not part of the group anymore. Invoke 
onPartitionsLost.
+                callbackResult = 
invokeOnPartitionsLostCallback(droppedPartitions);
+            }
+            // Remove all topic IDs and names from local cache
+            callbackResult.whenComplete((result, error) -> 
clearPendingAssignmentsAndLocalNamesCache());
+        }
+        return callbackResult;
+    }
+
+    /**
+     * Reset member epoch to the value required for the leave the group 
heartbeat request, and
+     * transition to the {@link MemberState#LEAVING} state so that a heartbeat
+     * request is sent out with it.
+     */
+    private void transitionToSendingLeaveGroup() {
+        memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+        currentAssignment = new HashSet<>();
+        transitionTo(MemberState.LEAVING);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean shouldHeartbeatNow() {
+        MemberState state = state();
+        return state == MemberState.ACKNOWLEDGING || state == 
MemberState.LEAVING;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void onHeartbeatRequestSent() {
+        MemberState state = state();
+        if (state == MemberState.ACKNOWLEDGING) {
+            if (allPendingAssignmentsReconciled()) {
+                transitionTo(MemberState.STABLE);
+            } else {
+                transitionTo(MemberState.RECONCILING);
+            }
+        } else if (state == MemberState.LEAVING) {
+            transitionTo(MemberState.UNSUBSCRIBED);
         }
-        return state.equals(MemberState.STABLE);
     }
 
     /**
-     * Take new target assignment received from the server and set it as 
targetAssignment to be
-     * processed. Following the consumer group protocol, the server won't send 
a new target
-     * member while a previous one hasn't been acknowledged by the member, so 
this will fail
-     * if a target assignment already exists.
+     * @return True if there are no assignments waiting to be resolved from 
metadata or reconciled.
+     */
+    private boolean allPendingAssignmentsReconciled() {
+        return assignmentUnresolved.isEmpty() && 
assignmentReadyToReconcile.isEmpty();
+    }
+
+    @Override
+    public boolean shouldSkipHeartbeat() {
+        MemberState state = state();
+        return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL;
+    }
+
+    /**
+     * Reconcile the assignment that has been received from the server and for 
which topic names
+     * are resolved, kept in the {@link #assignmentReadyToReconcile}. This 
will commit if needed,
+     * trigger the callbacks and update the subscription state. Note that only 
one reconciliation
+     * can be in progress at a time. If there is already another one in 
progress when this is
+     * triggered, it will be no-op, and the assignment will be reconciled on 
the next
+     * reconciliation loop.
+     */
+    boolean reconcile() {
+        // Make copy of the assignment to reconcile as it could change as new 
assignments or metadata updates are received
+        SortedSet<TopicPartition> assignedPartitions = new 
TreeSet<>(assignmentReadyToReconcile);
+        boolean sameAssignmentReceived = 
assignedPartitions.equals(subscriptions.assignedPartitions());

Review Comment:
   I think this `equals()` call is OK. `TreeSet` inherits `equals()` from 
`AbstractSet`, which accepts any `Set` implementation.
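
A minimal sketch of why the comparison holds, assuming the comparator is consistent with `equals()` (which the `COMPARATOR` in the diff is presumed to be). `String` stands in for `TopicPartition`, and the class and method names are hypothetical, used only for illustration:

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

class SortedSetEqualsDemo {

    // Stand-in for the COMPARATOR constant used in the diff (assumption: natural ordering).
    static final Comparator<String> COMPARATOR = Comparator.naturalOrder();

    // Compares a TreeSet against a HashSet holding the given elements.
    // AbstractSet.equals() checks that the argument is a Set of the same
    // size and that containsAll(argument) is true.
    static boolean treeEqualsHash(String[] treeElements, String[] hashElements) {
        SortedSet<String> sorted = new TreeSet<>(COMPARATOR);
        sorted.addAll(Arrays.asList(treeElements));
        Set<String> unsorted = new HashSet<>(Arrays.asList(hashElements));
        return sorted.equals(unsorted);
    }

    public static void main(String[] args) {
        String[] a = {"topic-0", "topic-1"};
        String[] b = {"topic-1", "topic-0"};
        String[] c = {"topic-0"};
        System.out.println(treeEqualsHash(a, b)); // same elements, different Set types
        System.out.println(treeEqualsHash(a, c)); // different sizes
    }
}
```

If the comparator were not consistent with `equals()`, the membership checks inside the `TreeSet` could disagree with those of the other `Set`, so the result would be less predictable.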



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -181,63 +347,537 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
     public void transitionToFenced() {
         resetEpoch();
         transitionTo(MemberState.FENCED);
+
+        // Release assignment
+        CompletableFuture<Void> callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.debug("OnPartitionsLost callback invocation failed while 
releasing assignment" +
+                        "after member got fenced. Member will rejoin the group 
anyways.", error);
+            }
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+            transitionToJoining();
+        });
+
+        clearPendingAssignmentsAndLocalNamesCache();
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
-    public void transitionToFailed() {
-        log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-        transitionTo(MemberState.FAILED);
+    public void transitionToFatal() {
+        log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+        // Update epoch to indicate that the member is not in the group 
anymore, so that the
+        // onPartitionsLost is called to release assignment.
+        memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+        invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+
+        clearPendingAssignmentsAndLocalNamesCache();
+        transitionTo(MemberState.FATAL);
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
-    public boolean shouldSendHeartbeat() {
-        return state() != MemberState.FAILED;
+    public void transitionToJoining() {
+        resetEpoch();
+        transitionTo(MemberState.JOINING);
+        clearPendingAssignmentsAndLocalNamesCache();
+        // Reset member ID of the reconciliation in progress (if any), to make 
sure that if the
+        // reconciliation completes while the member is rejoining but hasn't 
received the new
+        // member ID yet, the reconciliation result is discarded.
+        memberIdOnReconciliationStart = null;
     }
 
     /**
-     * Transition to {@link MemberState#STABLE} only if there are no target 
assignments left to
-     * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+     * {@inheritDoc}
      */
-    private boolean maybeTransitionToStable() {
-        if (!hasPendingTargetAssignment()) {
-            transitionTo(MemberState.STABLE);
+    @Override
+    public CompletableFuture<Void> leaveGroup() {
+        transitionTo(MemberState.PREPARE_LEAVING);
+
+        CompletableFuture<Void> callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+        callbackResult.whenComplete((result, error) -> {
+
+            // Clear the subscription, no matter if the callback execution 
failed or succeeded.
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+
+            // Transition to ensure that a heartbeat request is sent out to 
effectively leave the
+            // group (even in the case where the member had no assignment to 
release or when the
+            // callback execution failed.)
+            transitionToSendingLeaveGroup();
+
+        });
+
+        clearPendingAssignmentsAndLocalNamesCache();
+
+        // Return callback future to indicate that the leave group is done 
when the callbacks
+        // complete, without waiting for the heartbeat to be sent out. (Best 
effort to send it
+        // but do not hold the leave group operation for it)
+        return callbackResult;
+    }
+
+    /**
+     * Release member assignment by calling the user defined callbacks for 
onPartitionsRevoked or
+     * onPartitionsLost.
+     * <ul>
+     *     <li>If the member is part of the group (epoch > 0), this will 
invoke onPartitionsRevoked.
+     *     This will be the case when releasing assignment because the member 
is intentionally
+     *     leaving the group (after a call to unsubscribe)</li>
+     *
+     *     <li>If the member is not part of the group (epoch <=0), this will 
invoke onPartitionsLost.
+     *     This will be the case when releasing assignment after being fenced 
.</li>
+     * </ul>
+     *
+     * @return Future that will complete when the callback execution completes.
+     */
+    private CompletableFuture<Void> 
invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+        SortedSet<TopicPartition> droppedPartitions = new 
TreeSet<>(COMPARATOR);
+        droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+        CompletableFuture<Void> callbackResult;
+        if (droppedPartitions.isEmpty()) {
+            // No assignment to release
+            callbackResult = CompletableFuture.completedFuture(null);
         } else {
-            transitionTo(MemberState.RECONCILING);
+            // Release assignment
+            if (memberEpoch > 0) {
+                // Member is part of the group. Invoke onPartitionsRevoked.
+                callbackResult = revokePartitions(droppedPartitions);
+            } else {
+                // Member is not part of the group anymore. Invoke 
onPartitionsLost.
+                callbackResult = 
invokeOnPartitionsLostCallback(droppedPartitions);
+            }
+            // Remove all topic IDs and names from local cache
+            callbackResult.whenComplete((result, error) -> 
clearPendingAssignmentsAndLocalNamesCache());
+        }
+        return callbackResult;
+    }
+
+    /**
+     * Reset member epoch to the value required for the leave the group 
heartbeat request, and
+     * transition to the {@link MemberState#LEAVING} state so that a heartbeat
+     * request is sent out with it.
+     */
+    private void transitionToSendingLeaveGroup() {
+        memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+        currentAssignment = new HashSet<>();
+        transitionTo(MemberState.LEAVING);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean shouldHeartbeatNow() {
+        MemberState state = state();
+        return state == MemberState.ACKNOWLEDGING || state == 
MemberState.LEAVING;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void onHeartbeatRequestSent() {
+        MemberState state = state();
+        if (state == MemberState.ACKNOWLEDGING) {
+            if (allPendingAssignmentsReconciled()) {
+                transitionTo(MemberState.STABLE);
+            } else {
+                transitionTo(MemberState.RECONCILING);
+            }
+        } else if (state == MemberState.LEAVING) {
+            transitionTo(MemberState.UNSUBSCRIBED);
         }
-        return state.equals(MemberState.STABLE);
     }
 
     /**
-     * Take new target assignment received from the server and set it as 
targetAssignment to be
-     * processed. Following the consumer group protocol, the server won't send 
a new target
-     * member while a previous one hasn't been acknowledged by the member, so 
this will fail
-     * if a target assignment already exists.
+     * @return True if there are no assignments waiting to be resolved from 
metadata or reconciled.
+     */
+    private boolean allPendingAssignmentsReconciled() {
+        return assignmentUnresolved.isEmpty() && 
assignmentReadyToReconcile.isEmpty();
+    }
+
+    @Override
+    public boolean shouldSkipHeartbeat() {
+        MemberState state = state();
+        return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL;
+    }
+
+    /**
+     * Reconcile the assignment that has been received from the server and for 
which topic names
+     * are resolved, kept in the {@link #assignmentReadyToReconcile}. This 
will commit if needed,
+     * trigger the callbacks and update the subscription state. Note that only 
one reconciliation
+     * can be in progress at a time. If there is already another one in 
progress when this is
+     * triggered, it will be no-op, and the assignment will be reconciled on 
the next
+     * reconciliation loop.
+     */
+    boolean reconcile() {
+        // Make copy of the assignment to reconcile as it could change as new 
assignments or metadata updates are received
+        SortedSet<TopicPartition> assignedPartitions = new 
TreeSet<>(assignmentReadyToReconcile);
+        boolean sameAssignmentReceived = 
assignedPartitions.equals(subscriptions.assignedPartitions());
+
+        if (reconciliationInProgress || sameAssignmentReceived) {
+            String reason;
+            if (reconciliationInProgress) {
+                reason = "Another reconciliation is already in progress. 
Assignment " +
+                        assignmentReadyToReconcile + " will be handled in the 
next reconciliation loop.";
+            } else {
+                reason = "Target assignment ready to reconcile is equals to 
the member current assignment.";
+            }
+            log.debug("Ignoring reconciliation attempt." + reason);

Review Comment:
   ```suggestion
               log.debug("Ignoring reconciliation attempt. " + reason);
   ```
   
   Nit: it'll be visually easier to parse with the space before the next 
sentence.
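
To make the nit concrete, here is a small sketch that builds the message both ways. The class and method names are hypothetical, and the `reason` text is shortened from the diff:

```java
class LogMessageSpacingDemo {

    // Mirrors the concatenation in the diff: nothing separates the two sentences.
    static String withoutSpace(String reason) {
        return "Ignoring reconciliation attempt." + reason;
    }

    // Mirrors the suggested change: the trailing space keeps the sentences apart.
    static String withSpace(String reason) {
        return "Ignoring reconciliation attempt. " + reason;
    }

    public static void main(String[] args) {
        String reason = "Another reconciliation is already in progress.";
        System.out.println(withoutSpace(reason));
        System.out.println(withSpace(reason));
    }
}
```

Another option, if the logger supports SLF4J-style placeholders, would be to pass `reason` as a `{}` argument so the separator lives in the format string.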



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -181,63 +347,537 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
     public void transitionToFenced() {
         resetEpoch();
         transitionTo(MemberState.FENCED);
+
+        // Release assignment
+        CompletableFuture<Void> callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.debug("OnPartitionsLost callback invocation failed while 
releasing assignment" +
+                        "after member got fenced. Member will rejoin the group 
anyways.", error);
+            }
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+            transitionToJoining();
+        });
+
+        clearPendingAssignmentsAndLocalNamesCache();
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
-    public void transitionToFailed() {
-        log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-        transitionTo(MemberState.FAILED);
+    public void transitionToFatal() {
+        log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+        // Update epoch to indicate that the member is not in the group 
anymore, so that the
+        // onPartitionsLost is called to release assignment.
+        memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+        invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+
+        clearPendingAssignmentsAndLocalNamesCache();
+        transitionTo(MemberState.FATAL);
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
-    public boolean shouldSendHeartbeat() {
-        return state() != MemberState.FAILED;
+    public void transitionToJoining() {
+        resetEpoch();
+        transitionTo(MemberState.JOINING);
+        clearPendingAssignmentsAndLocalNamesCache();
+        // Reset member ID of the reconciliation in progress (if any), to make 
sure that if the
+        // reconciliation completes while the member is rejoining but hasn't 
received the new
+        // member ID yet, the reconciliation result is discarded.
+        memberIdOnReconciliationStart = null;
     }
 
     /**
-     * Transition to {@link MemberState#STABLE} only if there are no target 
assignments left to
-     * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+     * {@inheritDoc}
      */
-    private boolean maybeTransitionToStable() {
-        if (!hasPendingTargetAssignment()) {
-            transitionTo(MemberState.STABLE);
+    @Override
+    public CompletableFuture<Void> leaveGroup() {
+        transitionTo(MemberState.PREPARE_LEAVING);
+
+        CompletableFuture<Void> callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+        callbackResult.whenComplete((result, error) -> {
+
+            // Clear the subscription, no matter if the callback execution 
failed or succeeded.
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+
+            // Transition to ensure that a heartbeat request is sent out to 
effectively leave the
+            // group (even in the case where the member had no assignment to 
release or when the
+            // callback execution failed.)
+            transitionToSendingLeaveGroup();
+
+        });
+
+        clearPendingAssignmentsAndLocalNamesCache();
+
+        // Return callback future to indicate that the leave group is done 
when the callbacks
+        // complete, without waiting for the heartbeat to be sent out. (Best 
effort to send it
+        // but do not hold the leave group operation for it)
+        return callbackResult;
+    }
+
+    /**
+     * Release member assignment by calling the user defined callbacks for 
onPartitionsRevoked or
+     * onPartitionsLost.
+     * <ul>
+     *     <li>If the member is part of the group (epoch > 0), this will 
invoke onPartitionsRevoked.
+     *     This will be the case when releasing assignment because the member 
is intentionally
+     *     leaving the group (after a call to unsubscribe)</li>
+     *
+     *     <li>If the member is not part of the group (epoch <=0), this will 
invoke onPartitionsLost.
+     *     This will be the case when releasing assignment after being fenced 
.</li>
+     * </ul>
+     *
+     * @return Future that will complete when the callback execution completes.
+     */
+    private CompletableFuture<Void> 
invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+        SortedSet<TopicPartition> droppedPartitions = new 
TreeSet<>(COMPARATOR);
+        droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+        CompletableFuture<Void> callbackResult;
+        if (droppedPartitions.isEmpty()) {
+            // No assignment to release
+            callbackResult = CompletableFuture.completedFuture(null);
         } else {
-            transitionTo(MemberState.RECONCILING);
+            // Release assignment
+            if (memberEpoch > 0) {
+                // Member is part of the group. Invoke onPartitionsRevoked.
+                callbackResult = revokePartitions(droppedPartitions);
+            } else {
+                // Member is not part of the group anymore. Invoke 
onPartitionsLost.
+                callbackResult = 
invokeOnPartitionsLostCallback(droppedPartitions);
+            }
+            // Remove all topic IDs and names from local cache
+            callbackResult.whenComplete((result, error) -> 
clearPendingAssignmentsAndLocalNamesCache());
+        }
+        return callbackResult;
+    }
+
+    /**
+     * Reset member epoch to the value required for the leave the group 
heartbeat request, and
+     * transition to the {@link MemberState#LEAVING} state so that a heartbeat
+     * request is sent out with it.
+     */
+    private void transitionToSendingLeaveGroup() {
+        memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+        currentAssignment = new HashSet<>();
+        transitionTo(MemberState.LEAVING);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean shouldHeartbeatNow() {
+        MemberState state = state();
+        return state == MemberState.ACKNOWLEDGING || state == 
MemberState.LEAVING;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void onHeartbeatRequestSent() {
+        MemberState state = state();
+        if (state == MemberState.ACKNOWLEDGING) {
+            if (allPendingAssignmentsReconciled()) {
+                transitionTo(MemberState.STABLE);
+            } else {
+                transitionTo(MemberState.RECONCILING);
+            }
+        } else if (state == MemberState.LEAVING) {
+            transitionTo(MemberState.UNSUBSCRIBED);
         }
-        return state.equals(MemberState.STABLE);
     }
 
     /**
-     * Take new target assignment received from the server and set it as 
targetAssignment to be
-     * processed. Following the consumer group protocol, the server won't send 
a new target
-     * member while a previous one hasn't been acknowledged by the member, so 
this will fail
-     * if a target assignment already exists.
+     * @return True if there are no assignments waiting to be resolved from 
metadata or reconciled.
+     */
+    private boolean allPendingAssignmentsReconciled() {
+        return assignmentUnresolved.isEmpty() && 
assignmentReadyToReconcile.isEmpty();
+    }
+
+    @Override
+    public boolean shouldSkipHeartbeat() {
+        MemberState state = state();
+        return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL;
+    }
+
+    /**
+     * Reconcile the assignment that has been received from the server and for 
which topic names
+     * are resolved, kept in the {@link #assignmentReadyToReconcile}. This 
will commit if needed,
+     * trigger the callbacks and update the subscription state. Note that only 
one reconciliation
+     * can be in progress at a time. If there is already another one in 
progress when this is
+     * triggered, it will be no-op, and the assignment will be reconciled on 
the next
+     * reconciliation loop.
+     */
+    boolean reconcile() {
+        // Make copy of the assignment to reconcile as it could change as new 
assignments or metadata updates are received
+        SortedSet<TopicPartition> assignedPartitions = new 
TreeSet<>(assignmentReadyToReconcile);
+        boolean sameAssignmentReceived = 
assignedPartitions.equals(subscriptions.assignedPartitions());
+
+        if (reconciliationInProgress || sameAssignmentReceived) {
+            String reason;
+            if (reconciliationInProgress) {
+                reason = "Another reconciliation is already in progress. 
Assignment " +
+                        assignmentReadyToReconcile + " will be handled in the 
next reconciliation loop.";
+            } else {
+                reason = "Target assignment ready to reconcile is equals to 
the member current assignment.";
+            }
+            log.debug("Ignoring reconciliation attempt." + reason);
+            return false;
+        }
+
+        markReconciliationInProgress();
+
+        SortedSet<TopicPartition> ownedPartitions = new TreeSet<>(COMPARATOR);
+        ownedPartitions.addAll(subscriptions.assignedPartitions());
+
+
+        // Partitions to assign (not previously owned)
+        SortedSet<TopicPartition> addedPartitions = new TreeSet<>(COMPARATOR);
+        addedPartitions.addAll(assignedPartitions);
+        addedPartitions.removeAll(ownedPartitions);
+
+        // Partitions to revoke
+        SortedSet<TopicPartition> revokedPartitions = new 
TreeSet<>(COMPARATOR);
+        revokedPartitions.addAll(ownedPartitions);
+        revokedPartitions.removeAll(assignedPartitions);
+
+        log.info("Updating assignment with\n" +
+                        "\tAssigned partitions:                       {}\n" +
+                        "\tCurrent owned partitions:                  {}\n" +
+                        "\tAdded partitions (assigned - owned):       {}\n" +
+                        "\tRevoked partitions (owned - assigned):     {}\n",
+                assignedPartitions,
+                ownedPartitions,
+                addedPartitions,
+                revokedPartitions
+        );
+
+        CompletableFuture<Void> revocationResult;
+        if (!revokedPartitions.isEmpty()) {
+            revocationResult = revokePartitions(revokedPartitions);
+        } else {
+            revocationResult = CompletableFuture.completedFuture(null);
+            // Reschedule the auto commit starting from now (new assignment 
received without any
+            // revocation).
+            commitRequestManager.resetAutoCommitTimer();
+        }
+
+        // Future that will complete when the full reconciliation process 
completes (revocation
+        // and assignment, executed sequentially)
+        CompletableFuture<Void> reconciliationResult =
+                revocationResult.thenCompose(r -> {
+                    boolean memberHasRejoined = 
!Objects.equals(memberIdOnReconciliationStart,
+                            memberId);
+                    if (state == MemberState.RECONCILING && 
!memberHasRejoined) {
+
+                        // Make assignment effective on the client by updating 
the subscription state.
+                        subscriptions.assignFromSubscribed(assignedPartitions);
+
+                        // Clear topic names cache only for topics that are 
not in the subscription anymore
+                        for (TopicPartition tp : revokedPartitions) {
+                            if 
(!subscriptions.subscription().contains(tp.topic())) {
+                                
assignedTopicNamesCache.values().remove(tp.topic());
+                            }
+                        }
+
+                        // Invoke user call back
+                        return 
invokeOnPartitionsAssignedCallback(addedPartitions);
+
+                    } else {
+                        String reason;
+                        if (state != MemberState.RECONCILING) {
+                            reason = "The member already transitioned out of 
the reconciling " +
+                                    "state into " + state;
+                        } else {
+                            reason = "The member has re-joined the group.";
+                        }
+                        // Revocation callback completed but the reconciled 
assignment should not
+                        // be applied (not relevant anymore). This could be 
because the member
+                        // is not in the RECONCILING state anymore (fenced, 
failed, unsubscribed),
+                        // or because it has already re-joined the group.
+                        CompletableFuture<Void> res = new 
CompletableFuture<>();
+                        res.completeExceptionally(new 
KafkaException("Interrupting reconciliation" +
+                                " after revocation. " + reason));
+                        return res;
+                    }

Review Comment:
   Suggestion: consider moving this to an `assignPartitions()` method, similar 
to the `revokePartitions()` method, for consistency and readability.
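
One way the extraction might look, as a simplified sketch rather than the actual Kafka code. `ReconcileSketch`, its fields, and the use of `String` in place of `TopicPartition` are stand-ins, and `assignPartitions()` is the hypothetical method suggested above. The real method would also update the subscription state, prune the topic names cache, and invoke the `onPartitionsAssigned` callback.

```java
import java.util.Objects;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;

class ReconcileSketch {
    enum MemberState { RECONCILING, FENCED }

    MemberState state = MemberState.RECONCILING;
    String memberId = "member-1";
    String memberIdOnReconciliationStart = "member-1";
    // Stand-in for the partitions tracked by the subscription state.
    SortedSet<String> owned = new TreeSet<>();

    // Stand-in for the existing revokePartitions(): drops the partitions and completes immediately.
    CompletableFuture<Void> revokePartitions(SortedSet<String> revoked) {
        owned.removeAll(revoked);
        return CompletableFuture.completedFuture(null);
    }

    // Hypothetical extracted method holding the body of the thenCompose() lambda.
    CompletableFuture<Void> assignPartitions(SortedSet<String> assigned, SortedSet<String> added) {
        boolean memberHasRejoined = !Objects.equals(memberIdOnReconciliationStart, memberId);
        if (state == MemberState.RECONCILING && !memberHasRejoined) {
            // Make the assignment effective. The real code would then run the
            // user callback for the `added` partitions.
            owned = new TreeSet<>(assigned);
            return CompletableFuture.completedFuture(null);
        }
        String reason = state != MemberState.RECONCILING
                ? "The member already transitioned out of the reconciling state into " + state
                : "The member has re-joined the group.";
        CompletableFuture<Void> res = new CompletableFuture<>();
        res.completeExceptionally(new IllegalStateException(
                "Interrupting reconciliation after revocation. " + reason));
        return res;
    }

    // With the extraction, the reconciliation chain reads as two symmetric steps.
    CompletableFuture<Void> reconcile(SortedSet<String> assigned) {
        SortedSet<String> added = new TreeSet<>(assigned);
        added.removeAll(owned);
        SortedSet<String> revoked = new TreeSet<>(owned);
        revoked.removeAll(assigned);
        return revokePartitions(revoked).thenCompose(r -> assignPartitions(assigned, added));
    }
}
```

The state and member ID checks stay inside the extracted method, so they still run only after the revocation step has completed.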




