lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1395862422
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ########## @@ -17,253 +17,1059 @@ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; -import org.apache.kafka.common.utils.LogContext; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyCollection; +import static org.mockito.ArgumentMatchers.anySet; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class MembershipManagerImplTest { private static final String GROUP_ID = "test-group"; private static final String MEMBER_ID = "test-member-1"; private static final int MEMBER_EPOCH = 1; - private final LogContext logContext = new LogContext(); + + private 
SubscriptionState subscriptionState; + private ConsumerMetadata metadata; + + private CommitRequestManager commitRequestManager; + + private ConsumerTestBuilder testBuilder; + + @BeforeEach + public void setup() { + testBuilder = new ConsumerTestBuilder(ConsumerTestBuilder.createDefaultGroupInformation()); + metadata = testBuilder.metadata; + subscriptionState = testBuilder.subscriptions; + commitRequestManager = testBuilder.commitRequestManager.get(); + } + + @AfterEach + public void tearDown() { + if (testBuilder != null) { + testBuilder.close(); + } + } + + private MembershipManagerImpl createMembershipManagerJoiningGroup() { + MembershipManagerImpl manager = spy(new MembershipManagerImpl( + GROUP_ID, subscriptionState, commitRequestManager, + metadata, testBuilder.logContext)); + manager.transitionToJoining(); + return manager; + } + + private MembershipManagerImpl createMembershipManagerJoiningGroup(String groupInstanceId, + String serverAssignor) { + MembershipManagerImpl manager = new MembershipManagerImpl( + GROUP_ID, Optional.ofNullable(groupInstanceId), Optional.ofNullable(serverAssignor), + subscriptionState, commitRequestManager, metadata, testBuilder.logContext); + manager.transitionToJoining(); + return manager; + } @Test public void testMembershipManagerServerAssignor() { - MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); + MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup(); assertEquals(Optional.empty(), membershipManager.serverAssignor()); - membershipManager = new MembershipManagerImpl(GROUP_ID, "instance1", "Uniform", logContext); + membershipManager = createMembershipManagerJoiningGroup("instance1", "Uniform"); assertEquals(Optional.of("Uniform"), membershipManager.serverAssignor()); } @Test public void testMembershipManagerInitSupportsEmptyGroupInstanceId() { - new MembershipManagerImpl(GROUP_ID, logContext); - new MembershipManagerImpl(GROUP_ID, null, null, logContext); + 
createMembershipManagerJoiningGroup(); + createMembershipManagerJoiningGroup(null, null); + } + + @Test + public void testMembershipManagerRegistersForClusterMetadataUpdatesOnFirstJoin() { + // First join should register to get metadata updates + MembershipManagerImpl manager = new MembershipManagerImpl( + GROUP_ID, subscriptionState, commitRequestManager, + metadata, testBuilder.logContext); + manager.transitionToJoining(); + verify(metadata).addClusterUpdateListener(manager); + clearInvocations(metadata); + + // Following joins should not register again. + receiveEmptyAssignment(manager); + mockLeaveGroup(); + manager.leaveGroup(); + assertEquals(MemberState.LEAVING, manager.state()); + manager.onHeartbeatRequestSent(); + assertEquals(MemberState.UNSUBSCRIBED, manager.state()); + manager.transitionToJoining(); + verify(metadata, never()).addClusterUpdateListener(manager); + } + + @Test + public void testReconcilingWhenReceivingAssignmentFoundInMetadata() { + MembershipManager membershipManager = mockJoinAndReceiveAssignment(true); + assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state()); + + // When the ack is sent the member should go back to STABLE + membershipManager.onHeartbeatRequestSent(); + assertEquals(MemberState.STABLE, membershipManager.state()); } @Test public void testTransitionToReconcilingOnlyIfAssignmentReceived() { - MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); - assertEquals(MemberState.UNJOINED, membershipManager.state()); + MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup(); + assertEquals(MemberState.JOINING, membershipManager.state()); ConsumerGroupHeartbeatResponse responseWithoutAssignment = createConsumerGroupHeartbeatResponse(null); - membershipManager.updateState(responseWithoutAssignment.data()); + membershipManager.onHeartbeatResponseReceived(responseWithoutAssignment.data()); assertNotEquals(MemberState.RECONCILING, membershipManager.state()); 
ConsumerGroupHeartbeatResponse responseWithAssignment = - createConsumerGroupHeartbeatResponse(createAssignment()); - membershipManager.updateState(responseWithAssignment.data()); + createConsumerGroupHeartbeatResponse(createAssignment(true)); + membershipManager.onHeartbeatResponseReceived(responseWithAssignment.data()); assertEquals(MemberState.RECONCILING, membershipManager.state()); } @Test public void testMemberIdAndEpochResetOnFencedMembers() { - MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); - ConsumerGroupHeartbeatResponse heartbeatResponse = - createConsumerGroupHeartbeatResponse(null); - membershipManager.updateState(heartbeatResponse.data()); + MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup(); + ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(null); + membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data()); assertEquals(MemberState.STABLE, membershipManager.state()); assertEquals(MEMBER_ID, membershipManager.memberId()); assertEquals(MEMBER_EPOCH, membershipManager.memberEpoch()); + mockMemberHasAutoAssignedPartition(); + membershipManager.transitionToFenced(); - assertFalse(membershipManager.memberId().isEmpty()); + assertEquals(MEMBER_ID, membershipManager.memberId()); assertEquals(0, membershipManager.memberEpoch()); } @Test - public void testTransitionToFailure() { - MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); + public void testTransitionToFatal() { + MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup(); ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(null); - membershipManager.updateState(heartbeatResponse.data()); + membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data()); assertEquals(MemberState.STABLE, membershipManager.state()); assertEquals(MEMBER_ID, membershipManager.memberId()); 
assertEquals(MEMBER_EPOCH, membershipManager.memberEpoch()); - membershipManager.transitionToFailed(); - assertEquals(MemberState.FAILED, membershipManager.state()); + when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true); + membershipManager.transitionToFatal(); + assertEquals(MemberState.FATAL, membershipManager.state()); + verify(subscriptionState).assignFromSubscribed(Collections.emptySet()); + } + + @Test + public void testTransitionToFailedWhenTryingToJoin() { + MembershipManagerImpl membershipManager = new MembershipManagerImpl( + GROUP_ID, subscriptionState, commitRequestManager, metadata, + testBuilder.logContext); + assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state()); + membershipManager.transitionToJoining(); + + when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true); + membershipManager.transitionToFatal(); + assertEquals(MemberState.FATAL, membershipManager.state()); } @Test public void testFencingWhenStateIsStable() { - MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); - ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(null); - membershipManager.updateState(heartbeatResponse.data()); + MembershipManager membershipManager = createMemberInStableState(); + testFencedMemberReleasesAssignmentAndTransitionsToJoining(membershipManager); + verify(subscriptionState).assignFromSubscribed(Collections.emptySet()); + } + + @Test + public void testFencingWhenStateIsReconciling() { + MembershipManager membershipManager = mockJoinAndReceiveAssignment(false); + assertEquals(MemberState.RECONCILING, membershipManager.state()); + + testFencedMemberReleasesAssignmentAndTransitionsToJoining(membershipManager); + verify(subscriptionState).assignFromSubscribed(Collections.emptySet()); + } + + /** + * This is the case where a member is stuck reconciling and transition out of the RECONCILING + * state (due to failure). 
When the reconciliation completes it should not be applied because + * it is not relevant anymore (it should not update the assignment on the member or send ack). + */ + @Test + public void testDelayedReconciliationResultDiscardedIfMemberNotInReconcilingStateAnymore() { + MembershipManagerImpl membershipManager = createMemberInStableState(); + Uuid topicId1 = Uuid.randomUuid(); + String topic1 = "topic1"; + Set<TopicPartition> owned = Collections.singleton(new TopicPartition(topic1, 0)); + mockOwnedPartitionAndAssignmentReceived(topicId1, topic1, owned, true); + + // Reconciliation that does not complete stuck on revocation commit. + CompletableFuture<Void> commitResult = mockEmptyAssignmentAndRevocationStuckOnCommit(membershipManager); + + // Member received fatal error while reconciling + when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true); + membershipManager.transitionToFatal(); + verify(subscriptionState).assignFromSubscribed(Collections.emptySet()); + clearInvocations(subscriptionState); + + // Complete commit request + commitResult.complete(null); + + // Member should not update the subscription or send ack when the delayed reconciliation + // completed. + verify(subscriptionState, never()).assignFromSubscribed(anySet()); + assertNotEquals(MemberState.ACKNOWLEDGING, membershipManager.state()); + } + + /** + * This is the case where a member is stuck reconciling an assignment A (waiting on + * metadata, commit or callbacks), and it rejoins (due to fence or unsubscribe/subscribe). If + * the reconciliation of A completes it should not be applied (it should not update the + * assignment on the member or send ack). 
+ */ + @Test + public void testDelayedReconciliationResultDiscardedIfMemberRejoins() { + MembershipManagerImpl membershipManager = createMemberInStableState(); + Uuid topicId1 = Uuid.randomUuid(); + String topic1 = "topic1"; + Set<TopicPartition> owned = Collections.singleton(new TopicPartition(topic1, 0)); + mockOwnedPartitionAndAssignmentReceived(topicId1, topic1, owned, true); + + // Reconciliation that does not complete stuck on revocation commit. + CompletableFuture<Void> commitResult = + mockNewAssignmentAndRevocationStuckOnCommit(membershipManager, topicId1, topic1, + Arrays.asList(1, 2), true); + Set<TopicPartition> assignment1 = new HashSet<>(); + assignment1.add(new TopicPartition(topic1, 1)); + assignment1.add(new TopicPartition(topic1, 2)); + assertEquals(assignment1, membershipManager.assignmentReadyToReconcile()); + int currentEpoch = membershipManager.memberEpoch(); + + // Get fenced and rejoin while still reconciling. Get new assignment to reconcile after + // rejoining. + testFencedMemberReleasesAssignmentAndTransitionsToJoining(membershipManager); + clearInvocations(subscriptionState); + + // Get new assignment A2 after rejoining. This should not trigger a reconciliation just + // yet because there is another one in progress, but should keep the new assignment ready + // to be reconciled next. + Uuid topicId3 = Uuid.randomUuid(); + mockOwnedPartitionAndAssignmentReceived(topicId3, "topic3", owned, true); + receiveAssignmentAfterRejoin(topicId3, Collections.singletonList(5), membershipManager); + verifyReconciliationNotTriggered(membershipManager); + Set<TopicPartition> assignmentAfterRejoin = Collections.singleton(new TopicPartition("topic3", 5)); + assertEquals(assignmentAfterRejoin, membershipManager.assignmentReadyToReconcile()); + + // Reconciliation completes when the member has already re-joined the group. Should not + // update the subscription state or send ack. 
+ commitResult.complete(null); + verify(subscriptionState, never()).assignFromSubscribed(anyCollection()); + assertNotEquals(MemberState.ACKNOWLEDGING, membershipManager.state()); + + // Assignment received after rejoining should be ready to reconcile on the next + // reconciliation loop. + assertEquals(MemberState.RECONCILING, membershipManager.state()); + assertEquals(assignmentAfterRejoin, membershipManager.assignmentReadyToReconcile()); + } + + /** + * This is the case where a member is stuck reconciling an assignment A (waiting on + * metadata, commit or callbacks), and the target assignment changes (due to new topics added + * to metadata, or new assignment received from broker). If the reconciliation of A completes + * it should be applied (should update the assignment on the member and send ack), and then + * the reconciliation of assignment B will be processed and applied in the next + * reconciliation loop. + */ + @Test + public void testDelayedReconciliationResultAppliedWhenTargetChangedWithMetadataUpdate() { + // Member receives and reconciles topic1-partition0 + Uuid topicId1 = Uuid.randomUuid(); + String topic1 = "topic1"; + MembershipManagerImpl membershipManager = + mockMemberSuccessfullyReceivesAndAcksAssignment(topicId1, topic1, Arrays.asList(0)); + membershipManager.onHeartbeatRequestSent(); assertEquals(MemberState.STABLE, membershipManager.state()); + clearInvocations(membershipManager, subscriptionState); + when(subscriptionState.assignedPartitions()).thenReturn(Collections.singleton(new TopicPartition(topic1, 0))); + + // New assignment revoking the partitions owned and adding a new one (not in metadata). + // Reconciliation triggered for topic 1 (stuck on revocation commit) and topic2 waiting + // for metadata. 
+ Uuid topicId2 = Uuid.randomUuid(); + String topic2 = "topic2"; + CompletableFuture<Void> commitResult = + mockNewAssignmentAndRevocationStuckOnCommit(membershipManager, topicId2, topic2, + Arrays.asList(1, 2), false); + verify(metadata).requestUpdate(anyBoolean()); + assertEquals(Collections.singleton(topicId2), membershipManager.topicsWaitingForMetadata()); - testStateUpdateOnFenceError(membershipManager); + // Metadata discovered for topic2 while reconciliation in progress to revoke topic1. + // Should not trigger a new reconciliation because there is one already in progress. + mockTopicNameInMetadataCache(Collections.singletonMap(topicId2, topic2), true); + membershipManager.onUpdate(null); + assertEquals(Collections.emptySet(), membershipManager.topicsWaitingForMetadata()); + verifyReconciliationNotTriggered(membershipManager); + + // Reconciliation in progress completes. Should be applied revoking topic 1 only. Newly + // discovered topic2 will be reconciled in the next reconciliation loop. + commitResult.complete(null); + + // Member should update the subscription and send ack when the delayed reconciliation + // completes. + verify(subscriptionState).assignFromSubscribed(Collections.emptySet()); + assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state()); + + // Pending assignment that was discovered in metadata should be ready to reconcile in the + // next reconciliation loop. 
+ Set<TopicPartition> topic2Assignment = new HashSet<>(Arrays.asList( + new TopicPartition(topic2, 1), + new TopicPartition(topic2, 2))); + assertEquals(topic2Assignment, membershipManager.assignmentReadyToReconcile()); + assertEquals(topic2Assignment, membershipManager.assignmentReadyToReconcile()); } @Test - public void testFencingWhenStateIsReconciling() { - MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); - ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(createAssignment()); - membershipManager.updateState(heartbeatResponse.data()); + public void testLeaveGroupWhenStateIsStable() { + MembershipManager membershipManager = createMemberInStableState(); + testLeaveGroupReleasesAssignmentAndResetsEpochToSendLeaveGroup(membershipManager); + verify(subscriptionState).assignFromSubscribed(Collections.emptySet()); + } + + @Test + public void testLeaveGroupWhenStateIsReconciling() { + MembershipManager membershipManager = mockJoinAndReceiveAssignment(false); assertEquals(MemberState.RECONCILING, membershipManager.state()); - testStateUpdateOnFenceError(membershipManager); + testLeaveGroupReleasesAssignmentAndResetsEpochToSendLeaveGroup(membershipManager); + } + + @Test + public void testLeaveGroupWhenMemberAlreadyLeaving() { + MembershipManager membershipManager = createMemberInStableState(); + + // First leave attempt. Should trigger the callbacks and stay LEAVING until + // callbacks complete and the heartbeat is sent out. + mockLeaveGroup(); + CompletableFuture<Void> leaveResult1 = membershipManager.leaveGroup(); + assertFalse(leaveResult1.isDone()); + assertEquals(MemberState.LEAVING, membershipManager.state()); + verify(subscriptionState).assignFromSubscribed(Collections.emptySet()); + clearInvocations(subscriptionState); + + // Second leave attempt while the first one has not completed yet. 
Should not + // trigger any callbacks, and return a future that will complete when the ongoing first + // leave operation completes. + mockLeaveGroup(); + CompletableFuture<Void> leaveResult2 = membershipManager.leaveGroup(); + verify(subscriptionState, never()).rebalanceListener(); + assertFalse(leaveResult2.isDone()); + + // Complete first leave group operation. Should also complete the second leave group. + membershipManager.onHeartbeatRequestSent(); + assertTrue(leaveResult1.isDone()); + assertFalse(leaveResult1.isCompletedExceptionally()); + assertTrue(leaveResult2.isDone()); + assertFalse(leaveResult2.isCompletedExceptionally()); + + // Subscription should have been updated only once with the first leave group. + verify(subscriptionState, never()).assignFromSubscribed(Collections.emptySet()); + } + + @Test + public void testLeaveGroupWhenMemberAlreadyLeft() { + MembershipManager membershipManager = createMemberInStableState(); + + // Leave group triggered and completed + mockLeaveGroup(); + CompletableFuture<Void> leaveResult1 = membershipManager.leaveGroup(); + assertEquals(MemberState.LEAVING, membershipManager.state()); + membershipManager.onHeartbeatRequestSent(); + assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state()); + assertTrue(leaveResult1.isDone()); + assertFalse(leaveResult1.isCompletedExceptionally()); + verify(subscriptionState).assignFromSubscribed(Collections.emptySet()); + clearInvocations(subscriptionState); + + // Call to leave group again, when member already left. 
Should be no-op (no callbacks, + // no assignment updated) + mockLeaveGroup(); + CompletableFuture<Void> leaveResult2 = membershipManager.leaveGroup(); + assertTrue(leaveResult2.isDone()); + assertFalse(leaveResult2.isCompletedExceptionally()); + assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state()); + verify(subscriptionState, never()).rebalanceListener(); + verify(subscriptionState, never()).assignFromSubscribed(Collections.emptySet()); } @Test public void testFatalFailureWhenStateIsUnjoined() { - MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); - assertEquals(MemberState.UNJOINED, membershipManager.state()); + MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup(); + assertEquals(MemberState.JOINING, membershipManager.state()); testStateUpdateOnFatalFailure(membershipManager); } @Test public void testFatalFailureWhenStateIsStable() { - MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); + MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup(); ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(null); - membershipManager.updateState(heartbeatResponse.data()); + membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data()); assertEquals(MemberState.STABLE, membershipManager.state()); testStateUpdateOnFatalFailure(membershipManager); } - @Test - public void testFencingShouldNotHappenWhenStateIsUnjoined() { - MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); - assertEquals(MemberState.UNJOINED, membershipManager.state()); - - // Getting fenced when the member is not part of the group is not expected and should - // fail with invalid transition. 
- assertThrows(IllegalStateException.class, membershipManager::transitionToFenced); - } - @Test public void testUpdateStateFailsOnResponsesWithErrors() { - MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); + MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup(); // Updating state with a heartbeat response containing errors cannot be performed and // should fail. ConsumerGroupHeartbeatResponse unknownMemberResponse = createConsumerGroupHeartbeatResponseWithError(Errors.UNKNOWN_MEMBER_ID); assertThrows(IllegalArgumentException.class, - () -> membershipManager.updateState(unknownMemberResponse.data())); + () -> membershipManager.onHeartbeatResponseReceived(unknownMemberResponse.data())); } + /** + * This test should be the case when an assignment is sent to the member, and it cannot find + * it in metadata (permanently, ex. topic deleted). The member will keep the assignment as + * waiting for metadata, but the following assignment received from the broker will not + * contain the deleted topic. The member will discard the assignment that was pending and + * proceed with the reconciliation of the new assignment. 
+ */ @Test - public void testAssignmentUpdatedAsReceivedAndProcessed() { - MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); - ConsumerGroupHeartbeatResponseData.Assignment newAssignment = createAssignment(); - ConsumerGroupHeartbeatResponse heartbeatResponse = - createConsumerGroupHeartbeatResponse(newAssignment); - membershipManager.updateState(heartbeatResponse.data()); + public void testNewAssignmentReplacesPreviousOneWaitingOnMetadata() { + MembershipManagerImpl membershipManager = mockJoinAndReceiveAssignment(false); + assertEquals(MemberState.RECONCILING, membershipManager.state()); + + // When the ack is sent the member should go back to RECONCILING + membershipManager.onHeartbeatRequestSent(); + assertEquals(MemberState.RECONCILING, membershipManager.state()); + assertTrue(membershipManager.topicsWaitingForMetadata().size() > 0); + + // New target assignment received while there is another one waiting to be resolved + // and reconciled. This assignment does not include the previous one that is waiting + // for metadata, so the member will discard the topics that were waiting for metadata, and + // reconcile the new assignment. 
+ Uuid topicId = Uuid.randomUuid(); + String topicName = "topic1"; + when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, topicName)); + receiveAssignment(topicId, Collections.singletonList(0), membershipManager); + Set<TopicPartition> expectedAssignment = Collections.singleton(new TopicPartition(topicName, 0)); + assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state()); + verify(subscriptionState).assignFromSubscribed(expectedAssignment); + + // When ack for the reconciled assignment is sent, member should go back to STABLE + // because the first assignment that was not resolved should have been discarded + membershipManager.onHeartbeatRequestSent(); + assertEquals(MemberState.STABLE, membershipManager.state()); + assertTrue(membershipManager.topicsWaitingForMetadata().isEmpty()); + } + + /** + * This test should be the case when an assignment is sent to the member, and it cannot find + * it in metadata (temporarily). The broker will continue to send the assignment to the + * member. The member should keep it was waiting for metadata and continue to request updates. Review Comment: Agree, rephrased it. This test is just for the case where the broker does keep sending (and the test above for when it does not) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org