lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1395862422


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##########
@@ -17,253 +17,1059 @@
 
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
-import org.apache.kafka.common.utils.LogContext;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyCollection;
+import static org.mockito.ArgumentMatchers.anySet;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class MembershipManagerImplTest {
 
     private static final String GROUP_ID = "test-group";
     private static final String MEMBER_ID = "test-member-1";
     private static final int MEMBER_EPOCH = 1;
-    private final LogContext logContext = new LogContext();
+
+    private SubscriptionState subscriptionState;
+    private ConsumerMetadata metadata;
+
+    private CommitRequestManager commitRequestManager;
+
+    private ConsumerTestBuilder testBuilder;
+
+    @BeforeEach
+    public void setup() {
+        testBuilder = new 
ConsumerTestBuilder(ConsumerTestBuilder.createDefaultGroupInformation());
+        metadata = testBuilder.metadata;
+        subscriptionState = testBuilder.subscriptions;
+        commitRequestManager = testBuilder.commitRequestManager.get();
+    }
+
+    @AfterEach
+    public void tearDown() {
+        if (testBuilder != null) {
+            testBuilder.close();
+        }
+    }
+
+    private MembershipManagerImpl createMembershipManagerJoiningGroup() {
+        MembershipManagerImpl manager = spy(new MembershipManagerImpl(
+                GROUP_ID, subscriptionState, commitRequestManager,
+                metadata, testBuilder.logContext));
+        manager.transitionToJoining();
+        return manager;
+    }
+
+    private MembershipManagerImpl createMembershipManagerJoiningGroup(String 
groupInstanceId,
+                                                                      String 
serverAssignor) {
+        MembershipManagerImpl manager = new MembershipManagerImpl(
+                GROUP_ID, Optional.ofNullable(groupInstanceId), 
Optional.ofNullable(serverAssignor),
+                subscriptionState, commitRequestManager, metadata, 
testBuilder.logContext);
+        manager.transitionToJoining();
+        return manager;
+    }
 
     @Test
     public void testMembershipManagerServerAssignor() {
-        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
+        MembershipManagerImpl membershipManager = 
createMembershipManagerJoiningGroup();
         assertEquals(Optional.empty(), membershipManager.serverAssignor());
 
-        membershipManager = new MembershipManagerImpl(GROUP_ID, "instance1", 
"Uniform", logContext);
+        membershipManager = createMembershipManagerJoiningGroup("instance1", 
"Uniform");
         assertEquals(Optional.of("Uniform"), 
membershipManager.serverAssignor());
     }
 
     @Test
     public void testMembershipManagerInitSupportsEmptyGroupInstanceId() {
-        new MembershipManagerImpl(GROUP_ID, logContext);
-        new MembershipManagerImpl(GROUP_ID, null, null, logContext);
+        createMembershipManagerJoiningGroup();
+        createMembershipManagerJoiningGroup(null, null);
+    }
+
+    @Test
+    public void 
testMembershipManagerRegistersForClusterMetadataUpdatesOnFirstJoin() {
+        // First join should register to get metadata updates
+        MembershipManagerImpl manager = new MembershipManagerImpl(
+                GROUP_ID, subscriptionState, commitRequestManager,
+                metadata, testBuilder.logContext);
+        manager.transitionToJoining();
+        verify(metadata).addClusterUpdateListener(manager);
+        clearInvocations(metadata);
+
+        // Following joins should not register again.
+        receiveEmptyAssignment(manager);
+        mockLeaveGroup();
+        manager.leaveGroup();
+        assertEquals(MemberState.LEAVING, manager.state());
+        manager.onHeartbeatRequestSent();
+        assertEquals(MemberState.UNSUBSCRIBED, manager.state());
+        manager.transitionToJoining();
+        verify(metadata, never()).addClusterUpdateListener(manager);
+    }
+
+    @Test
+    public void testReconcilingWhenReceivingAssignmentFoundInMetadata() {
+        MembershipManager membershipManager = 
mockJoinAndReceiveAssignment(true);
+        assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
+
+        // When the ack is sent the member should go back to STABLE
+        membershipManager.onHeartbeatRequestSent();
+        assertEquals(MemberState.STABLE, membershipManager.state());
     }
 
     @Test
     public void testTransitionToReconcilingOnlyIfAssignmentReceived() {
-        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
-        assertEquals(MemberState.UNJOINED, membershipManager.state());
+        MembershipManagerImpl membershipManager = 
createMembershipManagerJoiningGroup();
+        assertEquals(MemberState.JOINING, membershipManager.state());
 
         ConsumerGroupHeartbeatResponse responseWithoutAssignment =
                 createConsumerGroupHeartbeatResponse(null);
-        membershipManager.updateState(responseWithoutAssignment.data());
+        
membershipManager.onHeartbeatResponseReceived(responseWithoutAssignment.data());
         assertNotEquals(MemberState.RECONCILING, membershipManager.state());
 
         ConsumerGroupHeartbeatResponse responseWithAssignment =
-                createConsumerGroupHeartbeatResponse(createAssignment());
-        membershipManager.updateState(responseWithAssignment.data());
+                createConsumerGroupHeartbeatResponse(createAssignment(true));
+        
membershipManager.onHeartbeatResponseReceived(responseWithAssignment.data());
         assertEquals(MemberState.RECONCILING, membershipManager.state());
     }
 
     @Test
     public void testMemberIdAndEpochResetOnFencedMembers() {
-        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
-        ConsumerGroupHeartbeatResponse heartbeatResponse =
-                createConsumerGroupHeartbeatResponse(null);
-        membershipManager.updateState(heartbeatResponse.data());
+        MembershipManagerImpl membershipManager = 
createMembershipManagerJoiningGroup();
+        ConsumerGroupHeartbeatResponse heartbeatResponse = 
createConsumerGroupHeartbeatResponse(null);
+        
membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data());
         assertEquals(MemberState.STABLE, membershipManager.state());
         assertEquals(MEMBER_ID, membershipManager.memberId());
         assertEquals(MEMBER_EPOCH, membershipManager.memberEpoch());
 
+        mockMemberHasAutoAssignedPartition();
+
         membershipManager.transitionToFenced();
-        assertFalse(membershipManager.memberId().isEmpty());
+        assertEquals(MEMBER_ID, membershipManager.memberId());
         assertEquals(0, membershipManager.memberEpoch());
     }
 
     @Test
-    public void testTransitionToFailure() {
-        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
+    public void testTransitionToFatal() {
+        MembershipManagerImpl membershipManager = 
createMembershipManagerJoiningGroup();
         ConsumerGroupHeartbeatResponse heartbeatResponse =
                 createConsumerGroupHeartbeatResponse(null);
-        membershipManager.updateState(heartbeatResponse.data());
+        
membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data());
         assertEquals(MemberState.STABLE, membershipManager.state());
         assertEquals(MEMBER_ID, membershipManager.memberId());
         assertEquals(MEMBER_EPOCH, membershipManager.memberEpoch());
 
-        membershipManager.transitionToFailed();
-        assertEquals(MemberState.FAILED, membershipManager.state());
+        when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+        membershipManager.transitionToFatal();
+        assertEquals(MemberState.FATAL, membershipManager.state());
+        verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+    }
+
+    @Test
+    public void testTransitionToFailedWhenTryingToJoin() {
+        MembershipManagerImpl membershipManager = new MembershipManagerImpl(
+                GROUP_ID, subscriptionState, commitRequestManager, metadata,
+                testBuilder.logContext);
+        assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
+        membershipManager.transitionToJoining();
+
+        when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+        membershipManager.transitionToFatal();
+        assertEquals(MemberState.FATAL, membershipManager.state());
     }
 
     @Test
     public void testFencingWhenStateIsStable() {
-        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
-        ConsumerGroupHeartbeatResponse heartbeatResponse = 
createConsumerGroupHeartbeatResponse(null);
-        membershipManager.updateState(heartbeatResponse.data());
+        MembershipManager membershipManager = createMemberInStableState();
+        
testFencedMemberReleasesAssignmentAndTransitionsToJoining(membershipManager);
+        verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+    }
+
+    @Test
+    public void testFencingWhenStateIsReconciling() {
+        MembershipManager membershipManager = 
mockJoinAndReceiveAssignment(false);
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+
+        
testFencedMemberReleasesAssignmentAndTransitionsToJoining(membershipManager);
+        verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+    }
+
+    /**
+     * This is the case where a member is stuck reconciling and transition out 
of the RECONCILING
+     * state (due to failure). When the reconciliation completes it should not 
be applied because
+     * it is not relevant anymore (it should not update the assignment on the 
member or send ack).
+     */
+    @Test
+    public void 
testDelayedReconciliationResultDiscardedIfMemberNotInReconcilingStateAnymore() {
+        MembershipManagerImpl membershipManager = createMemberInStableState();
+        Uuid topicId1 = Uuid.randomUuid();
+        String topic1 = "topic1";
+        Set<TopicPartition> owned = Collections.singleton(new 
TopicPartition(topic1, 0));
+        mockOwnedPartitionAndAssignmentReceived(topicId1, topic1, owned, true);
+
+        // Reconciliation that does not complete stuck on revocation commit.
+        CompletableFuture<Void> commitResult = 
mockEmptyAssignmentAndRevocationStuckOnCommit(membershipManager);
+
+        // Member received fatal error while reconciling
+        when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+        membershipManager.transitionToFatal();
+        verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+        clearInvocations(subscriptionState);
+
+        // Complete commit request
+        commitResult.complete(null);
+
+        // Member should not update the subscription or send ack when the 
delayed reconciliation
+        // completed.
+        verify(subscriptionState, never()).assignFromSubscribed(anySet());
+        assertNotEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
+    }
+
+    /**
+     * This is the case where a member is stuck reconciling an assignment A 
(waiting on
+     * metadata, commit or callbacks), and it rejoins (due to fence or 
unsubscribe/subscribe). If
+     * the reconciliation of A completes it should not be applied (it should 
not update the
+     * assignment on the member or send ack).
+     */
+    @Test
+    public void testDelayedReconciliationResultDiscardedIfMemberRejoins() {
+        MembershipManagerImpl membershipManager = createMemberInStableState();
+        Uuid topicId1 = Uuid.randomUuid();
+        String topic1 = "topic1";
+        Set<TopicPartition> owned = Collections.singleton(new 
TopicPartition(topic1, 0));
+        mockOwnedPartitionAndAssignmentReceived(topicId1, topic1, owned, true);
+
+        // Reconciliation that does not complete stuck on revocation commit.
+        CompletableFuture<Void> commitResult =
+                mockNewAssignmentAndRevocationStuckOnCommit(membershipManager, 
topicId1, topic1,
+                        Arrays.asList(1, 2), true);
+        Set<TopicPartition> assignment1 = new HashSet<>();
+        assignment1.add(new TopicPartition(topic1, 1));
+        assignment1.add(new TopicPartition(topic1, 2));
+        assertEquals(assignment1, 
membershipManager.assignmentReadyToReconcile());
+        int currentEpoch = membershipManager.memberEpoch();
+
+        // Get fenced and rejoin while still reconciling. Get new assignment 
to reconcile after
+        // rejoining.
+        
testFencedMemberReleasesAssignmentAndTransitionsToJoining(membershipManager);
+        clearInvocations(subscriptionState);
+
+        // Get new assignment A2 after rejoining. This should not trigger a 
reconciliation just
+        // yet because there is another on in progress, but should keep the 
new assignment ready
+        // to be reconciled next.
+        Uuid topicId3 = Uuid.randomUuid();
+        mockOwnedPartitionAndAssignmentReceived(topicId3, "topic3", owned, 
true);
+        receiveAssignmentAfterRejoin(topicId3, Collections.singletonList(5), 
membershipManager);
+        verifyReconciliationNotTriggered(membershipManager);
+        Set<TopicPartition> assignmentAfterRejoin = Collections.singleton(new 
TopicPartition("topic3", 5));
+        assertEquals(assignmentAfterRejoin, 
membershipManager.assignmentReadyToReconcile());
+
+        // Reconciliation completes when the member has already re-joined the 
group. Should not
+        // update the subscription state or send ack.
+        commitResult.complete(null);
+        verify(subscriptionState, 
never()).assignFromSubscribed(anyCollection());
+        assertNotEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
+
+        // Assignment received after rejoining should be ready to reconcile on 
the next
+        // reconciliation loop.
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+        assertEquals(assignmentAfterRejoin, 
membershipManager.assignmentReadyToReconcile());
+    }
+
+    /**
+     * This is the case where a member is stuck reconciling an assignment A 
(waiting on
+     * metadata, commit or callbacks), and the target assignment changes (due 
to new topics added
+     * to metadata, or new assignment received from broker). If the 
reconciliation of A completes
+     * t should be applied (should update the assignment on the member and 
send ack), and then
+     * the reconciliation of assignment B will be processed and applied in the 
next
+     * reconciliation loop.
+     */
+    @Test
+    public void 
testDelayedReconciliationResultAppliedWhenTargetChangedWithMetadataUpdate() {
+        // Member receives and reconciles topic1-partition0
+        Uuid topicId1 = Uuid.randomUuid();
+        String topic1 = "topic1";
+        MembershipManagerImpl membershipManager =
+                mockMemberSuccessfullyReceivesAndAcksAssignment(topicId1, 
topic1, Arrays.asList(0));
+        membershipManager.onHeartbeatRequestSent();
         assertEquals(MemberState.STABLE, membershipManager.state());
+        clearInvocations(membershipManager, subscriptionState);
+        
when(subscriptionState.assignedPartitions()).thenReturn(Collections.singleton(new
 TopicPartition(topic1, 0)));
+
+        // New assignment revoking the partitions owned and adding a new one 
(not in metadata).
+        // Reconciliation triggered for topic 1 (stuck on revocation commit) 
and topic2 waiting
+        // for metadata.
+        Uuid topicId2 = Uuid.randomUuid();
+        String topic2 = "topic2";
+        CompletableFuture<Void> commitResult =
+                mockNewAssignmentAndRevocationStuckOnCommit(membershipManager, 
topicId2, topic2,
+                        Arrays.asList(1, 2), false);
+        verify(metadata).requestUpdate(anyBoolean());
+        assertEquals(Collections.singleton(topicId2), 
membershipManager.topicsWaitingForMetadata());
 
-        testStateUpdateOnFenceError(membershipManager);
+        // Metadata discovered for topic2 while reconciliation in progress to 
revoke topic1.
+        // Should not trigger a new reconciliation because there is one 
already in progress.
+        mockTopicNameInMetadataCache(Collections.singletonMap(topicId2, 
topic2), true);
+        membershipManager.onUpdate(null);
+        assertEquals(Collections.emptySet(), 
membershipManager.topicsWaitingForMetadata());
+        verifyReconciliationNotTriggered(membershipManager);
+
+        // Reconciliation in progress completes. Should be applied revoking 
topic 1 only. Newly
+        // discovered topic2 will be reconciled in the next reconciliation 
loop.
+        commitResult.complete(null);
+
+        // Member should update the subscription and send ack when the delayed 
reconciliation
+        // completes.
+        verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+        assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
+
+        // Pending assignment that was discovered in metadata should be ready 
to reconcile in the
+        // next reconciliation loop.
+        Set<TopicPartition> topic2Assignment = new HashSet<>(Arrays.asList(
+                new TopicPartition(topic2, 1),
+                new TopicPartition(topic2, 2)));
+        assertEquals(topic2Assignment, 
membershipManager.assignmentReadyToReconcile());
+        assertEquals(topic2Assignment, 
membershipManager.assignmentReadyToReconcile());
     }
 
     @Test
-    public void testFencingWhenStateIsReconciling() {
-        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
-        ConsumerGroupHeartbeatResponse heartbeatResponse = 
createConsumerGroupHeartbeatResponse(createAssignment());
-        membershipManager.updateState(heartbeatResponse.data());
+    public void testLeaveGroupWhenStateIsStable() {
+        MembershipManager membershipManager = createMemberInStableState();
+        
testLeaveGroupReleasesAssignmentAndResetsEpochToSendLeaveGroup(membershipManager);
+        verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+    }
+
+    @Test
+    public void testLeaveGroupWhenStateIsReconciling() {
+        MembershipManager membershipManager = 
mockJoinAndReceiveAssignment(false);
         assertEquals(MemberState.RECONCILING, membershipManager.state());
 
-        testStateUpdateOnFenceError(membershipManager);
+        
testLeaveGroupReleasesAssignmentAndResetsEpochToSendLeaveGroup(membershipManager);
+    }
+
+    @Test
+    public void testLeaveGroupWhenMemberAlreadyLeaving() {
+        MembershipManager membershipManager = createMemberInStableState();
+
+        // First leave attempt. Should trigger the callbacks and stay LEAVING 
until
+        // callbacks complete and the heartbeat is sent out.
+        mockLeaveGroup();
+        CompletableFuture<Void> leaveResult1 = membershipManager.leaveGroup();
+        assertFalse(leaveResult1.isDone());
+        assertEquals(MemberState.LEAVING, membershipManager.state());
+        verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+        clearInvocations(subscriptionState);
+
+        // Second leave attempt while the first one has not completed yet. 
Should not
+        // trigger any callbacks, and return a future that will complete when 
the ongoing first
+        // leave operation completes.
+        mockLeaveGroup();
+        CompletableFuture<Void> leaveResult2 = membershipManager.leaveGroup();
+        verify(subscriptionState, never()).rebalanceListener();
+        assertFalse(leaveResult2.isDone());
+
+        // Complete first leave group operation. Should also complete the 
second leave group.
+        membershipManager.onHeartbeatRequestSent();
+        assertTrue(leaveResult1.isDone());
+        assertFalse(leaveResult1.isCompletedExceptionally());
+        assertTrue(leaveResult2.isDone());
+        assertFalse(leaveResult2.isCompletedExceptionally());
+
+        // Subscription should have been updated only once with the first 
leave group.
+        verify(subscriptionState, 
never()).assignFromSubscribed(Collections.emptySet());
+    }
+
+    @Test
+    public void testLeaveGroupWhenMemberAlreadyLeft() {
+        MembershipManager membershipManager = createMemberInStableState();
+
+        // Leave group triggered and completed
+        mockLeaveGroup();
+        CompletableFuture<Void> leaveResult1 = membershipManager.leaveGroup();
+        assertEquals(MemberState.LEAVING, membershipManager.state());
+        membershipManager.onHeartbeatRequestSent();
+        assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
+        assertTrue(leaveResult1.isDone());
+        assertFalse(leaveResult1.isCompletedExceptionally());
+        verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+        clearInvocations(subscriptionState);
+
+        // Call to leave group again, when member already left. Should be 
no-op (no callbacks,
+        // no assignment updated)
+        mockLeaveGroup();
+        CompletableFuture<Void> leaveResult2 = membershipManager.leaveGroup();
+        assertTrue(leaveResult2.isDone());
+        assertFalse(leaveResult2.isCompletedExceptionally());
+        assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
+        verify(subscriptionState, never()).rebalanceListener();
+        verify(subscriptionState, 
never()).assignFromSubscribed(Collections.emptySet());
     }
 
     @Test
     public void testFatalFailureWhenStateIsUnjoined() {
-        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
-        assertEquals(MemberState.UNJOINED, membershipManager.state());
+        MembershipManagerImpl membershipManager = 
createMembershipManagerJoiningGroup();
+        assertEquals(MemberState.JOINING, membershipManager.state());
 
         testStateUpdateOnFatalFailure(membershipManager);
     }
 
     @Test
     public void testFatalFailureWhenStateIsStable() {
-        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
+        MembershipManagerImpl membershipManager = 
createMembershipManagerJoiningGroup();
         ConsumerGroupHeartbeatResponse heartbeatResponse = 
createConsumerGroupHeartbeatResponse(null);
-        membershipManager.updateState(heartbeatResponse.data());
+        
membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data());
         assertEquals(MemberState.STABLE, membershipManager.state());
 
         testStateUpdateOnFatalFailure(membershipManager);
     }
 
-    @Test
-    public void testFencingShouldNotHappenWhenStateIsUnjoined() {
-        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
-        assertEquals(MemberState.UNJOINED, membershipManager.state());
-
-        // Getting fenced when the member is not part of the group is not 
expected and should
-        // fail with invalid transition.
-        assertThrows(IllegalStateException.class, 
membershipManager::transitionToFenced);
-    }
-
     @Test
     public void testUpdateStateFailsOnResponsesWithErrors() {
-        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
+        MembershipManagerImpl membershipManager = 
createMembershipManagerJoiningGroup();
         // Updating state with a heartbeat response containing errors cannot 
be performed and
         // should fail.
         ConsumerGroupHeartbeatResponse unknownMemberResponse =
                 
createConsumerGroupHeartbeatResponseWithError(Errors.UNKNOWN_MEMBER_ID);
         assertThrows(IllegalArgumentException.class,
-                () -> 
membershipManager.updateState(unknownMemberResponse.data()));
+                () -> 
membershipManager.onHeartbeatResponseReceived(unknownMemberResponse.data()));
     }
 
+    /**
+     * This test should be the case when an assignment is sent to the member, 
and it cannot find
+     * it in metadata (permanently, ex. topic deleted). The member will keep 
the assignment as
+     * waiting for metadata, but the following assignment received from the 
broker will not
+     * contain the deleted topic. The member will discard the assignment that 
was pending and
+     * proceed with the reconciliation of the new assignment.
+     */
     @Test
-    public void testAssignmentUpdatedAsReceivedAndProcessed() {
-        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
-        ConsumerGroupHeartbeatResponseData.Assignment newAssignment = 
createAssignment();
-        ConsumerGroupHeartbeatResponse heartbeatResponse =
-                createConsumerGroupHeartbeatResponse(newAssignment);
-        membershipManager.updateState(heartbeatResponse.data());
+    public void testNewAssignmentReplacesPreviousOneWaitingOnMetadata() {
+        MembershipManagerImpl membershipManager = 
mockJoinAndReceiveAssignment(false);
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+
+        // When the ack is sent the member should go back to RECONCILING
+        membershipManager.onHeartbeatRequestSent();
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+        assertTrue(membershipManager.topicsWaitingForMetadata().size() > 0);
+
+        // New target assignment received while there is another one waiting 
to be resolved
+        // and reconciled. This assignment does not include the previous one 
that is waiting
+        // for metadata, so the member will discard the topics that were 
waiting for metadata, and
+        // reconcile the new assignment.
+        Uuid topicId = Uuid.randomUuid();
+        String topicName = "topic1";
+        
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, 
topicName));
+        receiveAssignment(topicId, Collections.singletonList(0), 
membershipManager);
+        Set<TopicPartition> expectedAssignment = Collections.singleton(new 
TopicPartition(topicName, 0));
+        assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
+        verify(subscriptionState).assignFromSubscribed(expectedAssignment);
+
+        // When ack for the reconciled assignment is sent, member should go 
back to STABLE
+        // because the first assignment that was not resolved should have been 
discarded
+        membershipManager.onHeartbeatRequestSent();
+        assertEquals(MemberState.STABLE, membershipManager.state());
+        assertTrue(membershipManager.topicsWaitingForMetadata().isEmpty());
+    }
+
+    /**
+     * This test should be the case when an assignment is sent to the member, 
and it cannot find
+     * it in metadata (temporarily). The broker will continue to send the 
assignment to the
+     * member. The member should keep it was waiting for metadata and continue 
to request updates.

Review Comment:
   Agree, rephrased it. This test is just for the case where the broker does 
keep sending (and the test above for when it does not)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to